支付中心对账模块调度改造elastic-job分布式调度
elastic-job调度框架
elastic-job简介
Elastic-Job是一个分布式调度解决方案,由两个相互独立的子项目Elastic-Job-Lite和Elastic-Job-Cloud组成。
Elastic-Job-Lite定位为轻量级无中心化解决方案,使用jar包的形式提供分布式任务的协调服务,外部依赖仅Zookeeper。
详细介绍可以去官网查询:http://elasticjob.io
elastic-job架构图
在这次改造中用的是elastic-job-lite。
elastic-job作业类型介绍
Elastic-Job提供SimpleJob、DataflowJob和ScriptJob 3种作业类型。
SimpleJob
意为简单实现,未经任何封装的类型。需实现SimpleJob接口。该接口仅提供单一方法用于覆盖,此方法将定时执行。与Quartz原生接口相似,但提供了弹性扩缩容和分片等功能。DataflowJob
Dataflow类型用于处理数据流,需实现DataflowJob接口。该接口提供2个方法可供覆盖,分别用于抓取(fetchData)和处理(processData)数据。ScriptJob
Script类型作业意为脚本类型作业,支持shell,python,perl等所有类型脚本。只需通过控制台或代码配置scriptCommandLine即可,无需编码。执行脚本路径可包含参数,参数传递完毕后,作业框架会自动追加最后一个参数为作业运行时信息。
elastic-job中操作手册中API介绍
elastic-job操作API官方地址:http://elasticjob.io/docs/elastic-job-lite/02-guide/operation-manual/
里面有详细介绍怎么使用,这里说一下要注意的点,减少开发过程遇到的问题。
这一块的代码在elastic-job-lite-lifecycle
这个jar包中,从这个名称可以看到这是对任务的一个生命周期的操作。
那么这一块从架构图上看它是位于Console
这一块的,但是实际上的任务生命周期的控制是在Elastic-Job-Lite
这个模块,为什么它能够反应到Lite模块中去,这里通过依靠zookeeper传递消息和quartz本身的触发功能来实现远程操作作业的功能。
每个节点实例启动的时候,elastic-job默认会将ListenerManager
中定义的任务监听器启动,使用curator
来监控instances
节点的数据变化,当出现变化则执行AbstractJobListener
中相应实现类的dataChanged
方法,而这个方法中就是通过JobRegistry
这个类调用quartz的相关方法对任务进行相关操作,所以最终还是操作的是quartz api。
那么elatic-job提供了哪些api操作?
elastic-job提供了三类api操作,配置类API、操作类API和统计类API。
其中配置类API以JobSettingsAPI为代表,对相关的作业的配置进行操作,如修改、删除、获取作业,实际操作的zk上面的节点信息。
操作类API以JobOperateAPI和ShardingOperateAPI为代表,这主要是对作业的操作和分片作业的操作,比如作业的立即触发、暂停、修改、详情、删除、启动等操作。
统计类API以JobStatisticsAPI为代表,展示了作业的一个运行状态信息,分片数量信息等。
具体的怎么使用的可以参考官方文档,这里不做赘述。
工具类
为什么这里会提到这个工具类,主要是原生的console中提供的disable
方法并不能对作业的暂停起到效果,所以使用了下面的工具类对作业进行相关的控制。
JobRegistry.getInstance(): 通过这个可以获取作业的注册表,从而实现对作业的暂停、停止、恢复、立即触发等操作。
LiteJobConfigurationGsonFactory:Lite作业配置的Gson工厂,可以实现json转Lite作业配置和作业配置转json字符串。
支付中心4.0需求
由于之前的支付中心对账任务需求都是通过,原生的quartz进行开发的,不具备高可用的能力,而且将入库耦合在一起,所以提出将对账和入库单独分离出来,改造为具备分布式、高可用的能力。最终的技术选型选择了elastic-job这一款开源的分布式调度框架结合spring-boot框架进行开发。
spring-boot的入门教程可参考《SpringBoot入门教程》-http://eip.teamshub.com/t/3548515
改造过程
以上述章节对elastic-job了解做为此次开发的技术背景,结合支付中心的业务需求对其进行改造。
改造的过程分为两大部分,第一个是任务的配置,第二个是进行任务调度。
这里用的elastic-job是目前最新的2.1.5
版本
引入maven依赖
这里的spring-boot的就不进行相关说明了。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-core</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-lifecycle</artifactId>
<version>${elastic-job.version}</version>
</dependency>
<dependency>
<groupId>com.dangdang</groupId>
<artifactId>elastic-job-lite-spring</artifactId>
<version>${elastic-job.version}</version>
</dependency>
作业开发
elastic-job提供了三种作业类型同时对应了三个作业的接口,这里使用的是Simple类型的作业,所以需要实现这个接口然后重写execute
方法。
代码示例如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36public class BalanceJob implements SimpleJob, ApplicationContextAware {
private static Logger logger = LoggerFactory.getLogger(BalanceJob.class);
private ApplicationContext applicationContext;
public void execute(ShardingContext context) {
int shardingItem = context.getShardingItem();
switch (shardingItem) {
case 0:
doTask(context.getShardingParameter(), context.getJobParameter());
break;
default:
logger.warn("当前任务不需要进行分片处理...");
}
}
private void doTask(String method, String beanName) {
PaymentRetInfo ret = null;
Object handler = applicationContext.getBean(beanName);
//通过反射调用处理作业的相关方法,即ProviderBalanceHandler的balance方法
ret = (PaymentRetInfo) invoke(handler, method, new Object[0]);
if (ret != null) {
logger.info("任务执行结果:retCode->{}, retMsg->{}", ret.getRetCode(),
ret.getMessage());
}
}
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
this.applicationContext = applicationContext;
}
}
上面通过实现ApplicationContextAware
获取容器的上下文环境,从而可以获取相关作业的处理类的实例对象,如AlipayHandler
等支付商对账处理实现类。然后通过反射去调用实例的balance
方法完成对账操作。
作业配置
改造过程中整体的一个相关任务类的配置结构图如下,这里对elastic-job的相关配置进行封装,只需要继承SimpleJobConfigWrapper
即可实现Simple类型相关配置,然后重写init
方法那么在项目启动的时候就会调用此方法,这里可以进行相关任务的实例化操作通过调用initTask
方法或者initOneShardTask
方法。
代码示例:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class PaymentBalanceJobConfig extends SimpleJobConfigWrapper {
private static Logger logger = LoggerFactory.getLogger(PaymentBalanceJobConfig.class);
ZookeeperRegistryCenter regCenter;// 在RegistryCenterConfig中进行实例化
private BalanceTaskDao balanceTaskDao; //从数据读取相关任务配置信息
public BalanceJob balanceJob() {
return new BalanceJob();
}
public ProviderExecuteListener listener() {
return new ProviderExecuteListener();
}
public void init() {
//获取任务
BalanceTask bt = new BalanceTask();
bt.setStatus(EntityConfig.TASK_EFFECTIVE_STATUS); //有效
bt.setUserType(EntityConfig.TASK_PAYMENT_TYPE); //支付中心
List<BalanceTask> balanceTasks = balanceTaskDao.selectByPrimaryKeySelective(bt);
//批量实例化任务
for (BalanceTask balanceTask : balanceTasks) {
String taskId = balanceTask.getTaskId();
String taskName = balanceTask.getTaskName();
String beanName = balanceTask.getBeanName();
String method = balanceTask.getMethod();
String cron = balanceTask.getAutoJobTime();
Assert.notNull(beanName, "任务[" + balanceTask.getTaskId() + "]配置错误,beanName为空");
Assert.notNull(cron, "任务[" + balanceTask.getTaskId() + "]配置错误,cron为空");
//实例化任务
logger.info("--->实例化任务:{}, 执行时间:{}", taskName, cron);
initOneShardTask(balanceJob(), regCenter, listener(), taskId, taskName, cron, method, beanName);
}
}
}
发布部署
在公司的jenkins环境 http://172.21.10.54:18080/jenkins/上已经有配置好的任务,只需执行
payment-manage-balance-web任务进行构建。
jenkins的配置
在支付项目的jenkins上面添加项目
payment-manage-balance-web
其他配置参考之前的项目配置,然后在Build选项中的maven执行参数填入以下配置:
1
2 > -X clean install -Pcore_test -pl payment-manage/payment-manage-balance-web -am -Dmaven.test.skip=true
>表示我只打包
payment-manage-balance-web
这个项目并且一并打包依赖的项目。
打完包之后会得到一个payment-manage-balance-web.jar
的文件,那么这jar就是项目运行的jar。
部署脚本编写
spring-boot.sh 这是一个通用的脚本,可以用来启动打包为jar方式的spring-boot项目。
具体使用方法:1
sh spring-boot.sh [options] [application]
options 选项:start、status、stop、restart
application : 即要操作的应用
如:
- 启动:
sh spring-boot.sh start payment-manage-balance-web.jar
- 查看启动状态:
sh spring-boot.sh status payment-manage-balance-web.jar
脚本源代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78#!/bin/bash
SpringBoot=$2
#启动参数
#JAVA_OPTS="-server -Xms400m -Xmx400m -Xmn300m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=128m -Xverify:none -XX:+DisableExplicitGC -Djava.awt.headless=true"
JAVA_OPTS=""
if [ "$1" = "" ];
then
echo -e "\033[0;31m 未输入操作名 \033[0m \033[0;34m {start|stop|restart|status} \033[0m"
exit 1
fi
if [ "$SpringBoot" = "" ];
then
echo -e "\033[0;31m 未输入应用名 \033[0m"
exit 1
fi
function start()
{
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
if [ $count != 0 ];then
echo "$SpringBoot is running..."
else
echo "Start $SpringBoot success..."
nohup java $JAVA_OPTS -jar $SpringBoot > /dev/null 2>&1 &
fi
}
function stop()
{
echo "Stop $SpringBoot"
boot_id=`ps -ef |grep java|grep $SpringBoot|grep -v grep|awk '{print $2}'`
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
if [ $count != 0 ];then
kill $boot_id
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
boot_id=`ps -ef |grep java|grep $SpringBoot|grep -v grep|awk '{print $2}'`
kill -9 $boot_id
fi
}
function restart()
{
stop
sleep 2
start
}
function status()
{
count=`ps -ef |grep java|grep $SpringBoot|grep -v grep|wc -l`
if [ $count != 0 ];then
echo "$SpringBoot is running..."
else
echo "$SpringBoot is not running..."
fi
}
case $1 in
start)
start;;
stop)
stop;;
restart)
restart;;
status)
status;;
*)
echo -e "\033[0;31m Usage: \033[0m \033[0;34m sh $0 {start|stop|restart|status} {SpringBootJarName} \033[0m
\033[0;31m Example: \033[0m
\033[0;33m sh $0 start esmart-test.jar \033[0m"
esac
作业运行状态查看
打包之后在测试环境172.21.10.54运行sh spring-boot.sh payment-manage-balance-web.jar
然后在浏览器运行172.21.10.54:8089/即可查看到如下结果。
注意这个上面只能看到作业的运行状态,并不能看到作业是否执行成功,可以在管理后台中的对账管理部分可以看到任务是否执行成功。