Flink - DataStream API使用详解1(转换算子:map、flatMap、filter、keyBy)
作者:hangge | 2025-02-18 10:15
Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文分别使用这两种语言来演示 Flink 的使用。
一、DataStream API 介绍
1,基本介绍
DataStream API 主要分为 3 块:DataSource、Transformation、DataSink。
- DataSource 是程序的输入数据源。
- Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 map、flatMap 和 filter 等操作。
- DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。
2,Transformation
transformation 是 Flink 程序的计算算子,负责对数据进行处理。Flink 提供了大量的算子,其实 Flink 中的大部分算子的使用和 spark 中算子的使用是一样的,具体见下方表格。
算 子 | 介 绍 |
map() | 对数据流中的每个元素进行处理,输入一个元素返回一个元素(一进一出) |
flatMap() | 与 map()类似,但是每个元素都可以返回一个或多个新元素 |
filter() | 对数据流中每个元素进行判断,如果返回 True 则将其保留,否则将其删除 |
keyBy() | 根据 Key 对数据流进行分组 |
reduce() | 对当前元素和上一次的结果进行聚合操作 |
aggregations() | sum(),min(),max()等 |
union() | 合并多个流,多个流的数据类型必须一致 |
connect() | 只能连接两个流,两个流的数据类型可以不同 |
split() | 根据规则把一个数据流切分为多个流 |
shuffle() | 随机分区 |
rebalance() | 对数据集进行再平衡,重分区,消除数据倾斜 |
rescale() | 重分区 |
partitionCustom() | 自定义分区 |
一、准备工作
1,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
(2)运行结果如下:
(2) 运行结果如下:
(2)运行结果如下:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.16.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>1.16.0</version> </dependency>
二、DataStream API 的使用样例(Java 版)
1,map()
(1)下面代码使用 map 算子对数据流中的每个元素乘以 2。
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5); // 处理数据 SingleOutputStreamOperator<Integer> numStream = text.map(new MapFunction<Integer, Integer>() { @Override public Integer map(Integer value) throws Exception { return value * 2; } }); // 执行打印操作 numStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5); // 处理数据 SingleOutputStreamOperator<Integer> numStream = text.map(value -> value * 2); // 执行打印操作 numStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()
下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<String> text = env.fromElements("hello hangge", "hello world"); // 处理数据 SingleOutputStreamOperator<String> wordStream = text.flatMap( new FlatMapFunction<String, String>() { @Override public void flatMap(String line, Collector<String> out) throws Exception { String[] words = line.split(" "); for (String word : words) { out.collect(word); } } }); // 使用一个线程执行打印操作 wordStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<String> text = env.fromElements("hello hangge", "hello world"); // 处理数据 SingleOutputStreamOperator<String> wordStream = text.flatMap( (String line, Collector<String> out) -> Arrays.stream(line.split(" ")).forEach(out::collect) ).returns(Types.STRING); // 使用一个线程执行打印操作 wordStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
(2) 运行结果如下:

3,filter()
下面代码使用 filter 算子过滤出数据流中的偶数。
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5); // 处理数据 SingleOutputStreamOperator<Integer> numStream = text.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer num) throws Exception { return num % 2 == 0; } }); // 使用一个线程执行打印操作 numStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5); // 处理数据 SingleOutputStreamOperator<Integer> numStream = text.filter(value -> value % 2 == 0); // 使用一个线程执行打印操作 numStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
(2)运行结果如下:

4,keyBy()
下面代码使用 keyBy 算子对数据流中的单词进行分组。
public class TransformationOp { public static void main(String[] args) throws Exception{ // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world"); // 处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text.map( new MapFunction<String, Tuple2<String, Integer>>() { @Override public Tuple2<String, Integer> map(String word) throws Exception { return new Tuple2<>(word, 1); } }); KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream.keyBy( new KeySelector<Tuple2<String, Integer>, String>() { public String getKey(Tuple2<String, Integer> tup) throws Exception { return tup.f0; } }); // 执行打印操作 keyByStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp { public static void main(String[] args) throws Exception { // 获取 Flink 流处理执行环境 StreamExecutionEnvironment env = getEnv(); // 在测试阶段,可以使用 fromElements 构造实时数据流 DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world"); // 处理数据 SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text .map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream .keyBy(tuple -> tuple.f0); // 执行打印操作 keyByStream.print(); // 执行 Flink 程序 env.execute("TransformationOpJava"); } // 获取 Flink 流处理执行环境 private static StreamExecutionEnvironment getEnv() { return StreamExecutionEnvironment.getExecutionEnvironment(); } }(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。

三、DataStream API 的使用样例(Scala 版)
注意:如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
1,map()
(1)下面代码使用 map 算子对数据流中的每个元素乘以 2。
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv //在测试阶段,可以使用 fromElements 构造实时数据流 val text = env.fromElements(1, 2, 3, 4, 5) //处理数据 val numStream = text.map(_ * 2) //程执行打印操作 numStream.print() //执行程序 env.execute("TransformationOpScala") } private def getEnv = { StreamExecutionEnvironment.getExecutionEnvironment } }
(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()
下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv //在测试阶段,可以使用 fromElements 构造实时数据流 val text = env.fromElements("hello hangge", "hello world") //处理数据 val wordStream = text.flatMap(_.split(" ")) //程执行打印操作 wordStream.print() //执行程序 env.execute("TransformationOpScala") } private def getEnv = { StreamExecutionEnvironment.getExecutionEnvironment } }

3,filter()
下面代码使用 filter 算子过滤出数据流中的偶数。
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv //在测试阶段,可以使用 fromElements 构造实时数据流 val text = env.fromElements(1, 2, 3, 4, 5) //处理数据 val numStream = text.filter(_ % 2 == 0) //执行打印操作 numStream.print() //执行程序 env.execute("TransformationOpScala") } private def getEnv = { StreamExecutionEnvironment.getExecutionEnvironment } }

4,keyBy()
下面代码使用 keyBy 算子对数据流中的单词进行分组。
object TransformationOpScala { //注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错 import org.apache.flink.api.scala._ def main(args: Array[String]): Unit = { // 获取 Flink 流处理执行环境 val env = getEnv //在测试阶段,可以使用 fromElements 构造实时数据流 val text = env.fromElements("hello","hangge","hello","world") //处理数据 val wordCountStream = text.map((_,1)) val keyByStream = wordCountStream.keyBy(_._1) //执行打印操作 keyByStream.print() //执行程序 env.execute("TransformationOpScala") } private def getEnv = { StreamExecutionEnvironment.getExecutionEnvironment } }
(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。

全部评论(0)