Spark - Structured Streaming使用详解13(输出接收器4:foreach、foreachBatch sink)
作者:hangge | 2024-01-29 08:50
四、foreach sink
1,基本介绍
foreach sink 会遍历表中的每一行, 允许将流查询结果按开发者指定的逻辑输出。例如我们可以借助 foreach sink 将数据写入外部数据库、向外部 API 发送请求等。
2,准备工作
(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count (
word VARCHAR(255) PRIMARY KEY NOT NULL,
count BIGINT NOT NULL
);
(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
3,样例代码
下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中样例:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Socket读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 单词统计
val wordCount: DataFrame = lines.as[String]
.flatMap(_.split("\\W+"))
.groupBy("value")
.count()
// 启动查询, 把结果输出至MySQL
val query: StreamingQuery = wordCount.writeStream
.outputMode("update")
// 使用 foreach 的时候, 需要传递ForeachWriter实例, 三个抽象方法需要实现.
// 每个批次的所有分区都会创建 ForeeachWriter 实例
.foreach(new ForeachWriter[Row] {
var conn: Connection = _
var ps: PreparedStatement = _
var batchCount = 0
// 一般用于 打开链接. 返回 false 表示跳过该分区的数据,
override def open(partitionId: Long, epochId: Long): Boolean = {
println("open ..." + partitionId + " " + epochId)
Class.forName("com.mysql.jdbc.Driver")
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/hangge", "root",
"hangge1234")
// 插入数据, 当有重复的 key 的时候更新
val sql = "insert into word_count values(?, ?) " +
"on duplicate key update word=?, count=?"
ps = conn.prepareStatement(sql)
conn != null && !conn.isClosed && ps != null
}
// 把数据写入到连接
override def process(value: Row): Unit = {
println("process ...." + value)
val word: String = value.getString(0)
val count: Long = value.getLong(1)
ps.setString(1, word)
ps.setLong(2, count)
ps.setString(3, word)
ps.setLong(4, count)
ps.execute()
}
// 用户关闭连接
override def close(errorOrNull: Throwable): Unit = {
println("close...")
ps.close()
conn.close()
}
})
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
4,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中输入一些文本数据:

(3)稍等一会可以看到 word_count 表中的数据如下:

五、foreachBatch sink
1,基本介绍
(1)不同于 foreach sink 会遍历表中的每一行,foreachBatch sink 在每个微批处理的开始时调用一次,让我们能够获取到每个批次的静态 DataFrame,从而自定义整个微批处理的逻辑,包括将数据写入外部存储、聚合、筛选等等。
(2)由于 foreachBatch sink 以微批处理为单位进行处理,因此适用于更复杂的、需要在整个微批处理内执行的操作。与 foreach sink 相比,foreachBatch sink 的连接建立和资源管理更加高效。
2,准备工作
(1)将设我们需要把 wordcount 数据写入到 mysql,首先我们需要创建如下数据库表:
CREATE TABLE word_count (
word VARCHAR(255) PRIMARY KEY NOT NULL,
count BIGINT NOT NULL
);
(2)接着项目中添加 mysql 驱动:
<!-- 数据库驱动依赖 -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
3,样例代码
下面是一个读取 socket 数据然后进行单词统计,并将结果输出到 MySQL 数据库中的 word_count 表中:
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery
import java.util.Properties
object Hello {
def main(args: Array[String]): Unit = {
// 创建 SparkSession
val spark = SparkSession.builder()
.appName("Hello")
.master("local[*]")
.getOrCreate()
// 导入隐式转换
import spark.implicits._
// 创建一个流式DataFrame,这里从Socket读取数据
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// 单词统计
val wordCount: DataFrame = lines.as[String]
.flatMap(_.split("\\W+"))
.groupBy("value")
.count()
// 启动查询, 把结果输出至MySQL
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "hangge1234")
val query: StreamingQuery = wordCount.writeStream
.outputMode("complete")
.foreachBatch{ (df: DataFrame, batchId: Long) => // 当前分区id, 当前批次id
if (df.count() != 0) {
df.cache()
df.write.json(s"./$batchId")
df.write.mode("overwrite")
.jdbc("jdbc:mysql://localhost:3306/hangge", "word_count", props)
}
}
.start()
// 等待应用程序终止
query.awaitTermination()
//关闭 Spark
spark.stop()
}
}
4,运行测试
(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
(2)程序启动后,我们在该终端中输入一些文本数据:

(3)稍等一会可以看到 word_count 表中的数据如下:
全部评论(0)