Flink - DataStream API使用详解2(数据流合并与分流:union、connect、side output)
作者:hangge | 2025-02-20 08:39
一、准备工作
1,创建项目
如果想要使用 Scala 语言编写程序的话,开发工具要安装 Scala 插件并进行相关配置以实现对 Scala 的支持,具体可以参考我之前写的文章:
注意:Scala 版本要与我们下面的依赖匹配,比如我们这里就要使用 2.12 版本的 Scala。否则运行 flink 程序时会报错。
2,添加依赖
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。


(3)运行结果如下:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
二、union()
1,基本介绍
(1)union 表示合并多个流,但是多个流的数据类型必须一致。
(2)多个流 join 之后,就变成了一个流,流里面的数据使用相同的计算规则。

2,使用样例
(1)我们使用使用 union 算子对两个数据流中的数字进行合并。(2)下面是 Scala 实现代码:
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
//第 1 份数据流
val text1 = env.fromCollection(Array(1, 2, 3, 4, 5))
//第 2 份数据流
val text2 = env.fromCollection(Array(6, 7, 8, 9, 10))
//合并流
val unionStream = text1.union(text2)
//执行打印操作
unionStream.print()
//执行程序
env.execute("TransformationOpScala")
}
- 下面是 Java 实现代码:
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 第 1 份数据流
DataStreamSource<Integer> text1 = env.fromElements(1, 2, 3, 4, 5);
// 第 2 份数据流
DataStreamSource<Integer> text2 = env.fromElements(6, 7, 8, 9, 10);
// 合并流
DataStream<Integer> unionStream = text1.union(text2);
// 执行打印操作
unionStream.print();
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(3)运行结果如下:
三、connect()
1,基本介绍
(1)connect 只能连接两个流,两个流的数据类型可以不同。
(2)两个流被 connect 之后,只是被放到了同一个流中,它们内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。也就是说两份数据相互还是独立的,每一份数据使用一个计算规则。
(3)connect 方法会返回 connectedStream,在 connectedStream 中需要使用 CoMap、CoFlatMap 这种函数,类似于 map 和 flatmap。

2,使用样例
(1)下面我们使用 connect 算子将两个数据流中的用户信息关联到一起。
(2)下面是 Scala 实现代码:
object TransformationOpScala {
//注意:必须要添加这一行隐式转换的代码,否则下面的 flatMap、Map 等方法会报错
import org.apache.flink.api.scala._
def main(args: Array[String]): Unit = {
// 获取 Flink 流处理执行环境
val env = getEnv
// 第 1 份数据流
val text1 = env.fromElements("user:tom,age:18")
// 第 2 份数据流
val text2 = env.fromElements("user:jack_age:18")
// 连接两个流
val connectStream = text1.connect(text2)
val resStream = connectStream.map(_.replace(",", "-"), _.replace("_", "-"))
//执行打印操作
resStream.print()
//执行程序
env.execute("TransformationOpScala")
}
private def getEnv = {
StreamExecutionEnvironment.getExecutionEnvironment
}
}
- 下面是 Java 实现代码:
public class TransformationOp {
public static void main(String[] args) throws Exception{
// 获取 Flink 流处理执行环境
StreamExecutionEnvironment env = getEnv();
// 第 1 份数据流
DataStreamSource<String> text1 = env.fromElements("user:tom,age:18");
// 第 2 份数据流
DataStreamSource<String> text2 = env.fromElements("user:jack_age:18");
// 连接两个流
ConnectedStreams<String, String> connectStream = text1.connect(text2);
SingleOutputStreamOperator<String> resStream = connectStream.map(
new CoMapFunction<String, String, String>() {
// 处理第 1 份数据流中的数据
@Override
public String map1(String value) throws Exception {
return value.replace(",", "-");
}
// 处理第 2 份数据流中的数据
@Override
public String map2(String value) throws Exception {
return value.replace("_", "-");
}
});
// 执行打印操作
resStream.print().setParallelism(1);
// 执行 Flink 程序
env.execute("TransformationOpJava");
}
// 获取 Flink 流处理执行环境
private static StreamExecutionEnvironment getEnv() {
return StreamExecutionEnvironment.getExecutionEnvironment();
}
}
(3)运行结果如下:

四、Side Output
1,基本介绍
(1)Side Output(旁路输出) 用于从同一个 DataStream 中输出多种不同类型的数据流。它允许用户将处理逻辑中的特定数据分流到侧输出,避免对主输出造成干扰,从而使流处理更加灵活和高效。
(2)使用 Side Output 还可以对流进行多次切分。
提示:split 也可也根据规则把一个数据流切分为多个流。但是 split 只能分一次流,切分出来的流不能继续分流,并且 split 方法已经标记为过时了,官方不推荐使用,现在官方推荐使用 side output 的方式实现。
2,使用样例
(1)这里我们演示如何使用 Side Output 实现流的多次切分:
- 首先,从 text 流中提取奇数和偶数两个流。
- 接着,对偶数流 evenStream 进一步分流为小于等于 5 的偶数流,大于 5 的偶数流。
- 最终,打印小于等于 5 的偶数流的数据。
(2)下面是 Scala 实现代码:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector
object StreamSideOutputScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
val text = env.fromCollection(Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
//按照数据的奇偶性对数据进行分流
//首先定义两个sideoutput来准备保存切分出来的数据
val outputTag1 = new OutputTag[Int]("even"){}//保存偶数
val outputTag2 = new OutputTag[Int]("odd"){}//保存奇数
//注意:process属于Flink中的低级api
val outputStream = text.process(new ProcessFunction[Int,Int] {
override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
if(value % 2 == 0 ){
ctx.output(outputTag1,value)
}else{
ctx.output(outputTag2,value)
}
}
})
//获取偶数数据流
val evenStream = outputStream.getSideOutput(outputTag1)
//获取奇数数据流
val oddStream = outputStream.getSideOutput(outputTag2)
//evenStream.print().setParallelism(1)
//对evenStream流进行二次切分
val outputTag11 = new OutputTag[Int]("low"){}//保存小于等五5的数字
val outputTag12 = new OutputTag[Int]("high"){}//保存大于5的数字
val subOutputStream = evenStream.process(new ProcessFunction[Int,Int] {
override def processElement(value: Int, ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
if(value<=5){
ctx.output(outputTag11,value)
}else{
ctx.output(outputTag12,value)
}
}
})
//获取小于等于5的数据流
val lowStream = subOutputStream.getSideOutput(outputTag11)
//获取大于5的数据流
val highStream = subOutputStream.getSideOutput(outputTag12)
lowStream.print().setParallelism(1)
env.execute("StreamSideOutputScala")
}
}
- 下面是 Java 实现代码
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import java.util.Arrays;
public class StreamSideoutputJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> text = env.fromCollection(Arrays
.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
//按照数据的奇偶性对数据进行分流
//首先定义两个sideoutput来准备保存切分出来的数据
OutputTag<Integer> outputTag1 = new OutputTag<Integer>("even") {};
OutputTag<Integer> outputTag2 = new OutputTag<Integer>("odd") {};
SingleOutputStreamOperator<Integer> outputStream = text
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
if (value % 2 == 0) {
ctx.output(outputTag1, value);
} else {
ctx.output(outputTag2, value);
}
}
});
//获取偶数数据流
DataStream<Integer> evenStream = outputStream.getSideOutput(outputTag1);
//获取奇数数据流
DataStream<Integer> oddStream = outputStream.getSideOutput(outputTag2);
//对evenStream流进行二次切分
OutputTag<Integer> outputTag11 = new OutputTag<Integer>("low") {};
OutputTag<Integer> outputTag12 = new OutputTag<Integer>("high") {};
SingleOutputStreamOperator<Integer> subOutputStream = evenStream
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
if (value <= 5) {
ctx.output(outputTag11, value);
} else {
ctx.output(outputTag12, value);
}
}
});
//获取小于等于5的数据流
DataStream<Integer> lowStream = subOutputStream.getSideOutput(outputTag11);
//获取大于5的数据流
DataStream<Integer> highStream = subOutputStream.getSideOutput(outputTag12);
lowStream.print().setParallelism(1);
env.execute("StreamSideoutputJava");
}
}
(3)运行结果如下:

全部评论(0)