返回 导航

大数据

hangge.com

Flink - DataStream API使用详解1(转换算子:map、flatMap、filter、keyBy)

作者:hangge | 2025-02-18 10:15
    Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文分别使用这两种语言来演示 Flink 的使用。

一、DataStream API 介绍

1,基本介绍

DataStream API 主要分为 3 块:DataSourceTransformationDataSink
  • DataSource 是程序的输入数据源。
  • Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 mapflatMapfilter 等操作。
  • DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。

2,Transformation

    transformationFlink 程序的计算算子,负责对数据进行处理。Flink 提供了大量的算子,其实 Flink 中的大部分算子的使用和 spark 中算子的使用是一样的,具体见下方表格。
算 子 介 绍
map() 对数据流中的每个元素进行处理,输入一个元素返回一个元素(一进一出)
flatMap() 与 map()类似,但是每个元素都可以返回一个或多个新元素
filter() 对数据流中每个元素进行判断,如果返回 True 则将其保留,否则将其删除
keyBy() 根据 Key 对数据流进行分组
reduce() 对当前元素和上一次的结果进行聚合操作
aggregations() sum(),min(),max()等
union() 合并多个流,多个流的数据类型必须一致
connect() 只能连接两个流,两个流的数据类型可以不同
split() 根据规则把一个数据流切分为多个流
shuffle() 随机分区
rebalance() 对数据集进行再平衡,重分区,消除数据倾斜
rescale() 重分区
partitionCustom() 自定义分区

一、准备工作

1,添加依赖

我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.16.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.16.0</version>
</dependency>

二、DataStream API 的使用样例(Java 版)

1,map()

(1)下面代码使用 map 算子对数据流中的每个元素乘以 2
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
    // 处理数据
    SingleOutputStreamOperator<Integer> numStream = text.map(new MapFunction<Integer, Integer>() {
      @Override
      public Integer map(Integer value) throws Exception {
        return value * 2;
      }
    });
    // 执行打印操作
    numStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
  • 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
    // 处理数据
    SingleOutputStreamOperator<Integer> numStream = text.map(value -> value * 2);
    // 执行打印操作
    numStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}

(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()

下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<String> text = env.fromElements("hello hangge", "hello world");
    // 处理数据
    SingleOutputStreamOperator<String> wordStream = text.flatMap(
            new FlatMapFunction<String, String>() {
              @Override
              public void flatMap(String line, Collector<String> out) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                  out.collect(word);
                }
              }
    });
    // 使用一个线程执行打印操作
    wordStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
  • 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<String> text = env.fromElements("hello hangge", "hello world");
    // 处理数据
    SingleOutputStreamOperator<String> wordStream = text.flatMap(
            (String line, Collector<String> out) ->
                    Arrays.stream(line.split(" ")).forEach(out::collect)
    ).returns(Types.STRING);
    // 使用一个线程执行打印操作
    wordStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}

(2) 运行结果如下:

3,filter()

下面代码使用 filter 算子过滤出数据流中的偶数。
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
    // 处理数据
    SingleOutputStreamOperator<Integer> numStream = text.filter(new FilterFunction<Integer>() {
      @Override
      public boolean filter(Integer num) throws Exception {
        return num % 2 == 0;
      }
    });
    // 使用一个线程执行打印操作
    numStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
  • 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
    // 处理数据
    SingleOutputStreamOperator<Integer> numStream = text.filter(value -> value % 2 == 0);
    // 使用一个线程执行打印操作
    numStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}

(2)运行结果如下:

4,keyBy()

下面代码使用 keyBy 算子对数据流中的单词进行分组。
public class TransformationOp {
  public static void main(String[] args) throws Exception{
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world");
    // 处理数据
    SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text.map(
            new MapFunction<String, Tuple2<String, Integer>>() {
              @Override
              public Tuple2<String, Integer> map(String word) throws Exception {
                return new Tuple2<>(word, 1);
              }
    });
    KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream.keyBy(
            new KeySelector<Tuple2<String, Integer>, String>() {
              public String getKey(Tuple2<String, Integer> tup) throws Exception {
                return tup.f0;
              }
    });
    // 执行打印操作
    keyByStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
  • 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
  public static void main(String[] args) throws Exception {
    // 获取 Flink 流处理执行环境
    StreamExecutionEnvironment env = getEnv();

    // 在测试阶段,可以使用 fromElements 构造实时数据流
    DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world");
    // 处理数据
    SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text
            .map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
    KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream
            .keyBy(tuple -> tuple.f0);
    // 执行打印操作
    keyByStream.print();

    // 执行 Flink 程序
    env.execute("TransformationOpJava");
  }

  // 获取 Flink 流处理执行环境
  private static StreamExecutionEnvironment getEnv() {
    return StreamExecutionEnvironment.getExecutionEnvironment();
  }
}
(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。

三、DataStream API 的使用样例(Scala 版)

注意:如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:

1,map()

(1)下面代码使用 map 算子对数据流中的每个元素乘以 2
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    //在测试阶段,可以使用 fromElements 构造实时数据流
    val text = env.fromElements(1, 2, 3, 4, 5)
    //处理数据
    val numStream = text.map(_ * 2)
    //程执行打印操作
    numStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }

  private def getEnv = {
    StreamExecutionEnvironment.getExecutionEnvironment
  }
}

(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()

下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    //在测试阶段,可以使用 fromElements 构造实时数据流
    val text = env.fromElements("hello hangge", "hello world")
    //处理数据
    val wordStream = text.flatMap(_.split(" "))
    //程执行打印操作
    wordStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }

  private def getEnv = {
    StreamExecutionEnvironment.getExecutionEnvironment
  }
}

3,filter()

下面代码使用 filter 算子过滤出数据流中的偶数。
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    //在测试阶段,可以使用 fromElements 构造实时数据流
    val text = env.fromElements(1, 2, 3, 4, 5)
    //处理数据
    val numStream = text.filter(_ % 2 == 0)
    //执行打印操作
    numStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }

  private def getEnv = {
    StreamExecutionEnvironment.getExecutionEnvironment
  }
}

4,keyBy()

下面代码使用 keyBy 算子对数据流中的单词进行分组。
object TransformationOpScala {
  //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
  import org.apache.flink.api.scala._
  def main(args: Array[String]): Unit = {
    // 获取 Flink 流处理执行环境
    val env = getEnv

    //在测试阶段,可以使用 fromElements 构造实时数据流
    val text = env.fromElements("hello","hangge","hello","world")
    //处理数据
    val wordCountStream = text.map((_,1))
    val keyByStream = wordCountStream.keyBy(_._1)
    //执行打印操作
    keyByStream.print()

    //执行程序
    env.execute("TransformationOpScala")
  }

  private def getEnv = {
    StreamExecutionEnvironment.getExecutionEnvironment
  }
}

(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。
评论

全部评论(0)

回到顶部