提交Spark应用的若干问题记录(一)

提交Spark应用的若干问题记录(一),第1张

提交Spark应用的若干问题记录(一)

文章目录
  • (零)这是啥
  • (一)用WSL下的Ubuntu提交Spark
    • 1.1 绑定IP地址
    • 1.2 子网无法提交
    • 1.3 放弃
  • (二)集群Cluster方式提交
    • 2.1 Windows下的环境变量问题
    • 2.2 状态:LOST
    • 2.3 Python应用
    • 2.4 放弃
  • (三)Spark的Worker和Executor
    • 3.1 Worker
    • 3.2 Executor
    • 3.3 无法满足条件的Worker不工作
  • (四)RDD大结果集的collect()

(零)这是啥

尝试《在Windows下程序通过SparkLauncher执行Spark应用》中遇到很多问题。
怕以后忘了,先记录下来。
环境基本上还是《从零开始学习大数据平台》里面的样子。
只是各种软件版本都升级了。

(一)用WSL下的Ubuntu提交Spark

不知道WSL归类算到Windows还是Linux……

1.1 绑定IP地址

WSL默认拷贝一份Windows下的hosts,不过Ubuntu会比CentOS多干一件事儿。
把自己的主机名,绑定到127.0.1.1上。导致Spark警告:

WARN Utils: Your hostname, shionwsl resolves to a loopback address: 127.0.1.1; using 192.168.50.16 instead (on interface eth0)
WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address

除非让WSL不要从Windows生成hosts:

# This file was automatically generated by WSL. To stop automatic generation of this file, add the following entry to /etc/wsl.conf:
# [network]
# generateHosts = false

这部分在:《大家来玩WSL(Windows Subsystem for Linux)》里提到过。

1.2 子网无法提交

由于集群的服务器无法访问WSL子网,所以Client模式下,任务提交成功,但是driver和executor永远连接超时。
虽然WSL理论上可以桥接,但日常用太麻烦,所以还是改用桥接的Linux虚拟机来提交spark应用吧。

1.3 放弃

⚠️目前我没办法用WSL来提交Spark任务。
呵呵……

(二)集群Cluster方式提交

集群(Cluster)提交的应用(也就是Driver)是运行在集群的Worker上的,我们无法看到执行的进度,最后的结果也只能存储在HDFS上(否则看不到啊)。

2.1 Windows下的环境变量问题

参考链接:JDK-8187776。
简单说就是Windows保留了很多DOS留下的奇怪环境变量,平时看不到,但是用Java提交Spark集群方式时,这些奇怪环境变量也会被复制过去,这种【=::】,【=C:】名称的环境变量导致Spark报错:

ERROR ClientEndpoint: Exception from cluster was: java.lang.IllegalArgumentException: Invalid environment variable name: "=::"
java.lang.IllegalArgumentException: Invalid environment variable name: "=::"
	at java.lang.ProcessEnvironment.validateVariable(ProcessEnvironment.java:114)
	at java.lang.ProcessEnvironment.access0(ProcessEnvironment.java:61)
	at java.lang.ProcessEnvironment$Variable.valueOf(ProcessEnvironment.java:170)

或者:

Invalid environment variable name: "=C:"

呃,这不算Java的bug吧……
总之不知道怎么解决,Windows只好用Client方式提交,也就是应用(driver)运行在本地的方式。

2.2 状态:LOST

那么我们用Linux主机,提交Cluster模式,总是可以的吧。
确实可以,提交后命令行很快就返回了,只能在集群WEB页面,找到driver的位置查看进度:
命令行提交如下:

$ac@vm00 ~> spark-submit --class com.ac.GZBvH --master spark://vm00:7077 --deploy-mode cluster --driver-memory 1g --executor-memory 1g hdfs://vm00:9000/bin/MySparkApp_XXYYZZ.jar hdfs://vm00:9000 /TestData/GuiZhou/GPRS/1/ hdfs://vm00:9000/output/gzGPRS1/ 100 100 1
$ac@vm00 ~>
$ac@vm00 ~>

但是用SparkLauncher程序提交Cluster模式程序也很快就返回了。
报的状态是LOST。。。

这时去集群WEB页面查看,发现其实driver是正常运行的。
这部分不知道怎么弄,从原理上似乎不太好弄。
先记录一下。

补充,可能的解决方向:官方资料。
一个3.1.0的新参数spark.standalone.submit.waitAppCompletion,默认false。

In standalone cluster mode, controls whether the client waits to exit until the application completes. If set to true, the client process will stay alive polling the driver’s status. Otherwise, the client process will exit after submission.

2.3 Python应用

官方其实早就说了,Python应用无法提交到Spark独立集群:

org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters.
2.4 放弃

⚠️目前我没办法用SparkLauncher程序,来提交Cluster方式的Spark任务。
呵呵……

(三)Spark的Worker和Executor 3.1 Worker

工作节点,相当于工作站,一台虚拟的计算机,有自己的CPU核心数,内存数。
我们把Worker假设成一台计算机,那么CPU核心数(假设2核),内存数(假设32GB)就是它的硬件条件。
实际上在Spark里面是这些配置决定的:

export SPARK_WORKER_CORES=2
export SPARK_WORKER_MEMORY=32g
3.2 Executor

工作程序,相当于计算机上运行的一个程序,有它要求CPU核心数,要求的内存数。
我们把Executor假设成一个程序,那么它需要运行在计算机上,它它对CPU核心数(假设1核),内存数(假设4GB),就是它运行的基本要求。

PS:同理driver也是个程序。

我们提交时可以指定这些参数:

SparkLauncher aL = new org.apache.spark.launcher.SparkLauncher()
				.setXXX(...)
                .setConf(SparkLauncher.DRIVER_MEMORY, "4g")
                .setConf(SparkLauncher.EXECUTOR_MEMORY,"4g")
                .setConf(SparkLauncher.EXECUTOR_CORES,"1")
3.3 无法满足条件的Worker不工作

如果EXECUTOR_MEMORY,EXECUTOR_CORES参数指定超过了某些Worker的配置,
则这些Work不会参加到这个任务的计算中。

如果EXECUTOR_MEMORY,EXECUTOR_CORES参数指定超过了全部Worker的配置,
那么集群中就没有资源可以供这个任务使用了,任务会无限等待下去,等待可用资源。

这不是问题,只是个概念。

(四)RDD大结果集的collect()

Spark的核心d性分布式数据集,对于rdd.collect是这么描述的:

java.util.List collect() //Python版本差不多的
返回一个数组包含RDD中所有的元素

备注

这个方法只能在结果集较小的情况下使用,因为所有的数据都会加载到driver的内存中。

如果结果集大怎能办?
网上查了一圈,大家的说法都是:

  1. 大的数据集用rdd写入HDFS文件的方法,比如rdd.saveAsTextFile,rdd.saveAsNewAPIHadoopFile各个节点将结果写入到HDFS的目录中(一大堆文件)。
  2. 不传到driver上而是rdd.foreach,rdd.foreachPartition打印到屏幕上,或发送到数据库。

不过我比较懒,希望整个流程和以前无缝切换,
所以我采用了collectPartitions,分几批collect回来数据。

PS:但Python中没有找到collectPartitions方法啊???肿么办!!!
Java代码如下:

...
        if (ColBatch < out_2.getNumPartitions()) {//如果设置了批量值,并小于分区数,则分批collect
            int[] Par = new int[ColBatch];
            for (int i = 0; i < out_2.getNumPartitions(); i += Par.length) {
                int ParLen = 0;
                for (int j = 0; j < Par.length; j++) {
                    if (i + j < out_2.getNumPartitions()) {
                        Par[j] = i + j;
                        ParLen++;
                    }
                }
                TmpLine = String.format("当前: %d - %dn", i, i + ParLen - 1);
                System.out.print(TmpLine);
                List>[] output2 = out_2.collectPartitions(Par);
                for (int j = 0; j < ParLen; j++) {
                    for (Tuple2 tuple : output2[j]) {
                        oF002.write(String.format("%s|%sn", tuple._1(), tuple._2()));
                        g002++;
                    }
                }
            }
        } else {
            List> output1 = out_2.collect();
            for (Tuple2 tuple : output1) {
                oF002.write(String.format("%s|%sn", tuple._1(), tuple._2()));
                g002++;
            }
        }
...

先就这样吧。

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/zaji/5699117.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-12-17
下一篇2022-12-17

发表评论

登录后才能评论

评论列表(0条)

    保存