spark 怎么启动worker

spark 怎么启动worker,第1张

基于spark1.3.1的源码进行分析

Spark master启动源肢做轿码分胡信析

1、在start-master.sh调用master的main方法,main方法调用

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new MasterArguments(argStrings, conf)

val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf)//启动系统和actor

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

conf: SparkConf): (ActorSystem, Int, Int, Option[Int]) = {

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf,

securityManager = securityMgr)

val actor = actorSystem.actorOf(

Props(classOf[Master], host, boundPort, webUiPort, securityMgr, conf), actorName)

val timeout = AkkaUtils.askTimeout(conf)

val portsRequest = actor.ask(BoundPortsRequest)(timeout)

val portsResponse = Await.result(portsRequest, timeout).asInstanceOf[BoundPortsResponse]

(actorSystem, boundPort, portsResponse.webUIPort, portsResponse.restPort)

3、调用AkkaUtils.createActorSystem来创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int =>(ActorSystem, Int) = { actualPort =>

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、调用Utils.startServiceOnPort启动一个端口上的服务,创建成功后调用doCreateActorSystem创建ActorSystem

5、ActorSystem创建成功后创建Actor

6、调用Master的主构造函数,历肆执行preStart()

1、start-slaves.sh调用Worker类的main方法

def main(argStrings: Array[String]) {

SignalLogger.register(log)

val conf = new SparkConf

val args = new WorkerArguments(argStrings, conf)

val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,

args.memory, args.masters, args.workDir)

actorSystem.awaitTermination()

}

2、调用startSystemAndActor启动系统和创建actor

def startSystemAndActor(

host: String,

port: Int,

webUiPort: Int,

cores: Int,

memory: Int,

masterUrls: Array[String],

workDir: String,

workerNumber: Option[Int] = None,

conf: SparkConf = new SparkConf): (ActorSystem, Int) = {

// The LocalSparkCluster runs multiple local sparkWorkerX actor systems

val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")

val actorName = "Worker"

val securityMgr = new SecurityManager(conf)

val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,

conf = conf, securityManager = securityMgr)

val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem)))

actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,

masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName)

(actorSystem, boundPort)

}

3、调用AkkaUtils的createActorSystem创建ActorSystem

def createActorSystem(

name: String,

host: String,

port: Int,

conf: SparkConf,

securityManager: SecurityManager): (ActorSystem, Int) = {

val startService: Int =>(ActorSystem, Int) = { actualPort =>

doCreateActorSystem(name, host, actualPort, conf, securityManager)

}

Utils.startServiceOnPort(port, startService, conf, name)

}

4、创建完ActorSystem后调用Worker的主构造函数,执行preStart方法

override def preStart() {

assert(!registered)

logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(

host, port, cores, Utils.megabytesToString(memory)))

logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")

logInfo("Spark home: " + sparkHome)

createWorkDir()

context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])

shuffleService.startIfEnabled()

webUi = new WorkerWebUI(this, workDir, webUiPort)

webUi.bind()

registerWithMaster()

metricsSystem.registerSource(workerSource)

metricsSystem.start()

// Attach the worker metrics servlet handler to the web ui after the metrics system is started.

metricsSystem.getServletHandlers.foreach(webUi.attachHandler)

}

5、调用registerWithMaster方法向Master注册启动的worker

def registerWithMaster() {

// DisassociatedEvent may be triggered multiple times, so don't attempt registration

// if there are outstanding registration attempts scheduled.

registrationRetryTimer match {

case None =>

registered = false

tryRegisterAllMasters()

connectionAttemptCount = 0

registrationRetryTimer = Some {

context.system.scheduler.schedule(INITIAL_REGISTRATION_RETRY_INTERVAL,

INITIAL_REGISTRATION_RETRY_INTERVAL, self, ReregisterWithMaster)

}

case Some(_) =>

logInfo("Not spawning another attempt to register with the master, since there is an" +

" attempt scheduled already.")

}

}

6、调用tryRegisterAllMasters向Master发送注册的Worker消息

private def tryRegisterAllMasters() {

for (masterAkkaUrl <- masterAkkaUrls) {

logInfo("Connecting to master " + masterAkkaUrl + "...")

val actor = context.actorSelection(masterAkkaUrl)

actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)

}

}

7、Master的receiveWithLogging接收到消息执行

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>

{

logInfo("Registering worker %s:%d with %d cores, %s RAM".format(

workerHost, workerPort, cores, Utils.megabytesToString(memory)))

if (state == RecoveryState.STANDBY) {

// ignore, don't send response

} else if (idToWorker.contains(id)) {

sender ! RegisterWorkerFailed("Duplicate worker ID")

} else {

val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,

sender, workerUiPort, publicAddress)

if (registerWorker(worker)) {

persistenceEngine.addWorker(worker)

sender ! RegisteredWorker(masterUrl, masterWebUiUrl)

schedule()

} else {

val workerAddress = worker.actor.path.address

logWarning("Worker registration failed. Attempted to re-register worker at same " +

"address: " + workerAddress)

sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "

+ workerAddress)

}

}

}

8、失败向worker返回失败消息,成功则返回Master的相关信息

9、返回消息后调用schedule,但是因为没有application,所以这时候不会进行资源的分配

至此整个Spark集群就已经启动完成

(1)master:管理集群和节点,不参与计算。

(2)worker:计算节点,进程本身不滚灶参与计算,和master汇报。

(3)Driver:运行程序的main方法,创建spark context对锋粗象。

(4)spark context:控银备镇制整个application的生命周期,包括dagsheduler和task scheduler等组件。

(5)client:用户提交程序的入口。

Spark提交程序来说,最终都是通过Spark-submit命令来实现的,不同的是spark-shell在运行时,会先进行一些初始参数册或敬的设置,然后调用Sparksubmit来运行,并且spark-shell是交互式的。

下面团茄我们从源代码的角度来解释。

首先看下Spark-Shell命令,其中它会调用main方法

在mian方法中,会调用spark-submit 并传入—class的参数(入口类)为org.apache.spark.repl.Main,设置应州慎用程序名—name “Spark shell” 传入spark-shell接收的所有参数$@。

而在Spark-submit中是通过exec命令启动进程的,如下图:

总结:所用的应用程序最后的提交都是由spark-submit完成的,其他程序的调用只是对spark-submit的参数进行设置后,调用spark-submit来完成应用程序的提交到集群的 *** 作。


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/yw/8202714.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-04-14
下一篇2023-04-14

发表评论

登录后才能评论

评论列表(0条)

    保存