Flink Learning Notes

 

Requirements

The only requirements are working Maven 3.0.4 (or higher) and Java 8.x installations.

Official Website

https://flink.apache.org/

Create Project

  1. Using the Maven archetype command:

$ mvn archetype:generate                         \
  -DarchetypeGroupId=org.apache.flink            \
  -DarchetypeArtifactId=flink-quickstart-java    \
  -DarchetypeVersion=1.8.0    \
  -DarchetypeCatalog=local

  2. Using the quickstart script provided by Flink:

$ curl https://flink.apache.org/q/quickstart.sh | bash -s 1.8.0

Development Workflow (Batch Processing)

  • 1: Create the batch execution environment (context).
  • 2: Read in the input data.
  • 3: Apply transformations: this is where the core business logic lives.
  • 4: Execute the program (the Java Demo below walks through these steps).

Java Demo



    public static void main(String[] args) throws Exception {
        // Input file shipped in the project's test resources.
        String input = Paths.get("src/test/resources/wc.txt")
                .toUri().toString();

        // 1: create the batch execution environment.
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2: read the input data and print the raw lines (print() runs its own job in the DataSet API).
        DataSource<String> text = env.readTextFile(input);
        text.print();

        // 3: transformation - split each line into lower-case tokens and emit (word, 1) tuples.
        FlatMapOperator<String, Tuple2<String, Integer>> words = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] tokens = s.toLowerCase().split("\\s");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        });

        // Group by the word (field 0) and sum the counts (field 1).
        AggregateOperator<Tuple2<String, Integer>> sum = words.groupBy(0).sum(1);

        // 4: execute - print() triggers execution and writes the result to stdout.
        sum.print();
    }

Scala Demo

Maven Command Line

The first command below is interactive (archetype version 1.8.0); the second is non-interactive (1.14.0) with the project coordinates pre-filled:

$ mvn archetype:generate  \
  -DarchetypeGroupId=org.apache.flink  \
  -DarchetypeArtifactId=flink-quickstart-scala  \
  -DarchetypeVersion=1.8.0  \
  -DarchetypeCatalog=local

$ mvn archetype:generate \
    -DarchetypeGroupId=org.apache.flink \
    -DarchetypeArtifactId=flink-quickstart-scala \
    -DarchetypeVersion=1.14.0 \
    -DgroupId=flink-demo \
    -DartifactId=flink-demo \
    -Dversion=0.1 \
    -Dpackage=spendreport \
    -DinteractiveMode=false

Data Sources

StreamExecutionEnvironment.addSource(sourceFunction)

The sourceFunction can be supplied in three ways (see the sketch after this list):

  1. Implement SourceFunction (non-parallel source)
  2. Implement the ParallelSourceFunction interface (parallel source)
  3. Extend RichParallelSourceFunction (parallel source)
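
A minimal sketch of option 1, assuming the streaming Scala API dependency is on the classpath; the class name CounterSource, the app object, and the job name are illustrative only:

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._

// Hypothetical non-parallel source: emits an increasing counter roughly once per second.
class CounterSource extends SourceFunction[Long] {

  @volatile private var running = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    var count = 0L
    while (running) {
      ctx.collect(count)   // emit the next element
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = running = false
}

object CounterSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Register the custom source with addSource and print whatever it emits.
    env.addSource(new CounterSource).print()
    env.execute("counter-source-demo")
  }
}

A ParallelSourceFunction or RichParallelSourceFunction follows the same run/cancel contract, but may be executed with parallelism greater than 1.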

Scala Code (batch WordCount)



package vip.chengchao.lsn001

import java.nio.file.Paths

import org.apache.flink.api.scala.ExecutionEnvironment

/**
  * Batch WordCount example using the Flink DataSet API.
  *
  * @author chengchao - 2019-04-13 13:20
  */
object ScalaWordCountApp {

  def main(args: Array[String]): Unit = {

    val path = Paths.get("src/test/resources/wc.txt")
      .toUri.toString

    // 1: create the batch execution environment.
    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

    // 2: read the input data.
    val text = env.readTextFile(path)
    text.print()

    // Brings the implicit TypeInformation needed by flatMap/map into scope.
    import org.apache.flink.api.scala._

    // 3: transformations - tokenize, filter empty tokens, group and sum.
    text.flatMap(_.toLowerCase.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0)
      .sum(1)
      .print() // 4: print() triggers execution and writes the result to stdout.
  }

}

  • 1: Obtain the execution environment
  • 2: Obtain the data
  • 3: Apply the transformations
  • 4: Sink
  • 5: Trigger execution (env.execute); see the streaming sketch after this list
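
A minimal streaming sketch of these five steps, assuming a socket text source on localhost:9999 (started, for example, with nc -lk 9999); the host, port, and job name are illustrative only:

import org.apache.flink.streaming.api.scala._

object StreamingWordCountApp {

  def main(args: Array[String]): Unit = {
    // 1: obtain the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 2: obtain the data from a socket text source.
    val text = env.socketTextStream("localhost", 9999)

    // 3: apply the transformations.
    val counts = text
      .flatMap(_.toLowerCase.split("\\s"))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    // 4: sink - write the results to stdout.
    counts.print()

    // 5: trigger execution.
    env.execute("streaming word count")
  }

}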

EOF.