返回 导航

Spark

hangge.com

Spark - Spark Streaming使用详解10(优雅的关闭作业程序)

作者:hangge | 2023-12-21 08:50

十、优雅的关闭作业程序

1,基本介绍

(1)流式任务需要 7*24 小时执行,但是有时涉及到升级代码需要主动停止程序。如果需要优雅的关闭 Spark Streaming,则在程序中执行如下代码即可,其中 stop 方法参数说明如下:
  • 第一个 true 意思是 Spark context 需要被停止。
  • 第二个 true 意思是需要优雅的关闭,允许正在处理的消息完成,避免正在处理的数据丢失。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 设置关闭钩子
    ShutdownHookThread {
      // 优雅地关闭资源
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      // 执行其他资源清理操作,例如关闭数据库连接等
      println("程序已经被优雅地关闭了.")
    }

    // 从监控端口读取数据流
    val inputDStream = ssc.socketTextStream("localhost", 9999)

    // 打印结果
    inputDStream.print()

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(3)假设我在在 IntelliJ IDEA 中运行程序,程序启动以后,我们点击停止按钮,应用程序会收到中断信号,并触发关闭钩子。并且在关闭过程中尝试优雅地处理当前正在处理的数据,等待已经启动的批处理作业完成。

2,监听外部文件来关闭程序

(1)由于分布式程序的特性,逐个进程进行手动终止将会十分困难。因此,配置一个优雅的关闭机制显得尤为重要。我们可以通过外部文件系统来实现内部程序的有序关闭控制。
(2)首先我们创建了一个 FileListenerThread 类,它实现了 Runnable 接口。在 run() 方法中,我们通过循环定时检查文件,一旦发现控制文件存在,就会触发关闭钩子并停止 StreamingContext
class FileListenerThread(ssc: StreamingContext, controlFilePath: String) extends Runnable {
  private val controlFile = new File(controlFilePath)

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      Thread.sleep(5000) // 间隔一段时间检查一次文件

      if (checkControlFile()) {
        // 触发关闭钩子,然后停止 StreamingContext
        ssc.stop(stopSparkContext = true, stopGracefully = true)
        controlFile.delete() // 删除控制文件
        println("程序已经被优雅地关闭了.")
        return
      }
    }
  }

  // 检查文件是否存在
  private def checkControlFile(): Boolean = {
    controlFile.exists() && controlFile.isFile()
  }
}

(2)接着我们在 Spark Streaming 应用程序中使用这个自定义的 FileListenerThread 类,它将在应用程序中独立地启动一个线程来监听控制文件,实现外部触发程序关闭的需求。
注意:为了方便演示,这里我将控制文件路径就直接设置为程序目录下的 datas/stop 文件,实际中可以指定成 hdfs 路径。
object Hello {
  def main(args: Array[String]): Unit = {
    // 创建 Spark 运行配置对象
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Hello")

    // 初始化 StreamingContext,设置微批处理的时间间隔为 5 秒
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // 从监控端口读取数据流
    val inputDStream = ssc.socketTextStream("localhost", 9999)

    // 打印结果
    inputDStream.print()

    // 启动一个线程来监听控制文件
    val fileListenerThread = new FileListenerThread(ssc, "datas/stop")
    val thread = new Thread(fileListenerThread)
    thread.start()

    // 启动 StreamingContext
    ssc.start()

    // 等待应用程序终止(遇错停止,否则不断工作)
    ssc.awaitTermination()
  }
}

(3)程序启动后,我们在程序的 datas 目录下创建一个 stop 文件,稍等一会便会发现程序能监控到该文件,并优雅地自动关闭了。
评论

全部评论(0)

回到顶部