Flink - Kafka Connector使用详解1(读取kafka数据流)
作者:hangge | 2025-03-11 08:39
Flink 社区提供了丰富的连接器(Connectors)以方便与不同的数据源进行交互,其 Flink-Kafka-Connector 是 Flink 提供的一个专门用于与 Kafka 集成的组件。通过这个连接器,用户可以轻松地从 Kafka 中读取数据流 (Source)或将数据流写入到 Kafka(Sink)。本文首先介绍如何从 Kafka 中读取数据。
(2)下面是实现同样功能的 Java 语言代码:
(2)接着创建一个该主题的消息生产者:
(3)发送如下两条测试数据:
(4)可以看到控制台输出内容如下:
(2)我们也可也订阅与正则表达式所匹配的 Topic 下的所有 Partition:
(3)还可以通过 Partition 列表订阅指定的 Partition:
(2)为了避免重复消费问题,确保程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
一、读取 Kafka 数据(Kafka Source)
1,准备工作
首先,我们创建一个 Maven 项目,然后在 pom.xml 文件中除了添加 Flink 相关依赖外,还需要添加 Kafka Consumer 依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.16.0</version>
</dependency>
2,使用样例
(1)我们读取 Kafka 的制定 topic 数据并进行单词统计,然后打印到控制台。下面是 Scala 语言代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
object StreamKafkaSourceScala {
def main(args: Array[String]): Unit = {
//获取执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
import org.apache.flink.api.scala._
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
// 指定 KafkaSource 相关配置
val kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
// 转换
val wordCount = kafkaDS.flatMap(_.split(" ")) //将每一行数据根据空格切分单词
.map((_, 1)) //每一个单词转换为tuple2的形式(单词,1)
.keyBy(tup => tup._1) //官方推荐使用keyselector选择器选择数据
.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
.sum(1) // 使用sum或者reduce都可以
// 输出
wordCount.print()
//执行任务
env.execute()
}
}
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
public class StreamKafkaSourceJava {
public static void main(String[] args) throws Exception {
// 获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 指定 KafkaSource 相关配置
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 创建 Kafka 数据流
DataStream<String> kafkaDS = env.fromSource(source, WatermarkStrategy.noWatermarks(),
"Kafka Source");
// 转换
SingleOutputStreamOperator<Tuple2<String, Integer>> wordCount = kafkaDS
.flatMap(new FlatMapFunction<String, String>() {
public void flatMap(String line, Collector<String> out) throws Exception {
String[] words = line.split(" ");
for (String word : words) {
out.collect(word);
}
}
}).map(new MapFunction<String, Tuple2<String, Integer>>() {
public Tuple2<String, Integer> map(String word) throws Exception {
return new Tuple2<String, Integer>(word, 1);
}
})
.keyBy(tup -> tup.f0) // 使用 KeySelector 选择器选择数据
.window(TumblingProcessingTimeWindows.of(Time.seconds(2))) // 使用窗口方法
.sum(1); // 使用 sum 或者 reduce 都可以
// 输出
wordCount.print();
// 执行任务
env.execute();
}
}
3,运行测试
(1)首先我们创建好 Kafka 的 topic。然后运行我们编写的 Flink 程序。
kafka-topics.sh --create --bootstrap-server localhost:9092 --partitions 5 --replication-factor 1 --topic source_topic
(2)接着创建一个该主题的消息生产者:
kafka-console-producer.sh --broker-list localhost:9092 --topic source_topic
(3)发送如下两条测试数据:
hangge com com 1 hangge
(4)可以看到控制台输出内容如下:

附:进阶技巧
1,Topic / Partition 订阅方式
(1)下面样例我们订阅 Topic 列表中所有 Partition 的消息:
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic1", "source_topic2", "source_topic3")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
(2)我们也可也订阅与正则表达式所匹配的 Topic 下的所有 Partition:
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic*")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
(3)还可以通过 Partition 列表订阅指定的 Partition:
// 指定 KafkaSource 相关配置
val partitionSet = Set(
new TopicPartition("source_topic1", 0), // Partition 0 of topic "source_topic1"
new TopicPartition("source_topic2", 3) // Partition 3 of topic "source_topic2"
)
import scala.collection.JavaConverters._ // 用于转换 Set[TopicPartition] 为 Java Set
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setPartitions(partitionSet.asJava) // 使用转换后的 Java Set
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
2,起始消费位点、自定提交 offset
(1)Kafka source 能够通过位点初始化器(OffsetsInitializer)来指定从不同的偏移量开始消费 。内置的位点初始化器包括:
- 从最早位点开始消费(默认)
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
- 从最末尾位点开始消费:
.setStartingOffsets(OffsetsInitializer.latest())
- 从时间戳大于等于指定时间戳(毫秒)的数据开始消费:
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
- 从消费组提交的位点开始消费,不指定位点重置策略:
.setStartingOffsets(OffsetsInitializer.committedOffsets())
- 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
(2)为了避免重复消费问题,确保程序重启后从上一次位置继续消费,我们将 KafkaSource 配置部分做如下修改,启用自动提交消费位移(offset)的功能,并将消费者的起始位移被设置为已提交的最早位移。
// 指定 KafkaSource 相关配置
val source = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("con")
.setProperty("enable.auto.commit","true") // 启用自动提交消费位移(offset)的功能
.setProperty("auto.commit.interval.ms","1000") // 设置自动提交消费位移的时间间隔为1秒
// 消费者的起始位移被设置为已提交的最早位移
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
.setValueOnlyDeserializer(new SimpleStringSchema())
.build
3,动态分区检查
(1)为了在不重启 Flink 作业的情况下处理 Topic 扩容或新建 Topic 等场景,可以将 Kafka Source 配置为在提供的 Topic / Partition 订阅模式下定期检查新分区。
(2)分区检查功能默认不开启。要启用动态分区检查,将 partition.discovery.interval.ms 设置为非负值即可。
// 指定 KafkaSource 相关配置
val source: KafkaSource[String] = KafkaSource.builder[String]
.setBootstrapServers("192.168.121.128:9092")
.setTopics("source_topic")
.setGroupId("flink_kafka")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("partition.discovery.interval.ms", "10000") // 每 10 秒检查一次新分区
.build
全部评论(0)