如何进行CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的分析

如何进行CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的分析

今天就跟大家聊聊有关如何进行CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的分析,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

CoarseGrainedSchedulerBackend是Driver端用到的,CoarseGrainedExecutorBackend是Executor端用到的。他们都是Backend,什么是Backend?Backend其实就是负责端到端通信的,这两个CoarseGrained的Backend是负责Driver和Executor之间的通信的。

如何进行CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的分析

什么是Driver呢?

Driver就是我们编写的spark代码,里面的main函数就是Driver跑的代码。

什么是Executor呢?

Executor就是执行spark的Task任务的地方,Backend接收到Driver的LaunchTask消息后,调用Executor类的launchTask方法来执行任务。

Driver会启动CoarseGrainedSchedulerBackend,通过CoarseGrainedSchedulerBackend来向集群申请机器以便启动Executor,会找到一台机器,发送命令让机器启动一个ExecutorRunner,ExecutorRunner里启动CoarseGrainedExecutorBackend向Driver注册,并创建Executor来处理CoarseGrainedExecutorBackend接收到的请求。刚刚说的是Standalone部署下的流程,Yarn下大部分类似,只有向集群申请机器来启动Executor这一步不太一样,这个简单说一下吧。

Yarn环境下,是通过spark-yarn工程里的几个类一级yarn本身的功能来一起完成机器的部署和分区任务的分发。

spark-yarn包含两个文件:client.java和ApplicationMaster.java。

client.java功能是向yarn申请资源来执行ApplicationMaster.java的代码,所以这里主要看下ApplicationMaster.java的代码功能是什么。

ApplicationMaster首先干两件事,启动一个"/bin/mesos-master"和多个"/bin/mesos-slave",这都是向yarn申请资源然后部署上去执行的,都是yarn的功能部分,"/bin/mesos-master"和"/bin/mesos-slave"是yarn环境里自带的两个bin程序,可以看成是类似Standalone环境下的Master和Worker。

launchContainer方法是启动yarn的container,也就是前面说的在container上启动“/bin/mesos-slave",mesos-slave会向mesos-master注册的。等需要的slave节点资源全部申请启动完成后,调用startApplication()方法开始执行Driver。

startApplication()方法:

//Starttheuser'sapplicationprivatevoidstartApplication()throwsIOException{try{StringsparkClasspath=getSparkClasspath();StringjobJar=newFile("job.jar").getAbsolutePath();StringjavaArgs="-Xms"+(masterMem-128)+"m-Xmx"+(masterMem-128)+"m";javaArgs+="-Djava.library.path="+mesosHome+"/lib/java";StringsubstitutedArgs=programArgs.replaceAll("\\[MASTER\\]",masterUrl);if(mainClass.equals("")){javaArgs+="-cp"+sparkClasspath+"-jar"+jobJar+""+substitutedArgs;}else{javaArgs+="-cp"+sparkClasspath+":"+jobJar+""+mainClass+""+substitutedArgs;}Stringjava="java";if(System.getenv("JAVA_HOME")!=null){java=System.getenv("JAVA_HOME")+"/bin/java";}StringbashCommand=java+""+javaArgs+"1>"+logDirectory+"/application.stdout"+"2>"+logDirectory+"/application.stderr";LOG.info("Command:"+bashCommand);String[]command=newString[]{"bash","-c",bashCommand};String[]env=newString[]{"SPARK_HOME="+sparkHome,"MASTER="+masterUrl,"SPARK_MEM="+(slaveMem-128)+"m"};application=Runtime.getRuntime().exec(command,env);newThread("waitforuserapplication"){publicvoidrun(){try{appExitCode=application.waitFor();appExited=true;LOG.info("Userapplicationexitedwithcode"+appExitCode);}catch(InterruptedExceptione){e.printStackTrace();}}}.start();}catch(SparkClasspathExceptione){unregister(false);System.exit(1);return;}}

这就是启动Driver了,masterUrl就是”bin/mesos-master“的地址,设置成了环境变量”MASTER“来用了,yarn下的master的地址格式是”mesos://host:port“,Standalone下是”spark://host:port“。

在SparkContext下会根据master地址格式,做不同的处理,这段代码是这样:

mastermatch{case"local"=>checkResourcesPerTask(clusterMode=false,Some(1))valscheduler=newTaskSchedulerImpl(sc,MAX_LOCAL_TASK_FAILURES,isLocal=true)valbackend=newLocalSchedulerBackend(sc.getConf,scheduler,1)scheduler.initialize(backend)(backend,scheduler)caseLOCAL_N_REGEX(threads)=>deflocalCpuCount:Int=Runtime.getRuntime.availableProcessors()//local[*]estimatesthenumberofcoresonthemachine;local[N]usesexactlyNthreads.valthreadCount=if(threads=="*")localCpuCountelsethreads.toIntif(threadCount<=0){thrownewSparkException(s"Askedtorunlocallywith$threadCountthreads")}checkResourcesPerTask(clusterMode=false,Some(threadCount))valscheduler=newTaskSchedulerImpl(sc,MAX_LOCAL_TASK_FAILURES,isLocal=true)valbackend=newLocalSchedulerBackend(sc.getConf,scheduler,threadCount)scheduler.initialize(backend)(backend,scheduler)caseLOCAL_N_FAILURES_REGEX(threads,maxFailures)=>deflocalCpuCount:Int=Runtime.getRuntime.availableProcessors()//local[*,M]meansthenumberofcoresonthecomputerwithMfailures//local[N,M]meansexactlyNthreadswithMfailuresvalthreadCount=if(threads=="*")localCpuCountelsethreads.toIntcheckResourcesPerTask(clusterMode=false,Some(threadCount))valscheduler=newTaskSchedulerImpl(sc,maxFailures.toInt,isLocal=true)valbackend=newLocalSchedulerBackend(sc.getConf,scheduler,threadCount)scheduler.initialize(backend)(backend,scheduler)caseSPARK_REGEX(sparkUrl)=>checkResourcesPerTask(clusterMode=true,None)valscheduler=newTaskSchedulerImpl(sc)valmasterUrls=sparkUrl.split(",").map("spark://"+_)valbackend=newStandaloneSchedulerBackend(scheduler,sc,masterUrls)scheduler.initialize(backend)(backend,scheduler)caseLOCAL_CLUSTER_REGEX(numSlaves,coresPerSlave,memoryPerSlave)=>checkResourcesPerTask(clusterMode=true,Some(coresPerSlave.toInt))//Checktomakesurememoryrequested<=memoryPerSlave.OtherwiseSparkwilljusthang.valmemoryPerSlaveInt=memoryPerSlave.toIntif(sc.executorMemory>memoryPerSlaveInt){thrownewSparkException("Askedtolaunchclusterwith%dMiBRAM/workerbutrequested%dMiB/worker".format(memoryPerSlaveInt,sc.executorMemory))}valscheduler=newTaskSchedulerImpl(sc)vallocalCluster=newLocalSparkCluster(numSlaves.toInt,coresPerSlave.toInt,memoryPerSlaveInt,sc.conf)valmasterUrls=localCluster.start()valbackend=newStandaloneSchedulerBackend(scheduler,sc,masterUrls)scheduler.initialize(backend)backend.shutdownCallback=(backend:StandaloneSchedulerBackend)=>{localCluster.stop()}(backend,scheduler)casemasterUrl=>checkResourcesPerTask(clusterMode=true,None)valcm=getClusterManager(masterUrl)match{caseSome(clusterMgr)=>clusterMgrcaseNone=>thrownewSparkException("CouldnotparseMasterURL:'"+master+"'")}try{valscheduler=cm.createTaskScheduler(sc,masterUrl)valbackend=cm.createSchedulerBackend(sc,masterUrl,scheduler)cm.initialize(scheduler,backend)(backend,scheduler)}catch{casese:SparkException=>throwsecaseNonFatal(e)=>thrownewSparkException("Externalschedulercannotbeinstantiated",e)}}}

如果是yarn,会落到最后一个case语句:

casemasterUrl=>checkResourcesPerTask(clusterMode=true,None)valcm=getClusterManager(masterUrl)match{caseSome(clusterMgr)=>clusterMgrcaseNone=>thrownewSparkException("CouldnotparseMasterURL:'"+master+"'")}try{valscheduler=cm.createTaskScheduler(sc,masterUrl)valbackend=cm.createSchedulerBackend(sc,masterUrl,scheduler)cm.initialize(scheduler,backend)(backend,scheduler)}catch{casese:SparkException=>throwsecaseNonFatal(e)=>thrownewSparkException("Externalschedulercannotbeinstantiated",e)}

这里会用到ClusterManager的类,这又是什么东东呢?spark难就难在这,涉及的概念太多。

privatedefgetClusterManager(url:String):Option[ExternalClusterManager]={valloader=Utils.getContextOrSparkClassLoadervalserviceLoaders=ServiceLoader.load(classOf[ExternalClusterManager],loader).asScala.filter(_.canCreate(url))if(serviceLoaders.size>1){thrownewSparkException(s"Multipleexternalclustermanagersregisteredfortheurl$url:$serviceLoaders")}serviceLoaders.headOption}

找到所有的ExternalClusterManager类及子类,看哪个类的canCreate方法对url返回true,我们这里就是找满足"mesos://host:port"的类。

看完上述内容,你们对如何进行CoarseGrainedSchedulerBackend和CoarseGrainedExecutorBackend的分析有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注恰卡编程网行业资讯频道,感谢大家的支持。

发布于 2021-12-23 21:20:06
收藏
分享
海报
0 条评论
44
上一篇:config和cluster.spec中keys怎么配置 下一篇:如何进行OpenStack pike的卷管理完善
目录

    0 条评论

    本站已关闭游客评论,请登录或者注册后再评论吧~

    忘记密码?

    图形验证码