spark中的DRA怎么开启
spark中的DRA怎么开启
这篇文章主要讲解了“spark中的DRA怎么开启”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“spark中的DRA怎么开启”吧!
spark on yarn 中的DynamicResourceAllocation
spark on yarn对于DynamicResourceAllocation分配来说,从spark 1.2版本就已经开始支持了.
对于spark熟悉的人都知道,如果我们要开启DynamicResourceAllocation,就得有ExternalShuffleService服务,
对于yarn来说ExternalShuffleService是作为辅助服务开启的,具体配置如下:
<property><name>yarn.nodemanager.aux-services</name><value>spark_shuffle</value></property><property><name>yarn.nodemanager.aux-services.spark_shuffle.class</name><value>org.apache.spark.network.yarn.YarnShuffleService</value></property><property><name>spark.shuffle.service.port</name><value>7337</value></property>
重启nodeManager,这样在每个nodeManager节点就会启动一个YarnShuffleService,之后在spark应用中设置spark.dynamicAllocation.enabled 为true,这样就能达到运行时资源动态分配的效果
我们直接从CoarseGrainedExecutorBackend中SparkEnv创建开始说,每一个executor的启动,必然会经过CoarseGrainedExecutorBackend main方法,而main中就涉及到SparkEnv的创建
valenv=SparkEnv.createExecutorEnv(driverConf,arguments.executorId,arguments.bindAddress,arguments.hostname,arguments.cores,cfg.ioEncryptionKey,isLocal=false)
而sparkEnv的创建就涉及到BlockManager的创建。沿着代码往下走,最终
valblockTransferService=newNettyBlockTransferService(conf,securityManager,bindAddress,advertiseAddress,blockManagerPort,numUsableCores,blockManagerMaster.driverEndpoint)valblockManager=newBlockManager(executorId,rpcEnv,blockManagerMaster,serializerManager,conf,memoryManager,mapOutputTracker,shuffleManager,blockTransferService,securityManager,externalShuffleClient)
在blockManager的initialize方法中,就会进行registerWithExternalShuffleServer
//RegisterExecutors'configurationwiththelocalshuffleservice,ifoneshouldexist.if(externalShuffleServiceEnabled&&!blockManagerId.isDriver){registerWithExternalShuffleServer()}
如果我们开启了ExternalShuffleService,对于yarn就是YarnShuffleService,就会把当前的ExecutorShuffleInfo注册到host为shuffleServerId.host, port为shuffleServerId.port的ExternalShuffleService中,ExecutorShuffleInfo的信息如下:
valshuffleConfig=newExecutorShuffleInfo(diskBlockManager.localDirsString,diskBlockManager.subDirsPerLocalDir,shuffleManager.getClass.getName)
这里我重点分析一下registerWithExternalShuffleServer的方法中的以下片段
//Synchronousandwillthrowanexceptionifwecannotconnect.blockStoreClient.asInstanceOf[ExternalBlockStoreClient].registerWithShuffleServer(shuffleServerId.host,shuffleServerId.port,shuffleServerId.executorId,shuffleConfig)
该代码中shuffleServerId来自于:
shuffleServerId=if(externalShuffleServiceEnabled){logInfo(s"externalshuffleserviceport=$externalShuffleServicePort")BlockManagerId(executorId,blockTransferService.hostName,externalShuffleServicePort)}else{blockManagerId}
而blockTransferService.hostName 是我们在SparkEnv中创建的时候由advertiseAddress传过来的,
最终由CoarseGrainedExecutorBackend 主类参数hostname过来的,那到底怎么传过来的呢? 参照ExecutorRunnable的prepareCommand方法,
valcommands=prefixEnv++Seq(Environment.JAVA_HOME.$$()+"/bin/java","-server")++javaOpts++Seq("org.apache.spark.executor.YarnCoarseGrainedExecutorBackend","--driver-url",masterAddress,"--executor-id",executorId,"--hostname",hostname,"--cores",executorCores.toString,"--app-id",appId,"--resourceProfileId",resourceProfileId.toString)++
而这个hostname的值最终由YarnAllocator的方法runAllocatedContainers
valexecutorHostname=container.getNodeId.getHost
传递过来的,也就是说我们最终获取到了yarn节点,也就是nodeManager的host 这样每个启动的executor,就向executor所在的nodeManager的YarnShuffleService注册了ExecutorShuffleInfo信息,这样对于开启了动态资源分配的
ExternalBlockStoreClient 来说fetchBlocksg过程就和未开启动态资源分配的NettyBlockTransferService大同小异了
spark on k8s(kubernetes) 中的DynamicResourceAllocation
参考之前的文章,我们知道在entrypoint中我们在启动executor的时候,我们传递了hostname参数
executor)shift1CMD=(${JAVA_HOME}/bin/java"${SPARK_EXECUTOR_JAVA_OPTS[@]}"-Xms$SPARK_EXECUTOR_MEMORY-Xmx$SPARK_EXECUTOR_MEMORY-cp"$SPARK_CLASSPATH:$SPARK_DIST_CLASSPATH"org.apache.spark.executor.CoarseGrainedExecutorBackend--driver-url$SPARK_DRIVER_URL--executor-id$SPARK_EXECUTOR_ID--cores$SPARK_EXECUTOR_CORES--app-id$SPARK_APPLICATION_ID--hostname$SPARK_EXECUTOR_POD_IP)
而SPARK_EXECUTOR_POD_IP是运行中的POD IP,参考BasicExecutorFeatureStep类片段:
Seq(newEnvVarBuilder().withName(ENV_EXECUTOR_POD_IP).withValueFrom(newEnvVarSourceBuilder().withNewFieldRef("v1","status.podIP").build()).build())
这样按照以上流程的分析,
executor也不能向k8s节点ExternalShuffleService服务注册,因为我们注册的节点是POD IP,而不是节点IP,
当然spark社区早就提出了未开启external shuffle service的动态资源分配,且已经合并到master分支. 具体配置,可以参照如下:
spark.dynamicAllocation.enabledtruespark.dynamicAllocation.shuffleTracking.enabledtruespark.dynamicAllocation.minExecutors1spark.dynamicAllocation.maxExecutors4spark.dynamicAllocation.executorIdleTimeout60s
感谢各位的阅读,以上就是“spark中的DRA怎么开启”的内容了,经过本文的学习后,相信大家对spark中的DRA怎么开启这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
推荐阅读
-
Hello Spark! | Spark,从入门到精通
-
Spark Streaming+Kafka的嵌入方式及应用
-
怎么让spark sql写mysql时支持update操作
-
Spark SQL怎么用
SparkSQL怎么用这篇文章主要介绍“SparkSQL怎么用...
-
Spark的相关问题有哪些
Spark的相关问题有哪些这篇文章主要介绍“Spark的相关问题有...
-
Spark SQL中怎么创建DataFrames
-
Redis怎么让Spark提速
-
Spark 3.0怎么实现event logs滚动
Spark3.0怎么实现eventlogs滚动本篇内容介绍了“...
-
Spark是怎样工作的
-
Spark Graphx怎么求社交网络中的最大年纪追求者
SparkGraphx怎么求社交网络中的最大年纪追求者本篇内容介...