Analysis of the SparkSubmit YARN Submission Flow (Study Notes)

SparkSubmit Submission Flow Analysis

Tip: the analysis below starts from the following submit command

./spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10

First, the spark-submit script itself is executed, so let's find that script's code:

#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
 source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# exec calls the spark-class script, passing in the SparkSubmit class plus all of the arguments above
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Next, let's look at the spark-class script (key parts only):

#!/usr/bin/env bash
if [ -z "${SPARK_HOME}" ]; then
 source "$(dirname "$0")"/find-spark-home
fi
. "${SPARK_HOME}"/bin/load-spark-env.sh
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi
# 3. $RUNNER is "${JAVA_HOME}/bin/java": it runs org.apache.spark.launcher.Main from the launch classpath
#    with every argument that spark-submit received, and the launcher prints the JVM command built below
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
set +o posix
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    # 2. CMD is accumulated here; the loop collects the command that build_command produced
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")
# 1. In the end we exec a CMD -- this is where it came from: the loop above filled it in.
#    The last element appended by build_command is the launcher's exit code, so strip it off.
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

The CMD that finally gets executed:

/usr/lib/java/jdk1.8.0_144/bin/java -cp \
/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/conf/:/home/etluser/kong/spark/spark-3.0.0-bin/spark-3.0.0-bin-hadoop3.2/jars/* \
-Xmx1g \
org.apache.spark.deploy.SparkSubmit \
--master yarn \
--deploy-mode cluster \
--class org.apache.spark.examples.SparkPi \
./examples/jars/spark-examples_2.12-3.0.0.jar
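
How does the final CMD get from org.apache.spark.launcher.Main back into the shell? On non-Windows systems the launcher prints the command tokens to stdout separated by NUL characters, and the while loop above reassembles them into the CMD array (build_command also appends its own exit code, which is why the last element is stripped before exec). The following toy Scala program only illustrates that hand-off protocol; it is not the real launcher, and the class name and hard-coded command are made up:

object ToyLauncher {
  def main(args: Array[String]): Unit = {
    // Pretend this is the JVM command the real launcher would assemble from
    // spark-env.sh, conf/ and the spark-submit arguments it was given.
    val cmd = Seq("/usr/bin/java", "-Xmx1g", "-cp", "/opt/spark/jars/*",
      "org.apache.spark.deploy.SparkSubmit") ++ args

    // Emit every token followed by a NUL byte; on the bash side,
    // `read -d '' -r ARG` then consumes exactly one token per iteration.
    cmd.foreach { token =>
      print(token)
      print('\u0000')
    }
  }
}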

So the crux of the submit scripts is how the org.apache.spark.deploy.SparkSubmit class works; everything else is just arguments. Let's start with that class's code:

// A runnable class must have a main method, so we start from main
override def main(args: Array[String]): Unit = {
  // create an anonymous subclass of SparkSubmit
  val submit = new SparkSubmit() {
    self =>
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
      new SparkSubmitArguments(args) {
        override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
        override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        override protected def logError(msg: => String): Unit = self.logError(msg)
      }
    }
    override protected def logInfo(msg: => String): Unit = printMessage(msg)
    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
    // this override is the one that actually runs; it delegates to the parent's doSubmit(args)
    override def doSubmit(args: Array[String]): Unit = {
      try {
        super.doSubmit(args)
      } catch {
        case e: SparkUserAppException =>
          exitFn(e.exitCode)
      }
    }
  }
  // then doSubmit is invoked on the anonymous subclass, where it has been overridden
  submit.doSubmit(args)
}
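
The `self =>` at the top of the anonymous class body is a Scala self-type alias: inside the nested anonymous SparkSubmitArguments, `this` refers to the inner object, but `self` still points at the outer anonymous SparkSubmit, which is how its logInfo/logWarning/logError calls are delegated outward. A minimal standalone sketch of the same idiom (Logger, SelfAliasDemo and the messages are made-up names, not Spark code):

trait Logger {
  def logInfo(msg: => String): Unit
}

object SelfAliasDemo extends App {
  val outer: Logger = new Logger {
    self => // `self` is an alias for this (outer) anonymous Logger

    override def logInfo(msg: => String): Unit = println(s"[outer] $msg")

    // a nested anonymous Logger, mirroring how the anonymous SparkSubmit
    // builds an anonymous SparkSubmitArguments that delegates logging outward
    private val inner: Logger = new Logger {
      // `this` here is the inner instance; `self` still means the outer one
      override def logInfo(msg: => String): Unit = self.logInfo(s"[inner] $msg")
    }

    inner.logInfo("hello") // prints: [outer] [inner] hello
  }
}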

1. The submission flow, starting from super.doSubmit

def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  // this is just logging setup; skip it for now
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  // * parseArguments returns appArgs; its job is to parse the arguments
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // pattern match: appArgs.action must be one of the four cases below,
  // so let's start from parseArguments to see how it gets set
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}

1.1 parseArguments(args)

protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
  // the constructor does the two key things described in 1.1.1 and 1.1.2
  new SparkSubmitArguments(args)
}

1.1.1 SparkSubmitArguments(args)

try {
  // constructor code block: parse the command-line arguments
  parse(args.asJava)
} catch {
  case e: IllegalArgumentException =>
    SparkSubmit.printErrorAndExit(e.getMessage())
}

The constructor calls parse(args.asJava) first (1.1.1.1 below) and then loadEnvironmentArguments() (covered in 1.1.2 further down).

1.1.1.1 parse(args.asJava)
// obviously, this is where the input arguments get parsed and normalized
protected final void parse(List<String> args) {
  // the regex that splits options written as --key=value
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options with a value.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch.
    name = findCliOption(arg, switches);
    if (name != null) {
      // * handle() is the key function for argument parsing
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
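
To make the eqSeparatedOpt regex concrete, here is a small standalone Scala snippet (illustrative only) showing how the pattern "(--[^=]+)=(.+)" splits an argument such as --master=yarn into the option name and its value:

object EqSeparatedOptDemo {
  def main(args: Array[String]): Unit = {
    val eqSeparatedOpt = "(--[^=]+)=(.+)".r

    "--master=yarn" match {
      case eqSeparatedOpt(name, value) =>
        // name = "--master", value = "yarn"
        println(s"name=$name value=$value")
      case other =>
        // arguments like "--master yarn" (space separated) do not match;
        // parse() then picks up the value from the next argument instead
        println(s"no '=' in: $other")
    }
  }
}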
1.1.1.1.1 handle(name, null)
// this pattern match makes it obvious at a glance: find the matching option and assign the corresponding field
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    // protected final String NAME = "--name";
    case NAME =>
      name = value
    // protected final String MASTER = "--master";
    case MASTER =>
      master = value
    // protected final String CLASS = "--class";
    case CLASS =>
      mainClass = value
    case NUM_EXECUTORS =>
      numExecutors = value
    case TOTAL_EXECUTOR_CORES =>
      totalExecutorCores = value
    case EXECUTOR_CORES =>
      executorCores = value
    case EXECUTOR_MEMORY =>
      executorMemory = value
    case DRIVER_MEMORY =>
      driverMemory = value
    case DRIVER_CORES =>
      driverCores = value
    case _ =>
      throw new IllegalArgumentException(s"Unexpected argument '$opt'.")
  }
  true
}

1.1.2 loadEnvironmentArguments()

// Action should be SUBMIT unless otherwise specified
// on the first run action is still null, so it is assigned SUBMIT by default
action = Option(action).getOrElse(SUBMIT)
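
Option(x) turns a possibly-null reference into Some(x) or None, so Option(action).getOrElse(SUBMIT) reads as "keep action if it was set by a flag such as --kill or --status, otherwise default to SUBMIT". A tiny self-contained illustration (the Action type and values here are stand-ins, not Spark's SparkSubmitAction):

object ActionDefaultDemo {
  sealed trait Action
  case object SUBMIT extends Action
  case object KILL extends Action

  def main(args: Array[String]): Unit = {
    var action: Action = null                 // nothing parsed yet
    action = Option(action).getOrElse(SUBMIT) // Option(null) == None, so fall back to SUBMIT
    println(action)                           // SUBMIT

    action = KILL                             // e.g. set by a --kill flag
    action = Option(action).getOrElse(SUBMIT) // Some(KILL), the default is ignored
    println(action)                           // KILL
  }
}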

1.2 submit(appArgs, uninitLog)

// stripped of the proxy-user and legacy standalone-cluster handling, submit() boils down to:
runMain(args, uninitLog)

1.2.1 runMain(args, uninitLog) (with the unimportant logging removed)

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
 // (childArgs, childClasspath, sparkConf, childMainClass)
 // childMainClass => "org.apache.spark.deploy.yarn.YarnClusterApplication"
 -- prepareSubmitEnvironment(args)
 
 // classForName(childMainClass)
 -- var mainClass: Class[_] = Utils.classForName(childMainClass)
 
 // classOf[SparkApplication].isAssignableFrom(mainClass)
 val app: SparkApplication =
 -- a)mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
 -- b)new JavaMainApplication(mainClass)
 
 // "org.apache.spark.deploy.yarn.YarnClusterApplication"
 app.start(childArgs.toArray, sparkConf)
}
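
The a)/b) branch above is plain JVM reflection: load childMainClass by name, and if it implements the SparkApplication trait instantiate it directly, otherwise wrap its static main() in a JavaMainApplication. A simplified, self-contained sketch of that decision (SparkApplicationLike, DemoApp and the class-name string are made-up stand-ins, not Spark's own types):

trait SparkApplicationLike {
  def start(args: Array[String]): Unit
}

// pretend this is the application class named by childMainClass
class DemoApp extends SparkApplicationLike {
  override def start(args: Array[String]): Unit =
    println(s"started with ${args.mkString(", ")}")
}

object ReflectionDispatchDemo {
  def main(args: Array[String]): Unit = {
    // Spark's Utils.classForName is essentially Class.forName with the right classloader
    val mainClass: Class[_] = Class.forName("DemoApp")

    val app: SparkApplicationLike =
      if (classOf[SparkApplicationLike].isAssignableFrom(mainClass)) {
        // a) the class already implements the application interface: instantiate it
        mainClass.getConstructor().newInstance().asInstanceOf[SparkApplicationLike]
      } else {
        // b) otherwise Spark wraps the class's static main() in a JavaMainApplication
        //    (omitted here to keep the sketch short)
        throw new IllegalStateException(s"${mainClass.getName} is not a SparkApplicationLike")
      }

    app.start(Array("10"))
  }
}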
1.2.1.1 prepareSubmitEnvironment(args) (condensed)
if (isYarnCluster) {
 childMainClass = YARN_CLUSTER_SUBMIT_CLASS
}
1.2.1.2 app.start(childArgs.toArray, sparkConf) (condensed)
override def start(args: Array[String], conf: SparkConf): Unit = {
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  conf.remove(JARS)
  conf.remove(FILES)
  // new ClientArguments(args) parses the arguments passed in; --class => userClass = value => the user's own main class
  // private val yarnClient = YarnClient.createYarnClient
  // YarnClient client = new YarnClientImpl();
  // protected ApplicationClientProtocol rmClient; -- the ResourceManager client, i.e. this Client exists to talk to the ResourceManager
  // now that the objects are clear, let's see what run() does
  new Client(new ClientArguments(args), conf, null).run()
}
1.2.1.2.1 Client.run() (condensed)
def run(): Unit = {
  this.appId = submitApplication()
}

def submitApplication(): ApplicationId = {
  try {
    // open the connection to the launcher backend
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()

    // Set up the appropriate contexts to launch our AM
    // create the AM container;
    // commands = JAVA_HOME/bin/java org.apache.spark.deploy.yarn.ApplicationMaster ...
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)
    appId
  } catch {
    ...
  }
}
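
Stripped of all Spark specifics, the YARN side of submitApplication() is the standard YarnClient workflow: create an application, fill in an ApplicationSubmissionContext whose AM container command launches the ApplicationMaster class, and submit it to the ResourceManager. A bare-bones sketch using the Hadoop YARN client API (the command string, memory sizes, queue name and application name are made-up placeholders):

import scala.collection.JavaConverters._

import org.apache.hadoop.yarn.api.records.{ApplicationId, ContainerLaunchContext, Resource}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.Records

object BareYarnSubmitSketch {
  def submit(): ApplicationId = {
    val conf = new YarnConfiguration()
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(conf)
    yarnClient.start()

    // ask the ResourceManager for a new application id
    val newApp = yarnClient.createApplication()
    val appContext = newApp.getApplicationSubmissionContext

    // the AM container: in Spark's case the command launches
    // org.apache.spark.deploy.yarn.ApplicationMaster
    val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
    amContainer.setCommands(
      List("{{JAVA_HOME}}/bin/java -Xmx512m some.demo.ApplicationMaster " +
        "1><LOG_DIR>/stdout 2><LOG_DIR>/stderr").asJava)

    appContext.setApplicationName("bare-yarn-demo")
    appContext.setAMContainerSpec(amContainer)
    appContext.setResource(Resource.newInstance(1024, 1)) // 1 GiB, 1 vcore for the AM
    appContext.setQueue("default")

    // hand the whole thing to the ResourceManager, as Spark's Client does
    yarnClient.submitApplication(appContext)
  }
}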
Author: chief_y. Original article: https://www.cnblogs.com/Diona/p/16473965.html
