Flink - DataStream API使用详解4(DataSoure和DataSink:socket数据输出至Redis)
作者:hangge | 2025-02-24 09:23
DataSink 是输出组件,负责把计算好的数据输出到其它存储介质中。本文演示如何接收 Socket 传输过来的数据,把数据保存到 Redis 的 list 队列中。
一、DataSoure 与 DataSink 介绍
1,基本介绍
DataStream API 主要分为 3 块:DataSource、Transformation、DataSink。
- DataSource 是程序的输入数据源。
- Transformation 是具体的操作,它对一个或多个输入数据源进行计算处理,例如 map、flatMap 和 filter 等操作。
- DataSink 是程序的输出,它可以把 Transformation 处理之后的数据输出到指定的存储介质中。
2,DataSoure
(1)DataSource 是程序的输入数据源,Flink 提供了大量内置的 DataSource,也支持自定义 DataSource,不过目前 Flink 提供的这些已经足够我们正常使用了。
- Flink 内置数据源有:Kafka、Kinesis Streams、RabbitMQ、NiFi、Twitter Streaming API、Google PubSub
- Apache Bahir 数据源(需要添加这个依赖包之后才能使用的):ActiveMQ、Netty
(2)当程序出现错误的时候,Flink 的容错机制能恢复并继续运行程序,这种错误包括机器故障、网络故障、程序故障等。针对 Flink 提供的常用数据源接口,如果程序开启了 checkpoint 快照机制,Flink 可以提供这些容错性保证。
| DataSource | 容错保证 | 备注 |
| Socket | at most once | |
| Collection | exactly once | |
| Kafka | exactly once | 需要使用 0.10 及以上版本 |
3,DataSink
(1)DataSink 是 输出组件,负责把计算好的数据输出到其它存储介质中
- Flink 支持把流数据输出到文件中,不过在实际工作中这种场景不多,因为流数据处理之后一般会存储到一些消息队列里面,或者数据库里面,很少会保存到文件中的。
- 还有就是 print,直接打印,这个其实我们已经用了很多次了,这种用法主要是在测试的时候使用的,方便查看输出的结果信息
- Flink 提供了一批 Connectors,可以实现输出到第三方目的地。在实际工作中最常用的是 kafka、redis
| Flink 内置 | Apache Bahir |
| Kafka | ActiveMQ |
| Cassandra | Flume |
| Kinesis Streams | Redis |
| Elasticsearch | Akka |
| Hadoop FileSysterm | |
| RabbitMQ | |
| NiFi | |
| JDBC |
| DataSink | 容错保证 | 备注 |
| Redis | at least once | |
| Kafka | at least once / exactly once | Kafka0.9 和 0.10 提供 at least once,Kafka0.11 及以上提供 exactly once |
二、演示样例
1,准备工作
我们创建一个 Maven 项目,然后在 pom.xml 文件中,添加 Flink 实时计算相关的依赖。
注意:redis sink 是在 Bahir 这个依赖包中,所以在 pom.xml 中需要添加对应的依赖。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.16.0</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1.0</version>
</dependency>
2,样例代码
(1)下面是使用 Scala 实现的代码:
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.redis.RedisSink
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper
object StreamRedisSinkScala {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
//连接socket获取输入数据
val text = env.socketTextStream("192.168.121.128", 9999)
import org.apache.flink.api.scala._
//组装数据,这里组装的是tuple2类型
//第一个元素是指list队列的key名称
//第二个元素是指需要向list队列中添加的元素
val listData = text.map(word => ("words", word))
//指定redisSink
val conf = new FlinkJedisPoolConfig.Builder()
.setHost("192.168.121.128")
.setPort(6379)
.setPassword("123")
.build()
val redisSink = new RedisSink[Tuple2[String, String]](conf, new MyRedisMapper)
listData.addSink(redisSink)
env.execute("StreamRedisSinkScala")
}
//自定义RedisMapper
private class MyRedisMapper extends RedisMapper[(String, String)]{
//指定具体的操作命令
override def getCommandDescription: RedisCommandDescription = {
new RedisCommandDescription(RedisCommand.LPUSH)
}
//获取key
override def getKeyFromData(data: (String, String)): String = {
data._1
}
//获取value
override def getValueFromData(data: (String, String)): String = {
data._2
}
}
}
(2)下面是使用 Java 实现同样功能的代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
public class StreamRedisSinkJava {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//连接socket获取输入数据
DataStreamSource<String> text = env.socketTextStream("192.168.121.128", 9999);
//组装数据
SingleOutputStreamOperator<Tuple2<String, String>> listData = text
.map(new MapFunction<String, Tuple2<String, String>>() {
@Override
public Tuple2<String, String> map(String word) throws Exception {
return new Tuple2<String, String>("words", word);
}
});
//指定redisSink
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
.setHost("192.168.121.128")
.setPort(6379)
.setPassword("123")
.build();
RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(conf, new MyRedisMapper());
listData.addSink(redisSink);
env.execute("StreamRedisSinkJava");
}
//自定义redisSink的mapper
public static class MyRedisMapper implements RedisMapper<Tuple2<String,String>>{
//指定具体的操作命令
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.LPUSH);
}
//获取key
@Override
public String getKeyFromData(Tuple2<String, String> data) {
return data.f0;
}
//获取value
@Override
public String getValueFromData(Tuple2<String, String> data) {
return data.f1;
}
}
}
3,运行测试
(1)我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)然后运行 Flink 程序。待程序启动后,我们在该终端中输入一些文本数据:

(3)最终到 redis 中查看结果,可以发现数据都保存到 Redis 的 list 队列中了。
lrange words 0 -1
全部评论(0)