Flink - DataStream API使用详解1(转换算子:map、flatMap、filter、keyBy)
作者:hangge | 2025-02-18 10:15
Flink 代码原生支持直接在 IDEA 中运行,便于本地调试。同时,Flink 支持 Java 语言和 Scala 语言,本文分别使用这两种语言来演示 Flink 的使用。
一、DataStream API 介绍
1,基本介绍
DataStream API 主要分为 3 块:DataSource、Transformation、DataSink。
- DataSource 是程序的输入数据源。
- Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 map、flatMap 和 filter 等操作。
- DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。
2,Transformation
transformation 是 Flink 程序的计算算子,负责对数据进行处理。Flink 提供了大量的算子,其实 Flink 中的大部分算子的使用和 spark 中算子的使用是一样的,具体见下方表格。
| 算 子 | 介 绍 |
| map() | 对数据流中的每个元素进行处理,输入一个元素返回一个元素(一进一出) |
| flatMap() | 与 map()类似,但是每个元素都可以返回一个或多个新元素 |
| filter() | 对数据流中每个元素进行判断,如果返回 True 则将其保留,否则将其删除 |
| keyBy() | 根据 Key 对数据流进行分组 |
| reduce() | 对当前元素和上一次的结果进行聚合操作 |
| aggregations() | sum(),min(),max()等 |
| union() | 合并多个流,多个流的数据类型必须一致 |
| connect() | 只能连接两个流,两个流的数据类型可以不同 |
| split() | 根据规则把一个数据流切分为多个流 |
| shuffle() | 随机分区 |
| rebalance() | 对数据集进行再平衡,重分区,消除数据倾斜 |
| rescale() | 重分区 |
| partitionCustom() | 自定义分区 |
一、准备工作
1,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
(2)运行结果如下:
(2) 运行结果如下:
(2)运行结果如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
二、DataStream API 的使用样例(Java 版)
1,map()
(1)下面代码使用 map 算子对数据流中的每个元素乘以 2。
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
// 处理数据
SingleOutputStreamOperator<Integer> numStream = text.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return value * 2;
}
});
// 执行打印操作
numStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
// 处理数据
SingleOutputStreamOperator<Integer> numStream = text.map(value -> value * 2);
// 执行打印操作
numStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()
下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<String> text = env.fromElements("hello hangge", "hello world");
// 处理数据
SingleOutputStreamOperator<String> wordStream = text.flatMap(
new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
});
// 使用一个线程执行打印操作
wordStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<String> text = env.fromElements("hello hangge", "hello world");
// 处理数据
SingleOutputStreamOperator<String> wordStream = text.flatMap(
(String line, Collector<String> out) ->
Arrays.stream(line.split(" ")).forEach(out::collect)
).returns(Types.STRING);
// 使用一个线程执行打印操作
wordStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(2) 运行结果如下:
3,filter()
下面代码使用 filter 算子过滤出数据流中的偶数。
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
// 处理数据
SingleOutputStreamOperator<Integer> numStream = text.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer num) throws Exception {
return num % 2 == 0;
}
});
// 使用一个线程执行打印操作
numStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<Integer> text = env.fromElements(1, 2, 3, 4, 5);
// 处理数据
SingleOutputStreamOperator<Integer> numStream = text.filter(value -> value % 2 == 0);
// 使用一个线程执行打印操作
numStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(2)运行结果如下:

4,keyBy()
下面代码使用 keyBy 算子对数据流中的单词进行分组。
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world");
// 处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text.map(
new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<>(word, 1);
}
});
KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream.keyBy(
new KeySelector<Tuple2<String, Integer>, String>() {
public String getKey(Tuple2<String, Integer> tup) throws Exception {
return tup.f0;
}
});
// 执行打印操作
keyByStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
- 我们也可以使用 Lambda 表达式简化代码编写:
public class TransformationOp {
public static void main(String[] args) throws Exception {
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 在测试阶段,可以使用 fromElements 构造实时数据流
DataStreamSource<String> text = env.fromElements("hello", "hangge", "hello", "world");
// 处理数据
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCountStream = text
.map(word -> new Tuple2<>(word, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByStream = wordCountStream
.keyBy(tuple -> tuple.f0);
// 执行打印操作
keyByStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。
三、DataStream API 的使用样例(Scala 版)
注意:如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
1,map()
(1)下面代码使用 map 算子对数据流中的每个元素乘以 2。
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
//在测试阶段,可以使用 fromElements 构造实时数据流
val text = env.fromElements(1, 2, 3, 4, 5)
//处理数据
val numStream = text.map(_ * 2)
//程执行打印操作
numStream.print()
//执行程序
env.execute("TransformationOpScala")
}
private def getEnv = {
StreamExecutionEnvironment.getExecutionEnvironment
}
}
(2)运行结果如下:
注意:在 Flink 中,由于数据流是并行处理的,打印的结果可能不是按照输入顺序输出的。这是因为在流处理中,数据会被并行处理,并可能在不同的任务管理器或线程中处理。打印结果中 > 前面的数字就是并行度编号。

2,flatMap()
下面代码使用 flatMap 算子将数据流中的每行数据拆分为单词。
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
//在测试阶段,可以使用 fromElements 构造实时数据流
val text = env.fromElements("hello hangge", "hello world")
//处理数据
val wordStream = text.flatMap(_.split(" "))
//程执行打印操作
wordStream.print()
//执行程序
env.execute("TransformationOpScala")
}
private def getEnv = {
StreamExecutionEnvironment.getExecutionEnvironment
}
}
3,filter()
下面代码使用 filter 算子过滤出数据流中的偶数。
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
//在测试阶段,可以使用 fromElements 构造实时数据流
val text = env.fromElements(1, 2, 3, 4, 5)
//处理数据
val numStream = text.filter(_ % 2 == 0)
//执行打印操作
numStream.print()
//执行程序
env.execute("TransformationOpScala")
}
private def getEnv = {
StreamExecutionEnvironment.getExecutionEnvironment
}
}

4,keyBy()
下面代码使用 keyBy 算子对数据流中的单词进行分组。
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
//在测试阶段,可以使用 fromElements 构造实时数据流
val text = env.fromElements("hello","hangge","hello","world")
//处理数据
val wordCountStream = text.map((_,1))
val keyByStream = wordCountStream.keyBy(_._1)
//执行打印操作
keyByStream.print()
//执行程序
env.execute("TransformationOpScala")
}
private def getEnv = {
StreamExecutionEnvironment.getExecutionEnvironment
}
}
(2)运行结果如下,可以看到两个 hello 的并行度编号是一样的。因为使用 keyBy 对数据流进行分组时,Flink 将会根据指定的键进行分区,然后每个分区内的数据都由一个任务处理。因此,并行度编号可能会随着 keyBy 操作的引入而发生变化。
全部评论(0)