来自:mjsws > 馆藏分类
配色: 字号:
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化
2019-01-04 | 阅:  转:  |  分享 
  
SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化Elastic-Job是当当开源的一个分布式调度解决
方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Li
te定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos
Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。这里以Elastic-Job-lite为例,跟Spr
ingBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。
起步准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖ies>org.springframework.bootactId>spring-boot-starterroupId>org.springframework.bootspring-boot-
starter-test
test
ncy>com.dangdangelastic-job-lite-s
pring
2.1.5y>org.projectlomboklomboktId>org.springframework.bootroupId>spring-boot-starter-data-jpaendency>mysqlmysql-con
nector-java
com.zax
xer
HikariCP
ndencies>http://www.f-1.cc配置使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpaelast
icjob:?serverlists:?172.31.31.48:2181?namespace:?boot-job?spring:
?datasource:?url:?jdbc:mysql://localhost:3306/test?characterEncod
ing=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=f
alse?driver-class-name:?com.mysql.jdbc.Driver???username:?root?pa
ssword:?root?type:?com.zaxxer.hikari.HikariDataSource?#??自动创建更新验证
数据库结构?jpa:?hibernate:?ddl-auto:?update?show-sql:?true?database:?m
ysqlelastic-job相关的配置使用java配置实现,代替官方文档的xml配置@Configuration@Data@Co
nfigurationProperties(prefix?=?"elasticjob")public?class?ElasticJ
obConfig?{private?String?serverlists;private?String?namespace;@Re
sourceprivate?HikariDataSource?dataSource;@Beanpublic?ZookeeperCo
nfiguration?zkConfig()?{return?new?ZookeeperConfiguration(serverl
ists,?namespace);?}@Bean(initMethod?=?"init")public?ZookeeperRegi
stryCenter?regCenter(ZookeeperConfiguration?config)?{return?new?Z
ookeeperRegistryCenter(config);?}/??将作业运行的痕迹进行持久化到DBhttp://ww
w.letaoqpyx.com???@return?/@Beanpublic?JobEventConfiguration?j
obEventConfiguration()?{return?new?JobEventRdbConfiguration(dataS
ource);?}@Beanpublic?ElasticJobListener?elasticJobListener()?{ret
urn?new?ElasticJobListener(100,?100);?}}所有相关的配置到这里就已经OK了,接下来开始具体的
编码实现定时任务实现先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(S
hardingContextshardingContext)方法@Slf4jpublic?class?MyElasticJob?
implements?SimpleJob?{@Overridepublic?void?execute(ShardingContex
t?shardingContext)?{//打印出任务相关信息,JobParameter用于传递任务的IDlog.info("任务
名:{},?片数:{},?id={}",?shardingContext.getJobName(),?shardingContex
t.getShardingTotalCount(),?shardingContext.getJobParameter());?}}
接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的Elast
icJobConfig已经注册到了Spring容器之中。public?class?ElasticJobListener?exten
ds?AbstractDistributeOnceElasticJobListener?{@Resourceprivate?Tas
kRepository?taskRepository;public?ElasticJobListener(long?started
TimeoutMilliseconds,?long?completedTimeoutMilliseconds)?{super(st
artedTimeoutMilliseconds,?completedTimeoutMilliseconds);?}@Overri
depublic?void?doBeforeJobExecutedAtLastStarted(ShardingContexts?s
hardingContexts)?{?}@Overridepublic?void?doAfterJobExecutedAtLast
Completed(ShardingContexts?shardingContexts)?{//任务执行完成后更新状态为已执行Jo
bTask?jobTask?=?taskRepository.findOne(Long.valueOf(shardingConte
xts.getJobParameter()));?jobTask.setStatus(1);?taskRepository.sav
e(jobTask);?}}实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置
分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfigur
ation。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfigura
tion使用JobCoreConfiguration,层层嵌套。@Componentpublic?class?ElasticJob
Handler?{@Resourceprivate?ZookeeperRegistryCenter?registryCenter;
@Resourceprivate?JobEventConfiguration?jobEventConfiguration;@Res
ourceprivate?ElasticJobListener?elasticJobListener;/??@param?j
obName??@param?jobClass??@param?shardingTotalCount??@param?cro
n??@param?id?????????????????数据ID??@return?/private?static?Lit
eJobConfiguration.Builder?simpleJobConfigBuilder(String?jobName,?
Class?jobClass,?????????????????????????????
??????????????????????????????????????????int?shardingTotalCount,
?String?cron,?String?id)?{return?LiteJobConfiguration.newBuilder(
new?SimpleJobConfiguration(?JobCoreConfiguration.newBuilder(jobNa
me,?cron,?shardingTotalCount).jobParameter(id).build(),?jobClass.
getCanonicalName()));?}/??添加一个定时任务?http://www.44226.net??@pa
ram?jobName????????????任务名??@param?cron???????????????表达式??@par
am?shardingTotalCount?分片数?/public?void?addJob(String?jobName,?St
ring?cron,?Integer?shardingTotalCount,?String?id)?{?LiteJobConfig
uration?jobConfig?=?simpleJobConfigBuilder(jobName,?MyElasticJob.
class,?shardingTotalCount,?cron,?id)?.overwrite(true).build();new
?SpringJobScheduler(new?MyElasticJob(),?registryCenter,?jobConfig
,?jobEventConfiguration,?elasticJobListener).init();?}}到这里,elasti
c-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。
编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关
数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。@Servicepublic?clas
s?ElasticJobService?{@Resourceprivate?ElasticJobHandler?jobHandle
r;@Resourceprivate?TaskRepository?taskRepository;/??扫描db,并添加任务
?/public?void?scanAddJob()?{?Specification?query?=?(Specificatio
n)?(root,?criteriaQuery,?criteriaBuilder)?->?criteriaBui
lder?.and(criteriaBuilder.equal(root.get("status"),?0));?ListTask>?jobTasks?=?taskRepository.findAll(query);?jobTasks.forEach(
jobTask?->?{?Long?current?=?System.currentTimeMillis();?String?jo
bName?=?"job"?+?jobTask.getSendTime();?String?cron;//说明消费未发送,但是已经
过了消息的发送时间,调整时间继续执行任务if?(jobTask.getSendTime()?钟之后执行,把Date转换为cron表达式cron?=?CronUtils.getCron(new?Date(current?+?
60000));?}?else?{?cron?=?CronUtils.getCron(new?Date(jobTask.getSe
ndTime()));?}?jobHandler.addJob(jobName,?cron,?1,?String.valueOf(
jobTask.getId()));?});?}}在Junit中添加几条测试数据@RunWith(SpringJUnit4Clas
sRunner.class)@SpringBootTestpublic?class?JobTaskTest?{@Resourcep
rivate?TaskRepository?taskRepository;@Testpublic?void?add()?{//生成
几个任务,第一任务在三分钟之后Long?unixTime?=?System.currentTimeMillis()?+?60000
;?JobTask?task?=?new?JobTask("test-msg-1",?0,?unixTime);?taskRepo
sitory.save(task);?unixTime?+=?60000;?task?=?new?JobTask("test-ms
g-2",?0,?unixTime);?taskRepository.save(task);?unixTime?+=?60000;
?task?=?new?JobTask("test-msg-3",?0,?unixTime);?taskRepository.sa
ve(task);?unixTime?+=?60000;?task?=?new?JobTask("test-msg-4",?0,?
unixTime);?taskRepository.save(task);?}}此时,数据库中多了四条状态为0的数据最后,就可以开
始验证整个流程了,代码如下@SpringBootApplicationpublic?class?ElasticJobApplica
tion?implements?CommandLineRunner?{@Resourceprivate?ElasticJobSer
vice?elasticJobService;public?static?void?main(String[]?args)?{?S
pringApplication.run(ElasticJobApplication.class,?args);?}@Overridepublic?void?run(String...?strings)?throws?Exception?{?elasticJobService.scanAddJob();?}}可以看到,在启动过程中,多个任务被加入到了Elastic-job中,并且一小段时间之后,任务一次执行,执行成功之后,因为我们配置了监听器,会打印数据库的更新SQL,当任务执行完成,再查看数据库,发现状态也更改成功。数据库中同时也会多出两张表JOB_EXECUTION_LOG,JOB_STATUS_TRACE_LOG,这是我们之前配置的JobEventConfiguration,通过数据源持久化了作业配置的相关数据,这两张表的数据可以供Elastic-job提供的运维平台使用,具体请查看官方文档。总结至此,整个流程就已经走完了,整个demo中主要用到了Elastic-job和spring-data-jpa相关的技术,作为demo,肯定会有一些缺陷,没考虑到的地方,可以根据自己的业务场景进行改进。
献花(0)
+1
(本文系mjsws首藏)