返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解9(join操作)

作者:hangge | 2024-01-22 08:52
    Structured Streaming 不但支持 Streaming DataSet/DataFrame 与静态的 DataSet/DataFrame 进行 join, 也支持 Streaming DataSet/DataFrame 与另外一个 Streaming DataSet/DataFrame 进行 join。同时 join 的结果也是持续不断的生成,类似于前面学习的 streaming 的聚合结果。

一、Streaming DataSet/DataFrame 与静态的 DataSet/DataFrame 进行 join

1,内连接

(1)下面是一个使用内连接样例代码:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 静态 df
    val arr = Array(("hangge", "male"), ("lili", "female"), ("tom", "male"));
    var staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")

    // 流式 df
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val streamDF: DataFrame = lines.as[String].map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt)
    }).toDF("name", "age")

    // join 等值内连接  a.name=b.name
    val joinResult: DataFrame = streamDF.join(staticDF, "name")

    // 输出
    val query = joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999
tom,12
hangge,99
  • 接着我们输入如下数据:
jerry,1
hangge,88

2,外连接

(1)下面是一个使用外连接样例代码:
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 静态 df
    val arr = Array(("hangge", "male"), ("lili", "female"), ("tom", "male"));
    var staticDF: DataFrame = spark.sparkContext.parallelize(arr).toDF("name", "sex")

    // 流式 df
    val lines: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val streamDF: DataFrame = lines.as[String].map(line => {
      val arr = line.split(",")
      (arr(0), arr(1).toInt)
    }).toDF("name", "age")

    // join 外链接
    val joinResult: DataFrame = streamDF.join(staticDF, Seq("name"), "left")

    // 输出
    val query = joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)测试一下,首先我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999
tom,12
hangge,99
  • 接着我们输入如下数据:
jerry,1
hangge,88

二、Streaming DataSet/DataFrame与另外一个Streaming DataSet/DataFrame 进行 join

1,基本介绍

    Spark 2.3 开始支持 stream-stream join,使用时 Spark 会自动维护两个流的状态,以保障后续流入的数据能够和之前流入的数据发生 join 操作,但这会导致状态无限增长。因此,在对两个流进行 join 操作时,依然可以用 watermark 机制来消除过期的状态,避免状态无限增长。
注意:
  • 2 个流式数据进行 join 操作,输出模式仅支持 append 模式。
  • 内连接是否使用 watermark 均可,但外连接必须使用 watermark

2,内连接

(1)下面是一个不带 watermast 的内连接样例代码:
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 第 1 个 stream
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      }).toDF("name", "sex", "ts1")

    // 第 2 个 stream
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8888)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name", "age", "ts2")


    // join 操作
    val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

    // 输出
    val query = joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}
// 9999 端口控制台
hangge,male,2023-09-05 10:41:00
tom,male,2023-09-05 10:42:00
lili,female,2023-09-05 10:43:00

// 8888 端口控制台
hangge,100,2023-09-05 10:43:00
jerry,1,2023-09-05 10:44:00
lili,33,2023-09-05 10:45:00
  • 接着我们在终端窗口输入如下内容:
// 9999 端口控制台
jerry,male,2023-09-05 10:51:00
lili,male,2023-09-05 10:52:00

(2)下面是一个带 watermast 的内连接样例代码:
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 第 1 个 stream
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      })
      .toDF("name", "sex", "ts1")
      .withWatermark("ts1", "1 minutes")

    // 第 2 个 stream
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8888)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      }).toDF("name", "age", "ts2")
      .withWatermark("ts2", "1 minutes")

    // join 操作
    val joinResult: DataFrame = nameSexStream.join(nameAgeStream, "name")

    // 输出
    val query = joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}
// 9999 端口控制台
hangge,male,2023-09-05 10:41:00
tom,male,2023-09-05 10:42:00
lili,female,2023-09-05 10:43:00

// 8888 端口控制台
hangge,100,2023-09-05 10:43:00
jerry,1,2023-09-05 10:44:00
lili,33,2023-09-05 10:45:00
  • 接着我们在终端窗口输入如下内容:
// 9999 端口控制台
jerry,female,2023-09-05 10:41:00
jerry,female,2023-09-05 10:42:00
jerry,female,2023-09-05 10:42:01
jerry,female,2023-09-05 10:46:00
jerry,female,2023-09-05 10:44:00

3,外连接

(1)外连接和内连接相比,代码几乎一致,只需要在连接的时候指定连接类型为 left_outer(左外连接)、right_outer(右外连接)、 full_outer(全连接)即可。
注意:外连接必须使用 watermast,并连接操作中要使用时间戳字段进行连接。
import org.apache.spark.sql.functions.{expr, window}
import java.sql.Timestamp
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 第 1 个 stream
    val nameSexStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1), Timestamp.valueOf(arr(2)))
      })
      .toDF("name1", "sex", "ts1")
      .withWatermark("ts1", "1 minutes")

    // 第 2 个 stream
    val nameAgeStream: DataFrame = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 8888)
      .load
      .as[String]
      .map(line => {
        val arr: Array[String] = line.split(",")
        (arr(0), arr(1).toInt, Timestamp.valueOf(arr(2)))
      })
      .toDF("name2", "age", "ts2")
      .withWatermark("ts2", "1 minutes")

    // join 操作(外连接)
    val joinResult: DataFrame = nameSexStream.join(nameAgeStream, expr(
      """
        |name1=name2 and
        |ts2 >= ts1 and
        |ts2 <= ts1 + interval 20 minutes
      """.stripMargin),
      "left_outer")

    // 输出
    val query = joinResult.writeStream
      .outputMode("append")
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2) 测试一下,假设我们在两个终端窗口分别输入如下内容:
// 9999 端口控制台
hangge,male,2023-09-05 10:41:00
tom,male,2023-09-05 10:42:00
lili,female,2023-09-05 10:43:00

// 8888 端口控制台
hangge,100,2023-09-05 10:43:00
jerry,1,2023-09-05 10:44:00
lili,33,2023-09-05 10:45:00
  • 接着我们在终端窗口输入如下内容:
// 9999 端口控制台
jerry,female,2023-09-05 10:41:00
jerry,female,2023-09-05 10:42:00
jerry,female,2023-09-05 10:42:01
jerry,female,2023-09-05 10:46:00
jerry,female,2023-09-05 10:44:00
评论

全部评论(0)

回到顶部