返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解4(自定义数据源:Socket、MQTT)

作者:hangge | 2023-12-08 09:10
    通过自定义数据源,我们可以从非标准输入源接收流式数据,这在一些特定业务场景下十分有用。要实现自定义数据源,只需要继承 Receiver,并实现 onStartonStop 方法来自定义数据源采集即可,下面通过样例进行演示。

四、自定义数据源

1,自定义数据源之 Socket

(1)尽管 Spark Streaming 已经为我们提供了现成的套接字输入源可供直接使用(点击查看),为了展示自定义数据源的实现,我们这里自定义一个通过 socket 接收数据的 Spark Streaming 数据接收器(Receiver),具体内容如下。
class CustomerSocketReceiver(host: String, port: Int) extends
  Receiver[String](StorageLevel.MEMORY_ONLY) {

  //最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Spark
  override def onStart(): Unit = {
    new Thread("Socket Receiver") {
      override def run() {
        receive()
      }
    }.start()
  }

  //读数据并将数据发送给 Spark
  def receive(): Unit = {
    //创建一个 Socket
    var socket: Socket = new Socket(host, port)
    //定义一个变量,用来接收端口传过来的数据
    var input: String = null
    //创建一个 BufferedReader 用于读取端口传来的数据
    val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,
      StandardCharsets.UTF_8))
    //读取数据
    input = reader.readLine()
    //当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Spark
    while (!isStopped() && input != null) {
      store(input)
      input = reader.readLine()
    }
    //跳出循环则关闭资源
    reader.close()
    socket.close()
    //重启任务
    restart("restart")
  }
  override def onStop(): Unit = {}
}

(2)接着我们就可以使用这个自定义的 receiver 来实时地从指定的套接字连接接收数据流。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    //创建自定义 receiver 的 Streaming
    val lineStream = ssc.receiverStream(new CustomerSocketReceiver("localhost", 9999))

    //将每一行数据做切分,形成一个个单词
    val wordStream = lineStream.flatMap(_.split(" "))

    //将单词映射成元组(word,1)
    val wordAndOneStream = wordStream.map((_, 1))

    //将相同的单词次数做统计
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)

    //打印
    wordAndCountStream.print()

    // 开启任务
    ssc.start()

    // 等待应用程序终止
    ssc.awaitTermination()
  }
}

(3)测试时我们首先通过在终端运行如下命令来启动一个监听本地 9999 端口的 TCP socket
nc -lk 9999

(4)程序启动后,我们在该终端中输入一些文本数据:

(5)Spark Streaming 应用程序这边将会实时处理输入的文本数据并输出结果:

2,自定义数据源之 MQTT

(1)在 Spark Streaming 中并没有官方内置的现成可用的 MQTT 数据源接收器。但是我可以通过编写自定义的 MQTT 数据源接收器来集成 MQTT 数据源到 Spark Streaming 中。首先编辑项目的 pom.xml 文件,添加 Eclipse Paho MQTT 客户端库的依赖:
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.5</version>
</dependency>

(2)接着编写 MQTT 数据源的自定义接收器,具体代码如下:
class MqttDataSourceReceiver(brokerUrl: String, clientId: String, topic: String)
  extends Receiver[String](StorageLevel.MEMORY_ONLY) {

  var mqttClient: MqttClient = _

  // 在接收器启动时调用
  override def onStart(): Unit = {
    connectMqtt()
  }

  // 连接到 MQTT 代理
  def connectMqtt(): Unit = {
    mqttClient = new MqttClient(brokerUrl, clientId)
    val mqttConnectOptions = new MqttConnectOptions()
    mqttClient.connect(mqttConnectOptions)

    // 订阅指定的 MQTT 主题
    mqttClient.subscribe(topic)

    // 设置回调函数,当消息到达时被调用
    mqttClient.setCallback(new MqttCallback {
      override def connectionLost(cause: Throwable): Unit = {}

      // 当有消息到达时调用
      override def messageArrived(topic: String, message: MqttMessage): Unit = {
        // 将消息内容存储到接收器中
        store(new String(message.getPayload))
      }

      override def deliveryComplete(token: IMqttDeliveryToken): Unit = {}
    })
  }

  // 在接收器停止时调用
  override def onStop(): Unit = {
    if (mqttClient != null && mqttClient.isConnected) {
      mqttClient.disconnect()
    }
  }
}

(3)最后在 Spark Streaming 应用程序中使用 MQTT 数据源:
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 3 秒
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // mqtt地址、客户端id、主题信息
    val brokerUrl = "tcp://192.168.60.9:1883"
    val clientId = "spark-mqtt-example"
    val topic = "word-data"

    //创建自定义 receiver 的 Streaming
    val mqttReceiver = new MqttDataSourceReceiver(brokerUrl, clientId, topic)
    val mqttStream = ssc.receiverStream(mqttReceiver)

    //将数据做切分,形成一个个单词
    val wordStream = mqttStream
      .flatMap(_.split("\n")) // 这里分割多行文本消息
      .flatMap(_.split(" "))

    //将单词映射成元组(word,1)
    val wordAndOneStream = wordStream.map((_, 1))

    //将相同的单词次数做统计
    val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)

    //打印
    wordAndCountStream.print()

    // 开启任务
    ssc.start()

    // 等待应用程序终止
    ssc.awaitTermination()
  }
}

(4)我们测试一下,首先准备好 MQTT 服务,具体可以参考我之前写的文章:

(5)Spark Streaming 程序启动后,我们使用工具往指定主题发送一些消息:

(6)Spark Streaming 应用程序这边将会实时处理输入的数据并输出结果:
评论

全部评论(0)

回到顶部