来自:mjsws > 馆藏分类
配色: 字号:
java提交spark任务到yarn平台的配置讲解
2018-06-25 | 阅:  转:  |  分享 
  
java提交spark任务到yarn平台的配置讲解

一、背景

采用spark的方式处理,所以需要将spark的功能集成到代码中,并采用yarn客户端的方式管理spark任务。不需要将cdh的一些配置文件放到resource路径下,只需要在代码里做少量配置即可,非常方便。

二、任务管理架构
三、接口1、任务提交1./2.提交任务到yarn集群3.4.@paramconditions5.yarn集群
,spark,hdfs具体信息,参数等6.@returnappid7./8.?publicStringsubmitSpar
k(YarnSubmitConditionsconditions){9.logger.info("初始化sparkonyarn参
数");10.11.//初始化yarn客户端12.logger.info("初始化sparkonyarn客户端");13.L
istargs=Lists.newArrayList("--jar",conditions.getApplicationJar()
,"--class",14.conditions.getMainClass());15.?if(conditions.getOt
herArgs()!=null&&conditions.getOtherArgs().size()>0){16.?for(Stri
ngs:conditions.getOtherArgs()){17.args.add("--arg");18.args.add
(org.apache.commons.lang.StringUtils.join(newString[]{s},","));19
.}20.}21.22.//identifythatyouwillbeusingSparkasYARNmode23.Sys
tem.setProperty("SPARK_YARN_MODE","true");24.SparkConfsparkConf=
newSparkConf();25.?if(org.apache.commons.lang.StringUtils.isNotEm
pty(conditions.getJobName())){26.sparkConf.setAppName(conditions
.getJobName());27.}28.29.sparkConf.set("spark.yarn.jars",condit
ions.getSparkYarnJars());30.?if(conditions.getAdditionalJars()!=n
ull&&conditions.getAdditionalJars().length>0){31.sparkConf.set("
spark.jars",org.apache.commons.lang.StringUtils.join(conditions.g
etAdditionalJars(),","));32.}33.34.?if(conditions.getFiles()!=nu
ll&&conditions.getFiles().length>0){35.sparkConf.set("spark.file
s",org.apache.commons.lang.StringUtils.join(conditions.getFiles()
,","));36.}37.?for(Map.Entrye:conditions.getSparkProperties().en
trySet()){38.sparkConf.set(e.getKey().toString(),e.getValue().to
String());39.}40.41.//添加这个参数,不然spark会一直请求0.0.0.0:8030,一直重试42.s
parkConf.set("yarn.resourcemanager.hostname",conditions.getYarnRe
sourcemanagerAddress().split(":")[0]);43.//设置为true,不删除缓存的jar包,因为
现在提交yarn任务是使用的代码配置,没有配置文件,删除缓存的jar包有问题,44.sparkConf.set("spark.y
arn.preserve.staging.files","true");45.46.//初始化yarn的配置47.Config
urationcf=newConfiguration();48.Stringos=System.getProperty("os.
name");49.?booleancross_platform=false;50.?if(os.contains("Window
s")){51.cross_platform=true;52.}53.cf.setBoolean("mapreduce.ap
p-submission.cross-platform",cross_platform);//配置使用跨平台提交任务54.//设
置yarn资源,不然会使用localhost:803255.cf.set("yarn.resourcemanager.addre
ss",conditions.getYarnResourcemanagerAddress());56.//设置namenode的
地址,不然jar包会分发,非常恶心57.cf.set("fs.defaultFS",conditions.getSparkFsD
efaultFS());58.59.ClientArgumentscArgs=newClientArguments(args.t
oArray(newString[args.size()]));60.Clientclient=newClient(cArgs,
cf,sparkConf);61.logger.info("提交任务,任务名称:"+conditions.getJobName(
));62.63.?try{64.65.ApplicationIdappId=client.submitApplication(
);66.67.?returnappId.toString();68.69.}catch(Exceptione){70.log
ger.error("提交spark任务失败",e);71.?returnnull;72.}finally{73.?if(cli
ent!=null){74.client.stop();75.}76.}77.}2、任务进度获取1./2.停止s
park任务3.4.@paramyarnResourcemanagerAddress5.yarn资源管理器地址,例如:
master:8032,查看yarn集群获取具体地址6.@paramappIdStr7.需要取消的任务id8./9.?
publicvoidkillJob(StringyarnResourcemanagerAddress,StringappIdStr
){10.logger.info("取消spark任务,任务id:"+appIdStr);11.//初始化yarn的配置12.
Configurationcf=newConfiguration();13.Stringos=System.getProper
ty("os.name");14.?booleancross_platform=false;15.?if(os.contains(
"Windows")){16.cross_platform=true;www.44771.net17.}18.cf.setB
oolean("mapreduce.app-submission.cross-platform",cross_platform);
//配置使用跨平台提交任务19.//设置yarn资源,不然会使用localhost:803220.cf.set("yarn.r
esourcemanager.address",yarnResourcemanagerAddress);21.22.//创建ya
rn的客户端,此类中有杀死任务的方法23.YarnClientyarnClient=YarnClient.createYarnC
lient();24.//初始化yarn的客户端25.yarnClient.init(cf);26.//yarn客户端启动2
7.yarnClient.start();28.?try{29.//根据应用id,杀死应用30.yarnClient.kil
lApplication(getAppId(appIdStr));31.}catch(Exceptione){32.logge
r.error("取消spark任务失败",e);33.}34.//关闭yarn客户端35.yarnClient.stop(
);36.37.}3、任务取消1./2.获取spark任务状态3.4.5.@paramyarnResour
cemanagerAddress6.yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址7.
@paramappIdStr8.需要取消的任务id9./10.?publicSparkTaskStategetStatus
(StringyarnResourcemanagerAddress,StringappIdStr){11.logger.info
("获取任务状态启动,任务id:"+appIdStr);12.//初始化yarn的配置13.Configurationcf=n
ewConfiguration();14.Stringos=System.getProperty("os.name");15.?
booleancross_platform=false;16.?if(os.contains("Windows")){17.cr
oss_platform=true;18.}19.cf.setBoolean("mapreduce.app-submissio
n.cross-platform",cross_platform);//配置使用跨平台提交任务20.//设置yarn资源,不然会
使用localhost:8032www.77884.net21.cf.set("yarn.resourcemanager.ad
dress",yarnResourcemanagerAddress);22.logger.info("获取任务状态,任务id:"
+appIdStr);23.24.SparkTaskStatetaskState=newSparkTaskState();25.
//设置任务id26.taskState.setAppId(appIdStr);27.YarnClientyarnClien
t=YarnClient.createYarnClient();28.//初始化yarn的客户端29.yarnClient.i
nit(cf);30.//yarn客户端启动31.yarnClient.start();32.ApplicationRepo
rtreport=null;33.?try{34.report=yarnClient.getApplicationReport(
getAppId(appIdStr));35.}catch(Exceptione){36.logger.error("获取sp
ark任务状态失败");37.}38.39.?if(report!=null){40.YarnApplicationState
state=report.getYarnApplicationState();41.taskState.setState(sta
te.name());42.//任务执行进度43.?floatprogress=report.getProgress();44.
taskState.setProgress(progress);45.//最终状态46.FinalApplicationSt
atusstatus=report.getFinalApplicationStatus();47.taskState.setFi
nalStatus(status.name());48.}else{49.taskState.setState(ConstPa
ram.SPARK_FAILED);50.taskState.setProgress(0.0f);51.taskState.s
etFinalStatus(ConstParam.SPARK_FAILED);52.}53.54.//关闭yarn客户端55.
yarnClient.stop();56.logger.info("获取任务状态结束,任务状态:"+JSON.toJSONSt
ring(taskState));57.?returntaskState;58.}四、yarn参数调节1、可分配给容器的物理内存数量,一个nodemanage分配的内存,如果机器内存是128g,尽量分配2/3yarn.nodemanager.resource.memory-mb:80g2、可以为容器分配的虚拟CPU内核的数量。该参数在CDH4.4以前版本中无效。一个nodemanage分配的核数。如果机器是64和,尽量分配2/3.yarn.nodemanager.resource.cpu-vcores:403、Java?进程堆栈内存的最大大小(以字节为单位)。已传递到Java-Xmx。ResourceManager的Java堆栈大小(字节)ResourceManagerDefaultGroupB千字节兆字节吉字节
献花(0)
+1
(本文系mjsws首藏)