Flink - Kafka Connector使用详解2(将数据流写入Kafka)
作者:hangge | 2025-03-13 08:42
前文我演示了如何利用 Flink-Kafka-Connector 从 Kafka 中读取数据流 (Source),本文接着通过样例演示如何利用 Kafka Sink 将数据流写入一个或多个 Kafka topic。
(2)下面是实现同样功能的 Java 语言代码:
(2)接着创建一个该主题的消息生产者:
(3)发送如下两条测试数据:
(4)可以看到结果数据已经写入到 Kafka 中了。
二、将数据流写入 Kafka(Kafka Sink)
1,准备工作
首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
2,使用样例
(1)我们对上文的样例做个改进,将 WordCount 的结果保存到 Kafka 中。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.base.DeliveryGuarantee
import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object StreamKafkaSourceScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
// 指定 KafkaSource 相关配置
val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
// 转换
val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词
.map((_, 1)) // 每一个单词转换为tuple2的形式(单词,1)
.keyBy(tup => tup._1) // 官方推荐使用 keyselector 选择器选择数据
.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
.sum(1) // 使用sum或者reduce都可以
// 创建 kafkaSink
val kafkaSink = KafkaSink.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("sink_topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build
// 输出到 kafka
wordCount
.map(tup => s"${tup._1}:${tup._2}") // 转换为字符串
.sinkTo(kafkaSink) // 输出到kafka
//执行任务
env.execute()
}
}
(2)下面是实现同样功能的 Java 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamKafkaSourceJava {
public static void main(String[] args) throws Exception {
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定 KafkaSource 相关配置
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 创建 Kafka 数据流
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Kafka Source");
// 转换
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
})
.keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据
.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
.sum(1); // 使用 sum 或者 reduce 都可以
// 创建 KafkaSink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.121.128:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopic("sink_topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
// 输出到 Kafka
wordCount.map(tuple -> tuple.f0 + ":" + tuple.f1)
.sinkTo(kafkaSink);
// 执行任务
env.execute();
}
}
3,运行测试
(1)首先我们创建号 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic
(3)发送如下两条测试数据:
hangge com com 1 hangge
(4)可以看到结果数据已经写入到 Kafka 中了。

附:进阶技巧
1,动态指定 topic
(1)setTopicSelector 是 Flink 的 Kafka Sink API 提供的一个方法,允许动态地为每条消息选择一个目标主题。可以通过实现 KafkaRecordSerializationSchema.TopicSelector 接口(或使用 Lambda 表达式)来动态确定消息的目标主题。
(2)例如我们根据消息内容动态地决定写入的 Kafka 主题,例如:如果消息以 "error" 开头,则写入 error_topic;如果以 "info" 开头,则写入 info_topic。通过将数据流分流到不同的 Kafka 主题中,便于下游消费。
- 下面是 Scala 语言代码:
// 创建 kafkaSink
val kafkaSink = KafkaSink.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopicSelector(new TopicSelector[String] {
override def apply(value: String): String = {
// 根据消息内容动态选择主题
if (value.startsWith("error")) "error_topic"
else if (value.startsWith("info")) "info_topic"
else "default_topic"
}
})
.setValueSerializationSchema(new SimpleStringSchema())
.build
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build
- 下面是使用 Java 语言实现同样的功能:
// 创建 KafkaSink
KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
.setBootstrapServers("192.168.121.128:9092")
.setRecordSerializer(
KafkaRecordSerializationSchema.builder()
.setTopicSelector(new TopicSelector<String>() {
@Override
public String apply(String value) {
// 根据消息内容动态选择主题
if (value.startsWith("error")) {
return "error_topic";
} else if (value.startsWith("info")) {
return "info_topic";
} else {
return "default_topic";
}
}
})
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
2,容错
(1)KafkaSink 总共支持三种不同的语义保证(DeliveryGuarantee):
- DeliveryGuarantee.NONE:不提供任何保证:消息有可能会因 Kafka broker 的原因发生丢失或因 Flink 的故障发生重复。
- DeliveryGuarantee.AT_LEAST_ONCE:sink 在 checkpoint 时会等待 Kafka 缓冲区中的数据全部被 Kafka producer 确认。消息不会因 Kafka broker 端发生的事件而丢失,但可能会在 Flink 重启时重复,因为 Flink 会重新处理旧数据。
- DeliveryGuarantee.EXACTLY_ONCE:该模式下,Kafka sink 会将所有数据通过在 checkpoint 时提交的事务写入。因此,如果 consumer 只读取已提交的数据(参见 Kafka consumer 配置 isolation.level),在 Flink 发生重启时不会发生数据重复。然而这会使数据在 checkpoint 完成时才会可见,因此请按需调整 checkpoint 的间隔。请确认事务 ID 的前缀(transactionIdPrefix)对不同的应用是唯一的,以保证不同作业的事务不会互相影响!此外,强烈建议将 Kafka 的事务超时时间调整至远大于“checkpoint 最大间隔 + 最大重启时间”,否则 Kafka 对未提交事务的过期处理会导致数据丢失。
(2)默认情况下 KafkaSink 使用 DeliveryGuarantee.NONE。我们可以通过 setDeliveryGuarantee 方法进行修改。
// 创建 kafkaSink
val kafkaSink = KafkaSink.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("sink_topic")
.setValueSerializationSchema(new SimpleStringSchema())
.build
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 设置 kafka 输出的保证
.build
全部评论(0)