Requirements
The only requirements are working Maven 3.0.4 (or higher) and Java 8.x installations.
官网
Create Project
- 使用 maven 命令:
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.8.0 \
-DarchetypeCatalog=local
2, 使用 flink 提供的 maven 命令:
$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.8.0
开发流程 (批处理)
- 1: 建立一个批处理执行的上下文.
- 2: 读如数据
- 3: 转换操作: 业务逻辑的核心所在.
- 4: 执行程序: execute program.
Java Demo
public static void main(String[] args) throws Exception {
String input = Paths.get("src/test/resources/wc.txt")
.toUri().toString();
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> text = env.readTextFile(input);
text.print();
FlatMapOperator<String, Tuple2<String, Integer>> stringTuple2FlatMapOperator = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
String[] tokens = s.toLowerCase()
.split("\\s");
for (String token : tokens) {
if (token.length() > 0) {
collector.collect(new Tuple2<>(token, 1));
}
}
}
});
AggregateOperator<Tuple2<String, Integer>> sum = stringTuple2FlatMapOperator.groupBy(0).sum(1);
sum.print();
}
Scala Demo
Maven 命令行
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.8.0 \
-DarchetypeCatalog=local
$ mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-scala \
-DarchetypeVersion=1.14.0 \
-DgroupId=flink-demo \
-DartifactId=flink-demo \
-Dversion=0.1 \
-Dpackage=spendreport \
-DinteractiveMode=false
数据源
StreamExecutionEnvironment.addSource(sourceFunction)
sourceFunction :
- 实现 SourceFunction (非并行源)
- 实现 ParallelSourceFunction 接口(并行源)
- 继承 RichParallelSourceFunction (并行源)
Scala 代码
package vip.chengchao.lsn001
import java.nio.file.Paths
import org.apache.flink.api.scala.ExecutionEnvironment
/**
* <p>
* <strong>
* 用一句话描述功能
* </strong><br /><br />
* 如题
* </p>
*
* @author chengchao - 2019-04-13 13:20 <br />
* @since [产品模块版本]
* @see [相关类方法]
*
*/
object ScalaWordCoundApp {
def main(args:Array[String]): Unit = {
val path = Paths.get("src/test/resources/wc.txt")
.toUri.toString
val env:ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val text = env.readTextFile(path)
text.print()
import org.apache.flink.api.scala._
text.flatMap(_.toLowerCase.split("\\s"))
.filter(_.nonEmpty)
.map((_, 1))
.groupBy(0)
.sum(1)
.print()
}
}
Flink 编程模型
- 1: 获取执行环境
- 2: 获取数据
- 3: 作用 transformation
- 4: sink
- 5: 触发执行 (
env.execute
)
EOF.