返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解8(dropDuplicates流数据去重)

作者:hangge | 2024-01-19 08:40

八、流数据去重

1,使用 dropDuplicates 实现流数据去重

(1)dropDuplicates 方法可以用于从数据集中删除重复的行,下面是一个简单的样例:
注意:
  • dropDuplicates 方法可以指定一个或多个列作为子集,方法将根据这些列的值来判断行是否重复。如果不指定子集参数,方法将考虑所有列。
  • dropDuplicates 方法不可用在聚合之后,即通过聚合得到的 DataSet/DataFrame 不能调用 dropDuplicates
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 解析原始输入数据
    val words: DataFrame = lines.as[String]
      .map(line => {
       val arr: Array[String] = line.split(",")
       (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      })
      .toDF("uid", "word", "ts")
      .dropDuplicates("uid")  // 去重重复数据 uid 相同就是重复

    // 查询并输出
    val query = words.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999
  • 首先我们输入如下数据:
1,hangge,2023-09-14 11:00:00
2,lili,2023-09-14 11:01:00
  • 控制台输出如下内容:
  • 接着我们输入如下数据:
3,tom,2023-09-14 10:10:00
1,hangge,2023-09-14 10:01:00
  • 控制台输出如下内容(id1 的数据重复,则不输出):

2,dropDuplicates 与 watermark 结合使用

(1)上面样例中我们没有使用水印,因此没有限制重复记录何时可能到达,因此查询将存储来自所有过去记录的数据作为状态数据。下面样例我们在事件时间列上定义水印,并使用唯一标识符和事件时间列进行去重。查询将使用水印来删除旧记录的旧状态数据,这些数据不再预计会有任何重复。这将限制查询必须维护的状态数据的数量。
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从socket中读取数据
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 解析原始输入数据
    val words: DataFrame = lines.as[String]
      .map(line => {
       val arr: Array[String] = line.split(",")
       (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      })
      .toDF("uid", "word", "ts")
      .withWatermark("ts", "2 minutes") // 添加水印
      .dropDuplicates("uid")  // 去重重复数据 uid 相同就是重复

    // 查询并输出
    val query = words.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999
  • 首先我们输入如下数据:
1,hangge,2023-09-14 11:00:00
2,lili,2023-09-14 11:01:00
  • 控制台输出如下内容:
  • 接着我们输入如下数据:
3,tom,2023-09-14 10:10:00
1,hangge,2023-09-14 10:01:00
  • 控制台输出如下内容(id 1 的数据重复,并且数据过期,则不输出;id3 的数据数据过期也不输出):
评论

全部评论(0)

回到顶部