SpringBoot使用Elastic-Job-lite实现动态创建定时任务和任务持久化Elastic-Job是当当开源的一个分布式调度解决 方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。Elastic-Job-Li te定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务;Elastic-Job-Cloud采用自研Mesos Framework的解决方案,额外提供资源治理、应用分发以及进程隔离等功能。这里以Elastic-Job-lite为例,跟Spr ingBoot进行整合,当当的官方文档中并没有对SpringBoot集成作说明,所有的配置都是基于文档中的xml的配置修改出来的。 起步准备好一个SpringBoot的项目,pom.xml中引入Elastic-job,mysql,jpa等依赖ies>org.springframework.bootactId>spring-boot-starterroupId>org.springframework.bootspring-boot- starter-testtestncy>com.dangdangelastic-job-lite-s pring2.1.5y>org.projectlomboklomboktId>org.springframework.bootroupId>spring-boot-starter-data-jpaendency>mysqlmysql-con nector-javacom.zax xerHikariCPndencies>http://www.f-1.cc配置使用yaml进行相关属性的配置,主要配置的是数据库连接池,jpaelast icjob:?serverlists:?172.31.31.48:2181?namespace:?boot-job?spring: ?datasource:?url:?jdbc:mysql://localhost:3306/test?characterEncod ing=utf-8&verifyServerCertificate=false&useSSL=false&requireSSL=f alse?driver-class-name:?com.mysql.jdbc.Driver???username:?root?pa ssword:?root?type:?com.zaxxer.hikari.HikariDataSource?#??自动创建更新验证 数据库结构?jpa:?hibernate:?ddl-auto:?update?show-sql:?true?database:?m ysqlelastic-job相关的配置使用java配置实现,代替官方文档的xml配置@Configuration@Data@Co nfigurationProperties(prefix?=?"elasticjob")public?class?ElasticJ obConfig?{private?String?serverlists;private?String?namespace;@Re sourceprivate?HikariDataSource?dataSource;@Beanpublic?ZookeeperCo nfiguration?zkConfig()?{return?new?ZookeeperConfiguration(serverl ists,?namespace);?}@Bean(initMethod?=?"init")public?ZookeeperRegi stryCenter?regCenter(ZookeeperConfiguration?config)?{return?new?Z ookeeperRegistryCenter(config);?}/??将作业运行的痕迹进行持久化到DBhttp://ww w.letaoqpyx.com???@return?/@Beanpublic?JobEventConfiguration?j obEventConfiguration()?{return?new?JobEventRdbConfiguration(dataS ource);?}@Beanpublic?ElasticJobListener?elasticJobListener()?{ret urn?new?ElasticJobListener(100,?100);?}}所有相关的配置到这里就已经OK了,接下来开始具体的 编码实现定时任务实现先实现一个自己的任务类,需要实现elastic-job提供的SimpleJob接口,实现它的execute(S hardingContextshardingContext)方法@Slf4jpublic?class?MyElasticJob? implements?SimpleJob?{@Overridepublic?void?execute(ShardingContex t?shardingContext)?{//打印出任务相关信息,JobParameter用于传递任务的IDlog.info("任务 名:{},?片数:{},?id={}",?shardingContext.getJobName(),?shardingContex t.getShardingTotalCount(),?shardingContext.getJobParameter());?}} 接下来实现一个分布式的任务监听器,如果任务有分片,分布式监听器会在总的任务开始前执行一次,结束时执行一次。监听器在之前的Elast icJobConfig已经注册到了Spring容器之中。public?class?ElasticJobListener?exten ds?AbstractDistributeOnceElasticJobListener?{@Resourceprivate?Tas kRepository?taskRepository;public?ElasticJobListener(long?started TimeoutMilliseconds,?long?completedTimeoutMilliseconds)?{super(st artedTimeoutMilliseconds,?completedTimeoutMilliseconds);?}@Overri depublic?void?doBeforeJobExecutedAtLastStarted(ShardingContexts?s hardingContexts)?{?}@Overridepublic?void?doAfterJobExecutedAtLast Completed(ShardingContexts?shardingContexts)?{//任务执行完成后更新状态为已执行Jo bTask?jobTask?=?taskRepository.findOne(Long.valueOf(shardingConte xts.getJobParameter()));?jobTask.setStatus(1);?taskRepository.sav e(jobTask);?}}实现一个ElasticJobHandler,用于向Elastic-job中添加指定的作业配置,作业配置 分为3级,分别是JobCoreConfiguration,JobTypeConfiguration和LiteJobConfigur ation。LiteJobConfiguration使用JobTypeConfiguration,JobTypeConfigura tion使用JobCoreConfiguration,层层嵌套。@Componentpublic?class?ElasticJob Handler?{@Resourceprivate?ZookeeperRegistryCenter?registryCenter; @Resourceprivate?JobEventConfiguration?jobEventConfiguration;@Res ourceprivate?ElasticJobListener?elasticJobListener;/??@param?j obName??@param?jobClass??@param?shardingTotalCount??@param?cro n??@param?id?????????????????数据ID??@return?/private?static?Lit eJobConfiguration.Builder?simpleJobConfigBuilder(String?jobName,? Class?extends?SimpleJob>?jobClass,????????????????????????????? ??????????????????????????????????????????int?shardingTotalCount, ?String?cron,?String?id)?{return?LiteJobConfiguration.newBuilder( new?SimpleJobConfiguration(?JobCoreConfiguration.newBuilder(jobNa me,?cron,?shardingTotalCount).jobParameter(id).build(),?jobClass. getCanonicalName()));?}/??添加一个定时任务?http://www.44226.net??@pa ram?jobName????????????任务名??@param?cron???????????????表达式??@par am?shardingTotalCount?分片数?/public?void?addJob(String?jobName,?St ring?cron,?Integer?shardingTotalCount,?String?id)?{?LiteJobConfig uration?jobConfig?=?simpleJobConfigBuilder(jobName,?MyElasticJob. class,?shardingTotalCount,?cron,?id)?.overwrite(true).build();new ?SpringJobScheduler(new?MyElasticJob(),?registryCenter,?jobConfig ,?jobEventConfiguration,?elasticJobListener).init();?}}到这里,elasti c-job的注册中心,数据源相关配置,以及动态添加的逻辑已经做完了,接下来在service中调用上面写好的方法,验证功能是否正常。 编写一个ElasticJobService类,扫描数据库中状态为0的任务,并且把这些任务添加到Elastic-job中,这里的相关 数据库操作使用了spring-data-jpa,dao层相关代码就不贴了,可以在源码中查看。@Servicepublic?clas s?ElasticJobService?{@Resourceprivate?ElasticJobHandler?jobHandle r;@Resourceprivate?TaskRepository?taskRepository;/??扫描db,并添加任务 ?/public?void?scanAddJob()?{?Specification?query?=?(Specificatio n)?(root,?criteriaQuery,?criteriaBuilder)?->?criteriaBui lder?.and(criteriaBuilder.equal(root.get("status"),?0));?ListTask>?jobTasks?=?taskRepository.findAll(query);?jobTasks.forEach( jobTask?->?{?Long?current?=?System.currentTimeMillis();?String?jo bName?=?"job"?+?jobTask.getSendTime();?String?cron;//说明消费未发送,但是已经 过了消息的发送时间,调整时间继续执行任务if?(jobTask.getSendTime()?钟之后执行,把Date转换为cron表达式cron?=?CronUtils.getCron(new?Date(current?+? 60000));?}?else?{?cron?=?CronUtils.getCron(new?Date(jobTask.getSe ndTime()));?}?jobHandler.addJob(jobName,?cron,?1,?String.valueOf( jobTask.getId()));?});?}}在Junit中添加几条测试数据@RunWith(SpringJUnit4Clas sRunner.class)@SpringBootTestpublic?class?JobTaskTest?{@Resourcep rivate?TaskRepository?taskRepository;@Testpublic?void?add()?{//生成 几个任务,第一任务在三分钟之后Long?unixTime?=?System.currentTimeMillis()?+?60000 ;?JobTask?task?=?new?JobTask("test-msg-1",?0,?unixTime);?taskRepo sitory.save(task);?unixTime?+=?60000;?task?=?new?JobTask("test-ms g-2",?0,?unixTime);?taskRepository.save(task);?unixTime?+=?60000; ?task?=?new?JobTask("test-msg-3",?0,?unixTime);?taskRepository.sa ve(task);?unixTime?+=?60000;?task?=?new?JobTask("test-msg-4",?0,? unixTime);?taskRepository.save(task);?}}此时,数据库中多了四条状态为0的数据最后,就可以开 始验证整个流程了,代码如下@SpringBootApplicationpublic?class?ElasticJobApplica tion?implements?CommandLineRunner?{@Resourceprivate?ElasticJobSer vice?elasticJobService;public?static?void?main(String[]?args)?{?S pringApplication.run(ElasticJobApplication.class,?args);?}@Overridepublic?void?run(String...?strings)?throws?Exception?{?elasticJobService.scanAddJob();?}}可以看到,在启动过程中,多个任务被加入到了Elastic-job中,并且一小段时间之后,任务一次执行,执行成功之后,因为我们配置了监听器,会打印数据库的更新SQL,当任务执行完成,再查看数据库,发现状态也更改成功。数据库中同时也会多出两张表JOB_EXECUTION_LOG,JOB_STATUS_TRACE_LOG,这是我们之前配置的JobEventConfiguration,通过数据源持久化了作业配置的相关数据,这两张表的数据可以供Elastic-job提供的运维平台使用,具体请查看官方文档。总结至此,整个流程就已经走完了,整个demo中主要用到了Elastic-job和spring-data-jpa相关的技术,作为demo,肯定会有一些缺陷,没考虑到的地方,可以根据自己的业务场景进行改进。 |
|