Spark - Structured Streaming使用详解8(dropDuplicates流数据去重)
作者:hangge | 2024-01-19 08:40
八、流数据去重
1,使用 dropDuplicates 实现流数据去重
(1)dropDuplicates 方法可以用于从数据集中删除重复的行,下面是一个简单的样例:
注意:
- dropDuplicates 方法可以指定一个或多个列作为子集,方法将根据这些列的值来判断行是否重复。如果不指定子集参数,方法将考虑所有列。
- dropDuplicates 方法不可用在聚合之后,即通过聚合得到的 DataSet/DataFrame 不能调用 dropDuplicates
import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从socket中读取数据 val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 解析原始输入数据 val words: DataFrame = lines.as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1), Timestamp.valueOf(arr(2))) }) .toDF("uid", "word", "ts") .dropDuplicates("uid") // 去重重复数据 uid 相同就是重复 // 查询并输出 val query = words.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
- 首先我们输入如下数据:
1,hangge,2023-09-14 11:00:00 2,lili,2023-09-14 11:01:00
- 控制台输出如下内容:
- 接着我们输入如下数据:
3,tom,2023-09-14 10:10:00 1,hangge,2023-09-14 10:01:00
- 控制台输出如下内容(id 为 1 的数据重复,则不输出):
2,dropDuplicates 与 watermark 结合使用
(1)上面样例中我们没有使用水印,因此没有限制重复记录何时可能到达,因此查询将存储来自所有过去记录的数据作为状态数据。下面样例我们在事件时间列上定义水印,并使用唯一标识符和事件时间列进行去重。查询将使用水印来删除旧记录的旧状态数据,这些数据不再预计会有任何重复。这将限制查询必须维护的状态数据的数量。
import java.sql.Timestamp import org.apache.spark.sql.{DataFrame, SparkSession} object Hello { def main(args: Array[String]): Unit = { // 创建 SparkSession val spark = SparkSession.builder() .appName("Hello") .master("local[*]") .getOrCreate() // 导入隐式转换 import spark.implicits._ // 创建一个流式DataFrame,这里从socket中读取数据 val lines: DataFrame = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // 解析原始输入数据 val words: DataFrame = lines.as[String] .map(line => { val arr: Array[String] = line.split(",") (arr(0), arr(1), Timestamp.valueOf(arr(2))) }) .toDF("uid", "word", "ts") .withWatermark("ts", "2 minutes") // 添加水印 .dropDuplicates("uid") // 去重重复数据 uid 相同就是重复 // 查询并输出 val query = words.writeStream .outputMode("append") .format("console") .start() // 等待应用程序终止 query.awaitTermination() //关闭 Spark spark.stop() } }
(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket:
nc -lk 9999
- 首先我们输入如下数据:
1,hangge,2023-09-14 11:00:00 2,lili,2023-09-14 11:01:00
- 控制台输出如下内容:
- 接着我们输入如下数据:
3,tom,2023-09-14 10:10:00 1,hangge,2023-09-14 10:01:00
- 控制台输出如下内容(id 为 1 的数据重复,并且数据过期,则不输出;id 为 3 的数据数据过期也不输出):
全部评论(0)