
顾名思义,用来存配置的Map,可以存单个配置或者配置文件,在 POD 中我们可以通过环境变量来访问单个配置和配置文件,后者一般会被 mount 到 Pod 的一个 Volume 上,这样我们在跑 Spark on k8s 的时候,可以将一些配置放在 ConfigMap 中,和容器镜像解耦。
如果使用 Spark 社区默认的 spark-submit 的提交方式,在 Spark 30 之后(当然目前还没有release),支持以下类型的 ConfigMap。
这些是用户可以配置的,在 Spark on k8s 的内部运行实现上,也用了一些 ConfigMap 特性带来的便利,可以通过 describe 命令查看下 Driver Pod 的描述,
比如 spark-conf-volume 这个,就存放了客户端汇总的所有 spark 相关的配置,默认被 mount 到 /opt/spark/conf/sparkproperties 这个位置,然后在 Driver Pod 启动时,通过 --properties-file 参数指定,也可以通过 describe 命令,我们也可以查看这个 map 的内容
使用 ConfigMap 之前必须先创建,我们可以通过 kubectl create configmap --help 了解这个命令的用法。
ConfigMap 和命名空间是绑定的,创建在对应的命名空间内,后面跑 spark 的时候才能使用。
我们以 sparkkuberneteshadoopconfigMapName 来举例,我们用来创建ConfigMap 来存储 HADOOP_CONF_DIR 下的文件。
可以通过上面两种方式,直接使用路径 或者 一个文件一个文件来指定
通过 describe 命令可以查看该 ConfigMap 信息。
ConfigMap 的 数据部分,key值对应的是文件的名字,Value对应的是该文件的实际内容。
创建完成后,我们就可以通过将它的名称,如hz10-hadoop-dir,直接设置给 sparkkuberneteshadoopconfigMapName 就可以使用啦
csv1map(line => linesplit(",")map(elem =>{
val sb = new StringBuilder()
for (e <-elem ){
sbappend(e)
sbappend(,)
}
})
我先写了一个kafka的生产者程序,然后写了一个kafka的消费者程序,一切正常。
生产者程序生成5条数据,消费者能够读取到5条数据。然后我将kafka的消费者程序替换成使用spark的读取kafka的程序,重复多次发现每次都是读取1号分区的数据,而其余的0号和2号2个分区的数据都没有读到。请哪位大侠出手帮助一下。
我使用了三台虚拟机slave122,slave123,slave124作为kafka集群和zk集群;然后生产者和消费者程序以及spark消费者程序都是在myeclipse上完成。
软件版本为:kafka_211-01010,spark-streaming-kafka-0-10_211-210,zookeeper-349
spark消费者程序主要代码如下:
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParamsput("bootstrapservers", "slave124:9092,slave122:9092,slave123:9092");
kafkaParamsput("keydeserializer", "orgapachekafkacommonserializationStringDeserializer");
kafkaParamsput("valuedeserializer","orgapachekafkacommonserializationStringDeserializer");
kafkaParamsput("groupid", "ssgroup");
kafkaParamsput("autooffsetreset", "earliest"); //update mykafka,"earliest" from the beginning,"latest" from the rear of topic
kafkaParamsput("enableautocommit", "true"); //messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semantics
kafkaParamsput("autocommitintervalms", "5000");
// Create a local StreamingContext with two working thread and batch interval of 2 second
SparkConf conf = new SparkConf();
//conf被set后,返回新的SparkConf实例,所以多个set必须连续,不能拆开。
confsetMaster("local[1]")setAppName("streaming word count")setJars(new String[]{"D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\MyFirstHadoopjar"});;
try{
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durationsseconds(5));
Collection<String> topics = new HashSet<>(ArraysasList("order"));
JavaInputDStream<ConsumerRecord<String, String>> oJInputStream = KafkaUtilscreateDirectStream(
jssc,
LocationStrategiesPreferConsistent(),
ConsumerStrategies<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, String> pairs = oJInputStreammapToPair(
new PairFunction<ConsumerRecord<String, String>, String, String>() {
private static final long serialVersionUID = 1L;
@Override
public Tuple2<String, String> call(ConsumerRecord<String, String> record) {
try {
BufferedWriter oBWriter = new BufferedWriter(new FileWriter("D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\mysparkstream\\MyFirstHadoopout",true));
String strLog = "^^^^^^^^^^^ " + SystemcurrentTimeMillis() / 1000 + " mapToPair:topic:" + recordtopic() + ",key:" + recordkey() + ",value:" + recordvalue() + ",partition id:" + recordpartition() + ",offset:" + recordoffset() + "\n";
Systemoutprintln(strLog);
oBWriterwrite(strLog);
oBWriterclose();
} catch (IOException e) {
// TODO Auto-generated catch block
eprintStackTrace();
}
return new Tuple2<>(recordkey(), recordvalue());
}
}
);
pairsprint();
jsscstart(); //start here in fact
jsscawaitTermination();
jsscclose();
}catch(Exception e){
// TODO Auto-generated catch block
Systemoutprintln("Exception:throw one exception");
eprintStackTrace();
}
1、你在工作当中有遇到内存溢出问题吗?你是如何解决的?
回答思路:先解释spark的内存模型,再分情况介绍不同情况下的解决方案。总体思想是根据内存模型找出不够的那一块内存,要么提升占比,要么整体增加。
oom通常出现在execution内存中,因为storage这块内存在放满之后,会直接丢弃内存中旧的数据,对性能有点影响但不会导致oom。存储内存和执行内存可以互相借用内存空间。
而spark的oom问题主要分为三种情况:
①map执行后的内存溢出
--场景:maptask所运行的executor内存溢出。
增加堆内内存,申请的堆外内存也会随之增加
--executor -memory
增加堆外内存
--conf sparkexcutormemoryoverhead 2048
默认申请的堆外内存是Executor内存的10%。
②shuffle后内存溢出
reduce task去map一边拉取数据,一边聚合。reduce端有一块聚合内存,executor memory 02
解决方案:增加reduce聚合内存的比例,设置sparkshufflememoryfraction
增加executor memory的大小
减少reduce task每次拉取的数据量,设置sparkreducermaxSizeInFlight 24m
③driver内存溢出
--场景一:用户在Dirver端口生成大对象,比如创建了一个大的集合数据结构
解决思路:Ⅰ将大对象转换成Executor端加载,比如调用sctextfile
Ⅱ评估大对象占用的内存,增加dirver-menory的值
--场景二:从Executor端收集数据(collect)回Dirver端
解决思路:Ⅰ本身不建议将大的数据从executor端,collect回来。建议将driver端对collect回来的数据所作的 *** 作,转换成executor端rdd *** 作
Ⅱ若无法避免,估算collect需要的内存,相应增加driver-memory的值
--场景三:spark自身框架的消耗
主要由spark UI数据消耗,取决于作业的累计task个数
解决思路:Ⅰ从hdfs load的parition是自动计算,但在过滤之后,已经大大减少了数据量,此时可以缩小partitions。
Ⅱ通过参数sparkuiretainedStages/sparkuiretainedjobs控制(默认1000)
2、shuffle file not found可能是什么原因导致的报错?
产生该报错的原因可能是后一个stage的task从上一个stage的task所在的executor拉取数据,但是上一个stage正在执行GC,导致数据没有拉渠道,出现该错误。可以通过调整拉取的次数和间隔时间来避免此类事件发生。
val conf =new SparkConf()
set("sparkshuffleiomaxRetries","6')
set("sparkshuffleioretrywait","60s")
3、栈溢出?
yarn-client模式下,Dirver是运行在本地机器上的,spark使用的jvm的permGen是128m,可能在client上测试没有问题
yarn-cluster模式下,Dirver是运行在集群的某个节点上,使用的是没有经过配置的默认配置,PermGen永久代大小为82m。运行时报栈溢出。
--解决方案:在spark-submit脚本中对相关参数进行设置
--conf sparkdirverextraJavaOptions="-xx:PerSize=128M -xx:MaxPermSize=256m"
通过前面的介绍我们知道,Spark的客户端(Driver)有两种:Spark Submit和Spark Shell。这两种客户端相同点都是维护一个Spark Context对象,来向Spark集群提交任务;不同点是Spark Submit只能提交任务,不能交互,而Spark Shell是一个命令行工具,即可以提交任务,还可以人机交互。本节先来介绍Spark Submit客户端的使用。
例子:使用蒙特卡罗方法计算圆周率。
如图所示,蒙特卡罗方法求圆周率,使用的是概率的思想:往正方形区域随机撒点,总点数记为P2,落在单位圆区域内的点数记为P1,单位圆的面积为π,正方形的面子为4,π = P1 / P2 4。这里的P1和P2均由随机实验中得到,实验的次数(P2)越多,得到的结果就越精确。
Spark提供的测试用例$SPARK_HOME/examples/jars/spark-examples_211-210jar中就有蒙特卡罗求圆周率的例子SparkPI,我们就使用它来介绍Spark Submit的使用。
(1)如果配置了基于Zookeeper的Spark HA,需要先启动Zookeeper服务器
(2)启动Spark集群
使用Spark Submit的命令格式如下:
(1)提交SparkPI任务,随机实验次数为100:
(2)提交SparkPI任务,随机实验次数为1000:
(3)提交SparkPI任务,随机实验次数为10000:
可以看到上面三次实验的结果分别是:
一般对于随机实验来说,试验次数越多结果越精确。但是不免存在误差。如果想要获取更精确的圆周率,你可以输入更多的次数进行测试。但这不是本文介绍的重点。
至此,使用Spark Submit客户端提交Spark任务的方法已经介绍完毕,祝你玩的愉快!
以上就是关于Spark on k8s: 配置和使用ConfigMap全部的内容,包括:Spark on k8s: 配置和使用ConfigMap、spark中如何读取csv后得到一个arrary对象、请教一个关于使用spark 读取kafka只能读取一个分区数据的问题等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)