在spark集群中怎么设置worker节点的数目

（图1：在 Spark 集群中设置 worker 节点的数目）

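在 Standalone 模式下，每台机器上启动的 worker 进程数由 spark-env.sh 中的 SPARK_WORKER_INSTANCES 设置，参与集群的 worker 节点则列在 conf/slaves 文件中。如果以 YARN 模式部署，则需要修改 conf 目录下的 spark-env.sh 文件，在其中新增如下配置选项（其中 SPARK_EXECUTOR_INSTANCES 指定启动的 executor 数目）：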

# spark-env.sh additions for running Spark on YARN (see SPARK_YARN_APP_NAME below).
# NOTE: no whitespace around '='. `export VAR= value` exports an EMPTY VAR and then
# fails on "value" as an invalid identifier, so HADOOP_HOME / HADOOP_CONF_DIR would be lost.
export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0
# Hadoop/YARN client configuration directory, derived from HADOOP_HOME.
export HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
# Number of executors to launch.
SPARK_EXECUTOR_INSTANCES=2
# CPU cores per executor.
SPARK_EXECUTOR_CORES=1
# Memory per executor.
SPARK_EXECUTOR_MEMORY=400M
# Memory for the driver.
SPARK_DRIVER_MEMORY=400M
# Application name shown in YARN.
SPARK_YARN_APP_NAME="Spark 1.0.0"

以下基于 Spark 1.3.1 的源码进行分析。

Spark master启动源码分析

1、start-master.sh 最终会调用 Master 的 main 方法，main 方法再调用 startSystemAndActor：

/**
 * Entry point of the standalone Master process.
 *
 * Parses the command-line arguments against a fresh SparkConf, starts the Master's
 * ActorSystem and actor, then blocks until that ActorSystem terminates.
 */
def main(argStrings: Array[String]): Unit = {
  SignalLogger.register(log)
  val sparkConf = new SparkConf
  val masterArgs = new MasterArguments(argStrings, sparkConf)
  // Only the ActorSystem is needed here; the bound ports in the result are ignored.
  val actorSystem =
    startSystemAndActor(masterArgs.host, masterArgs.port, masterArgs.webUiPort, sparkConf)._1
  actorSystem.awaitTermination()
}

2、调用startSystemAndActor启动系统和创建actor

/**
 * Starts the Master's ActorSystem and the Master actor, then asks the actor which
 * ports it bound.
 *
 * NOTE(review): `systemName` and `actorName` are not defined in this method; they are
 * presumably members of the enclosing `Master` object, confirm.
 *
 * @param host      host to bind the ActorSystem to
 * @param port      requested ActorSystem port (the actual bound port may differ)
 * @param webUiPort port passed to the Master actor for its web UI
 * @param conf      Spark configuration shared by the security manager, ActorSystem and actor
 * @return (actorSystem, bound ActorSystem port, web UI port, optional REST port)
 */
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,
    securityManager = securityMgr)
  val actor = actorSystem.actorOf(
    Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)
  // Synchronously ask the new actor for its bound ports. Await.result throws if no
  // reply arrives within the configured ask timeout.
  val timeout = AkkaUtils.askTimeout(conf)
  val portsRequest = actor.ask(BoundPortsRequest)(timeout)
  val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]
  (actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)
}

3、调用AkkaUtils.createActorSystem来创建ActorSystem

/**
 * Creates an ActorSystem called `name` on `host`, starting from the requested `port`.
 *
 * Port selection is delegated to `Utils.startServiceOnPort`. It is handed a factory that
 * builds the ActorSystem via `doCreateActorSystem` for whatever port it supplies.
 *
 * @return the created ActorSystem and the port it is bound to
 */
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  val launchOn: Int => (ActorSystem, Int) =
    doCreateActorSystem(name, host, _, conf, securityManager)
  Utils.startServiceOnPort(port, launchOn, conf, name)
}

4、Utils.startServiceOnPort 负责在指定端口上启动服务：它以实际端口调用传入的 startService 函数（即 doCreateActorSystem）来创建 ActorSystem；若端口被占用，会依次尝试后续端口。

5、ActorSystem创建成功后创建Actor

6、创建 Master actor 时执行其主构造函数，随后 Akka 调用 preStart() 完成初始化。

Spark Worker 启动源码分析

1、start-slaves.sh 最终会调用 Worker 类的 main 方法：

/**
 * Entry point of the standalone Worker process.
 *
 * Parses the command-line arguments against a SparkConf, starts the Worker's ActorSystem
 * and actor with that same SparkConf, then blocks until the ActorSystem terminates.
 */
def main(argStrings: Array[String]): Unit = {
  SignalLogger.register(log)
  val conf = new SparkConf
  val args = new WorkerArguments(argStrings, conf)
  // Pass the conf that WorkerArguments was built against, as the Master's main does.
  // Without it, startSystemAndActor falls back to its default `new SparkConf` and this
  // instance (and anything WorkerArguments recorded in it) is discarded.
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir, conf = conf)
  actorSystem.awaitTermination()
}

2、调用startSystemAndActor启动系统和创建actor

/**
 * Starts a Worker's ActorSystem and the Worker actor inside it.
 *
 * @param masterUrls   master URLs, converted to Akka URLs using the new system's protocol
 * @param workerNumber optional suffix for the ActorSystem name. LocalSparkCluster runs
 *                     several local workers, giving "sparkWorker1", "sparkWorker2", ...;
 *                     without it the name is plain "sparkWorker"
 * @param conf         Spark configuration (a fresh SparkConf when omitted)
 * @return the ActorSystem and the port it is bound to
 */
def startSystemAndActor(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrls: Array[String],
    workDir: String,
    workerNumber: Option[Int] = None,
    conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
  val systemName = workerNumber.fold("sparkWorker")(n => s"sparkWorker$n")
  val actorName = "Worker"
  val securityMgr = new SecurityManager(conf)
  val (actorSystem, boundPort) =
    AkkaUtils.createActorSystem(systemName, host, port, conf = conf, securityManager = securityMgr)
  val masterAkkaUrls =
    masterUrls.map(url => Master.toAkkaUrl(url, AkkaUtils.protocol(actorSystem)))
  val workerProps = Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
    masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr)
  actorSystem.actorOf(workerProps, name = actorName)
  (actorSystem, boundPort)
}

3、调用AkkaUtils的createActorSystem创建ActorSystem

/**
 * Creates an ActorSystem called `name` on `host`, beginning at the requested `port`.
 *
 * `Utils.startServiceOnPort` chooses the concrete port and invokes the local `bindOn`
 * factory, which delegates to `doCreateActorSystem`.
 *
 * @return the created ActorSystem and its bound port
 */
def createActorSystem(
    name: String,
    host: String,
    port: Int,
    conf: SparkConf,
    securityManager: SecurityManager): (ActorSystem, Int) = {
  def bindOn(candidatePort: Int): (ActorSystem, Int) =
    doCreateActorSystem(name, host, candidatePort, conf, securityManager)

  Utils.startServiceOnPort(port, bindOn _, conf, name)
}

4、ActorSystem 创建完成后，actorOf 会执行 Worker 的主构造函数，随后 Akka 调用其 preStart 方法：

/**
 * Akka lifecycle hook run when the Worker actor starts.
 *
 * Logs the worker's resources and Spark version/home, creates the work directory,
 * subscribes to remoting lifecycle events, starts the shuffle service if enabled, binds
 * the web UI, registers with the master(s), and finally starts the metrics system.
 */
override def preStart() {
// A freshly started worker must not already be marked as registered.
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
logInfo("Spark home: " + sparkHome)
createWorkDir()
// Deliver RemotingLifecycleEvent messages (e.g. disassociation) to this actor.
// Presumably handled elsewhere to trigger re-registration; confirm.
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
// NOTE(review): judging by its name, only starts the external shuffle service when enabled.
shuffleService.startIfEnabled()
webUi = new WorkerWebUI(this, workDir, webUiPort)
// Must be bound BEFORE registerWithMaster(): tryRegisterAllMasters sends webUi.boundPort
// to the master in the RegisterWorker message.
webUi.bind()
registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}

5、调用registerWithMaster方法向Master注册启动的worker

/**
 * Begins registering this worker with its master(s), unless a registration retry timer
 * is already pending.
 *
 * A fresh attempt does four things in order: clears `registered`, sends RegisterWorker to
 * every master, resets `connectionAttemptCount`, and schedules a periodic
 * `ReregisterWithMaster` message to itself.
 */
def registerWithMaster(): Unit = {
  if (registrationRetryTimer.isDefined) {
    // DisassociatedEvent may be triggered multiple times; an attempt is already scheduled,
    // so don't start another one.
    logInfo("Not spawning another attempt to register with the master, since there is an" +
      " attempt scheduled already.")
  } else {
    registered = false
    tryRegisterAllMasters()
    connectionAttemptCount = 0
    val retryTimer = context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,
      INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)
    registrationRetryTimer = Some(retryTimer)
  }
}

6、调用tryRegisterAllMasters向Master发送注册的Worker消息

/**
 * Sends a RegisterWorker message to every known master URL.
 *
 * The message carries this worker's id, host/port, cores, memory, bound web UI port
 * and public address. Sending is fire-and-forget (`!`); replies arrive asynchronously.
 */
private def tryRegisterAllMasters(): Unit = {
  masterAkkaUrls.foreach { url =>
    logInfo("Connecting to master " + url + "...")
    context.actorSelection(url) !
      RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}

7、Master 在 receiveWithLogging 中收到 RegisterWorker 消息后执行：

// Master-side handler for a worker's RegisterWorker request.
// Outcomes: no reply (standby master), RegisterWorkerFailed (duplicate id or
// re-registration at the same address), or RegisteredWorker followed by schedule().
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
workerHost, workerPort, cores, Utils.megabytesToString(memory)))
if (state == RecoveryState.STANDBY) {
// ignore, don't send response
// (presumably so that only the active master answers the worker; confirm)
} else if (idToWorker.contains(id)) {
// A worker with this id is already known to the master.
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
sender, workerUiPort, publicAddress)
if (registerWorker(worker)) {
// Record the worker in the persistence engine, acknowledge it with the master's
// URL and web UI URL, then re-run scheduling now that new resources exist.
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
} else {
// registerWorker refused the worker; per the messages below, a worker at the same
// actor address is already registered.
val workerAddress = worker.actor.path.address
logWarning("Worker registration failed. Attempted to re-register worker at same " +
"address: " + workerAddress)
sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
+ workerAddress)
}
}
}

8、注册失败时向 Worker 返回 RegisterWorkerFailed 消息；注册成功时持久化该 Worker，并返回包含 Master URL 和 Web UI 地址的 RegisteredWorker 消息。

9、返回消息后调用 schedule()；由于此时还没有提交任何 application，因此不会进行资源分配。

至此整个Spark集群就已经启动完成

Spark集群有三种运行模式:Standalone、Mesos和YARN模式。

现在说Standalone模式。这是最简单的模式,Spark靠自己就能运行这个模式(不依靠其它集群管理工具)。

方法一:手动运行Standalone模式。

前提:Spark各个文件都不做任何修改。

1、在 master 机器上运行 ./sbin/start-master.sh

运行完之后,会打印出url: spark://HOST:PORT ,这个就是当前master的Spark URL。

2、在slave机器上运行 ./sbin/start-slave.sh <master-spark-url>

然后在 Master 的 Web 管理界面（默认地址为 http://<master 主机>:8080）上查看 slave 是否已上线。

方法二:使用集群运行脚本运行Standalone模式。

前提:master节点去访问slave节点需要使用ssh无密码登录,因此需要提前配置无密码登录。

1、在master的conf文件夹下新增slaves文件。slaves文件里存放着每一个slave节点的hostname,每行一个。

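2、在 master 节点上运行如下脚本即可：./sbin/start-all.sh。它会先启动 master，再通过 ssh 在 slaves 文件列出的每个节点上启动 worker；也可以分别运行 ./sbin/start-master.sh 和 ./sbin/start-slaves.sh。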


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/11367732.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-15
下一篇2023-05-15

发表评论

登录后才能评论

评论列表(0条)

    保存