返回 导航

Spark

hangge.com

Spark - Structured Streaming使用详解2(输入源1:Rate、Socket)

作者:hangge | 2024-01-09 08:39

二、Rate 输入源

1,基本介绍

Rate 是一个虚拟数据源,该输入源以固定的速率生成固定格式的数据,通常用来测试 Structured Streaming 的性能。

2,使用样例

(1)下面样例我们从虚拟数据源"rate"读取数据,并将数据输出到控制台。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("Hello")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从rate读取数据
    val rows = spark.readStream
      .format("rate") // 设置数据源为 rate
      .option("rowsPerSecond", 3) // 设置每秒产生的数据的条数, 默认是 1
      .option("rampUpTime", 1) // 设置多少秒到达指定速率 默认为 0
      .option("numPartitions", 2) /// 设置分区数  默认是 spark 的默认并行度
      .load

    // 启动查询, 把结果打印到控制台
    val query = rows.writeStream
      .outputMode("append") // 使用complete输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

(2)程序启动后,控制台输出内容如下:

三、Socket 输入源

1,基本介绍

Socket(套接字)是计算机网络通信的基础,它提供了一种标准的编程接口,用于在不同计算机之间进行数据传输和通信。

2,样例代码

(1)下面代码我们通过 Structured Streaming 读取 9999 端口数据并统计不同单词出现的次数:
import org.apache.spark.sql.SparkSession

object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("HelloStructuredStreaming")
      .master("local[*]")
      .getOrCreate()

    // 导入隐式转换
    import spark.implicits._

    // 创建一个流式DataFrame,这里从Socket读取数据
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // 对流式DataFrame进行转换,将每一行数据切分成单词
    val words = lines.as[String].flatMap(_.split(" "))

    // 对单词进行分组和聚合
    val wordCounts = words.groupBy("value").count()

    // 启动查询, 把结果打印到控制台
    val query = wordCounts.writeStream
      .outputMode("complete") // 或者选择其他适合的输出模式
      .format("console")
      .start()

    // 等待应用程序终止
    query.awaitTermination()

    //关闭 Spark
    spark.stop()
  }
}

3,运行测试

(1)我们通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(2)程序启动后,我们在该终端中输入一些文本数据:

(3)Structured Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:
评论

全部评论(0)

回到顶部