支付中心对账模块调度改造elastic-job分布式调度

elastic-job调度框架

elastic-job简介

Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。

Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部依赖仅Zookeeper。

详细介绍可以去官网查询:http://elasticjob.io

elastic-job架构图

elastic-job

在这次改造中用的是elastic-job-lite。

elastic-job作业类型介绍

Elastic-Job提供SimpleJob、DataflowJob和ScriptJob 3种作业类型。

  • SimpleJob

    意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。

  • DataflowJob

    Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。

  • ScriptJob

    Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。

elastic-job中操作手册中API介绍

elastic-job操作API官方地址:http://elasticjob.io/docs/elastic-job-lite/02-guide/operation-manual/
里面有详细介绍怎么使用,这里说一下要注意的点,减少开发过程遇到的问题。

这一块的代码在elastic-job-lite-lifecycle这个jar包中,从这个名称可以看到这是对任务的一个生命周期的操作。

那么这一块从架构图上看它是位于Console这一块的,但是实际上的任务生命周期的控制是在Elastic-Job-Lite这个模块,为什么它能够反应到Lite模块中去,这里通过依靠zookeeper传递消息和quartz本身的触发功能来实现远程操作作业的功能。

每个节点实例启动的时候,elastic-job默认会将ListenerManager中定义的任务监听器启动,使用curator来监控instances节点的数据变化,当出现变化则执行AbstractJobListener中相应实现类的dataChanged方法,而这个方法中就是通过JobRegistry这个类调用quartz的相关方法对任务进行相关操作,所以最终还是操作的是quartz api。

那么elatic-job提供了哪些api操作?
elastic-job提供了三类api操作,配置类API、操作类API和统计类API。

  • 其中配置类API以JobSettingsAPI为代表,对相关的作业的配置进行操作,如修改、删除、获取作业,实际操作的zk上面的节点信息。

  • 操作类API以JobOperateAPI和ShardingOperateAPI为代表,这主要是对作业的操作和分片作业的操作,比如作业的立即触发、暂停、修改、详情、删除、启动等操作。

  • 统计类API以JobStatisticsAPI为代表,展示了作业的一个运行状态信息,分片数量信息等。

具体的怎么使用的可以参考官方文档,这里不做赘述。

工具类

为什么这里会提到这个工具类,主要是原生的console中提供的disable方法并不能对作业的暂停起到效果,所以使用了下面的工具类对作业进行相关的控制。

JobRegistry.getInstance(): 通过这个可以获取作业的注册表,从而实现对作业的暂停、停止、恢复、立即触发等操作。

LiteJobConfigurationGsonFactory:Lite作业配置的Gson工厂,可以实现json转Lite作业配置和作业配置转json字符串。

支付中心4.0需求

由于之前的支付中心对账任务需求都是通过,原生的quartz进行开发的,不具备高可用的能力,而且将入库耦合在一起,所以提出将对账和入库单独分离出来,改造为具备分布式、高可用的能力。最终的技术选型选择了elastic-job这一款开源的分布式调度框架结合spring-boot框架进行开发。

spring-boot的入门教程可参考《SpringBoot入门教程》-http://eip.teamshub.com/t/3548515

改造过程

以上述章节对elastic-job了解做为此次开发的技术背景,结合支付中心的业务需求对其进行改造。
改造的过程分为两大部分,第一个是任务的配置,第二个是进行任务调度。

这里用的elastic-job是目前最新的2.1.5版本

引入maven依赖

这里的spring-boot的就不进行相关说明了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-lifecycle</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>

作业开发

elastic-job提供了三种作业类型同时对应了三个作业的接口,这里使用的是Simple类型的作业,所以需要实现这个接口然后重写execute方法。
代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class BalanceJob implements SimpleJob, ApplicationContextAware {

private static Logger logger = LoggerFactory.getLogger(BalanceJob.class);

private ApplicationContext applicationContext;

@Override
public void execute(ShardingContext context) {
int shardingItem = context.getShardingItem();
switch (shardingItem) {
case 0:
doTask(context.getShardingParameter(), context.getJobParameter());
break;
default:
logger.warn("当前任务不需要进行分片处理...");
}
}

private void doTask(String method, String beanName) {
PaymentRetInfo ret = null;
Object handler = applicationContext.getBean(beanName);
//通过反射调用处理作业的相关方法,即ProviderBalanceHandler的balance方法
ret = (PaymentRetInfo) invoke(handler, method, new Object[0]);

if (ret != null) {
logger.info("任务执行结果:retCode->{}, retMsg->{}", ret.getRetCode(),
ret.getMessage());
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
}
}

上面通过实现ApplicationContextAware获取容器的上下文环境,从而可以获取相关作业的处理类的实例对象,如AlipayHandler等支付商对账处理实现类。然后通过反射去调用实例的balance方法完成对账操作。

作业配置

改造过程中整体的一个相关任务类的配置结构图如下,这里对elastic-job的相关配置进行封装,只需要继承SimpleJobConfigWrapper即可实现Simple类型相关配置,然后重写init方法那么在项目启动的时候就会调用此方法,这里可以进行相关任务的实例化操作通过调用initTask方法或者initOneShardTask方法。

类配置结构图

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Configuration
public class PaymentBalanceJobConfig extends SimpleJobConfigWrapper {

private static Logger logger = LoggerFactory.getLogger(PaymentBalanceJobConfig.class);

@Resource
ZookeeperRegistryCenter regCenter;// 在RegistryCenterConfig中进行实例化

@Resource
private BalanceTaskDao balanceTaskDao; //从数据读取相关任务配置信息

@Bean
public BalanceJob balanceJob() {
return new BalanceJob();
}

@Bean
public ProviderExecuteListener listener() {
return new ProviderExecuteListener();
}

@Override
public void init() {
//获取任务
BalanceTask bt = new BalanceTask();
bt.setStatus(EntityConfig.TASK_EFFECTIVE_STATUS); //有效
bt.setUserType(EntityConfig.TASK_PAYMENT_TYPE); //支付中心
List<BalanceTask> balanceTasks = balanceTaskDao.selectByPrimaryKeySelective(bt);

//批量实例化任务
for (BalanceTask balanceTask : balanceTasks) {
String taskId = balanceTask.getTaskId();
String taskName = balanceTask.getTaskName();
String beanName = balanceTask.getBeanName();
String method = balanceTask.getMethod();
String cron = balanceTask.getAutoJobTime();

Assert.notNull(beanName, "任务[" + balanceTask.getTaskId() + "]配置错误,beanName为空");
Assert.notNull(cron, "任务[" + balanceTask.getTaskId() + "]配置错误,cron为空");

//实例化任务
logger.info("--->实例化任务:{}, 执行时间:{}", taskName, cron);

initOneShardTask(balanceJob(), regCenter, listener(), taskId, taskName, cron, method, beanName);
}
}

}

发布部署

在公司的jenkins环境 http://172.21.10.54:18080/jenkins/上已经有配置好的任务,只需执行
payment-manage-balance-web任务进行构建。

payment-manage-balance-web

jenkins的配置

在支付项目的jenkins上面添加项目payment-manage-balance-web
其他配置参考之前的项目配置,然后在Build选项中的maven执行参数填入以下配置:

1
2
> -X clean install -Pcore_test -pl payment-manage/payment-manage-balance-web -am -Dmaven.test.skip=true
>

表示我只打包payment-manage-balance-web这个项目并且一并打包依赖的项目。

打完包之后会得到一个payment-manage-balance-web.jar的文件,那么这jar就是项目运行的jar。

部署脚本编写

spring-boot.sh 这是一个通用的脚本,可以用来启动打包为jar方式的spring-boot项目。

具体使用方法:

1
sh spring-boot.sh [options] [application]

options 选项:start、status、stop、restart

application : 即要操作的应用

如:

  • 启动: sh spring-boot.sh start payment-manage-balance-web.jar
  • 查看启动状态: sh spring-boot.sh status payment-manage-balance-web.jar

脚本源代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#!/bin/bash

SpringBoot=$2

#启动参数
#JAVA_OPTS="-server -Xms400m -Xmx400m -Xmn300m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xverify:none -XX:+DisableExplicitGC -Djava.awt.headless=true"
JAVA_OPTS=""

if [ "$1" = "" ];
then
echo -e "\033[0;31m 未输入操作名 \033[0m \033[0;34m {start|stop|restart|status} \033[0m"
exit 1
fi

if [ "$SpringBoot" = "" ];
then
echo -e "\033[0;31m 未输入应用名 \033[0m"
exit 1
fi

function start()
{
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
if [ $count != 0 ];then
echo "$SpringBoot is running..."
else
echo "Start $SpringBoot success..."
nohup java $JAVA_OPTS -jar $SpringBoot > /dev/null 2>&1 &
fi
}

function stop()
{
echo "Stop $SpringBoot"
boot_id=`ps -ef |grep java|grep $SpringBoot|grep -v grep|awk '{print $2}'`
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`

if [ $count != 0 ];then
kill $boot_id
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`

boot_id=`ps -ef |grep java|grep $SpringBoot|grep -v grep|awk '{print $2}'`
kill -9 $boot_id
fi
}

function restart()
{
stop
sleep 2
start
}

function status()
{
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
if [ $count != 0 ];then
echo "$SpringBoot is running..."
else
echo "$SpringBoot is not running..."
fi
}

case $1 in
start)
start;;
stop)
stop;;
restart)
restart;;
status)
status;;
*)

echo -e "\033[0;31m Usage: \033[0m \033[0;34m sh $0 {start|stop|restart|status} {SpringBootJarName} \033[0m
\033[0;31m Example: \033[0m
\033[0;33m sh $0 start esmart-test.jar \033[0m"
esac

作业运行状态查看

打包之后在测试环境172.21.10.54运行sh spring-boot.sh payment-manage-balance-web.jar

然后在浏览器运行172.21.10.54:8089/即可查看到如下结果。
任务查看

注意这个上面只能看到作业的运行状态,并不能看到作业是否执行成功,可以在管理后台中的对账管理部分可以看到任务是否执行成功。