Spark - 将项目打成Jar包、及任务提交执行教程(附:三种提交模式、常用参数)
作者:hangge | 2024-07-22 09:07
Apache Spark 是一个快速、通用的分布式计算系统,适用于大规模数据处理。为了让我们开发的应用程序能够在 Spark 集群上运行,我们需要将任务代码打包成一个可执行的 JAR 文件,并提交到 Spark 集群执行。下面我将详细演示如何完成这一过程。
(2)然后把 spark-core 依赖的作用域设置为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。
(2)文件内容如下:
(3)接着执行如下命令把测试数据上传至 HDFS:
(2)此时任务会被提交到 YARN 集群中,可以看到任务执行成功了:
(2)中间的值 yarn client 模式,由于是 on yarn 模式,所以里面是 yarn 集群的进程,此时 driver 进程就在提交 spark 任务的客户端机器上了。
1,编写代码
(1)首先我们创建一个 Spark 项目,具体流程可以参考我之前写的文章:
(2)接着我们编写一个简单的 WordCount 单词统计任务程序,具体代码如下,其中有两个地方要特别注意:
- 我们需要将 setMaster("local") 注释掉,后面我们会在提交任务的时候动态指定 master 信息。
- 修改代码中的输入文件路径信息,因为这个时候无法读取 windows 中的数据了,把代码修改成动态接收输入文件路径。
import org.apache.spark.{SparkConf, SparkContext} object WordCount { def main(args: Array[String]): Unit = { //第一步:创建SparkContext val conf = new SparkConf() conf.setAppName("WordCount")//设置任务名称 // .setMaster("local")//local表示在本地执行 val sc = new SparkContext(conf) //第二步:加载数据 var path = "D:\\hello.txt" if (args.length == 1) { path = args(0) } val linesRDD = sc.textFile(path) //第三步:对数据进行切割,把一行数据切分成一个一个的单词 val wordsRDD = linesRDD.flatMap(_.split(" ")) //第四步:迭代words,将每个word转化为(word,1)这种形式 val pairRDD = wordsRDD.map((_,1)) //第五步:根据key(其实就是word)进行分组聚合统计 val wordCountRDD = pairRDD.reduceByKey(_ + _) //第六步:将结果打印到控制台 wordCountRDD.foreach(wordCount=>println(wordCount._1+"--"+wordCount._2)) //第七步:停止SparkContext sc.stop() } }
2,将项目打成 Jar 包
(1)首先修改项的 pom.xml 文件,添加 Maven 的编译打包插件配置(和 dependencies 标签平级)。
<build> <plugins> <!-- java编译插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.6.0</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <!-- scala编译插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <configuration> <scalaCompatVersion>2.13</scalaCompatVersion> <scalaVersion>2.13.1</scalaVersion> </configuration> <executions> <execution> <id>compile-scala</id> <phase>compile</phase> <goals> <goal>add-source</goal> <goal>compile</goal> </goals> </execution> <execution> <id>test-compile-scala</id> <phase>test-compile</phase> <goals> <goal>add-source</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> <!-- 打包插件 --> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build>
(2)然后把 spark-core 依赖的作用域设置为 provided,表示只在编译时使用该依赖,在执行及打包时都不使用。
提示:因为 spark-core 依赖在 Spark 集群中已经存在了,所以在打 Jar 包时就不需要将其打包进去了。如果我们使用了集群中没有的第三方依赖包,则需要将其打进 Jar 包里。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.13</artifactId> <version>3.4.0</version> <scope>provided</scope> </dependency> </dependencies>
(3)接着执行打 Jar 包的操作:
(4)打包完毕后,在在项目的 target 目录下看到生成的 XXX-jar-with-dependencies.jar 文件,这个就是我们需要的 jar 包。
3,准备测试数据
(1)我们登录服务器,创建一个 hello.txt
vi hello.txt
(2)文件内容如下:
hello hangge.com welcome to hangge.com hello world bye world
(3)接着执行如下命令把测试数据上传至 HDFS:
hdfs dfs -put hello.txt /
4,提交任务
(1)将我们前面生成的 Jar 包上传至 Spark 集群的任意一台机器上,并且执行如下命令向集群提交 Jar 包:
./bin/spark-submit \ --master yarn \ --class WordCount \ spark-1.0-SNAPSHOT-jar-with-dependencies.jar \ hdfs://192.168.121.128:9000/hello.txt
(2)此时任务会被提交到 YARN 集群中,可以看到任务执行成功了:
附一:Spark Job 的三种提交模式
1,standalone 模式
(1)该模式基于 Spark 自己的 standalone 集群。
(2)要使用该模式提交时指定如下参数即可:
--master spark://node1:7077
2,基于 YARN 的 client 模式
(1)这种方式主要用于测试,因为 driver 进程运行在本地客户端,查看日志方便一些,部分日志会直接打印到控制台上面。
注意:因为 driver 进程运行在本地客户端,就是提交 Spark 任务的那个客户端机器,driver 负责调度 job,会与 yarn 集群产生大量的通信,一般情况下 Spark 客户端机器和 Hadoop 集群的机器是无法内网通信,只能通过外网,这样在大量通信的情况下会影响通信效率,并且当我们执行一些 action 操作的时候数据也会返回给 driver 端,driver 端机器的配置一般都不高,可能会导致内存溢出等问题。
(2)要使用该模式提交时指定如下参数即可:
–master yarn --deploy-mode client
3,是基于 YARN 的 cluster 模式(推荐)
(1)这种方式 driver 进程运行在集群中的某一台机器上,这样集群内部节点之间通信是可以通过内网通信的,并且集群内的机器的配置也会比普通的客户端机器配置高,所以就不存在 yarn-client 模式的一些问题了,只不过这个时候查看日志只能到集群上面看了。
(2)要使用该模式提交时指定如下参数即可:
–master yarn --deploy-mode cluster
4,三种模式对比
(1)左边是 standalone 模式,driver 进程是在客户端机器中的。
提示:其实针对 standalone 模式而言,这个 Driver 进程也是可以运行在集群中的,只需要通过指定 deploy-mode 为 cluster 即可。
(3)最右边这个是 yarn cluster 模式,driver 进程就会在集群中的某一个节点上面。
附二:spark-submit 脚本常用参数
(1)--name mySparkJobName :指定任务名称
(2)--class com.hangge.xxxxx :指定入口类
(3)--master yarn :指定集群地址,on yarn 模式指定 yarn
(4)--deploy-mode cluster :client 代表 yarn-client,cluster 代表 yarn-cluster
(5)--executor-memory 1G :executor 进程的内存大小,实际工作中设置 2~4G 即可
(6)--num-executors 2 :分配多少个executor 进程
(7)--executor-cores 2 : 一个 executor 进程分配多少个 cpu core
(8)--driver-cores 1 :driver 进程分配多少 cpu core,默认为 1 即可
(9)--driver-memory 1G:driver 进程的内存,如果需要使用类似于 collect 之类的 action 算子向 driver 端拉取数据,则这里可以设置大一些
(10)--conf "spark.default.parallelism=10" :可以动态指定一些 spark 任务的参数,指定多个参数可以通过多个 --conf 来指定,或者在一个 --conf 后面的双引号中指定多个,多个参数之间用空格隔开即可
(11)--jars fastjson.jar,abc.jar :在这里可以设置 job 依赖的第三方 jar 包
注意:
- 不建议把第三方依赖 jar 包整体打包进 saprk 的 job 中,那样会导致任务 jar 包过大,并且也不方便统一管理依赖 jar 包的版本
- 这里的 jar 包路径可以指定本地磁盘路径,或者是 hdfs 路径,建议使用 hdfs 路径,因为 spark 在提交任务的时候会把本地磁盘的依赖 jar 包也上传到 hdfs 的一个临时目录中,如果在这里本来指定的就是 hdfs 的路径, 那么 spark 在提交任务的时候就不会重复提交依赖的这个 jar 包了,这样其实可以提高任务提交的效率,并且这样可以方便统一管理第三方依赖 jar 包,所有人都统一使用 hdfs 中的共享的这些第三方 jar 包,这样版本就统一了,所以我们可以在 hdfs 中创建一个类似于 commonLib 的目录,统一存放第三方依赖的 jar 包
- 如果一个 Spark job 需要依赖多个 jar 包,在这里可以一次性指定多个,多个 jar 包之间通过逗号隔开即可
全部评论(0)