- Java锁的逻辑(结合对象头和ObjectMonitor)
- 还在用饼状图?来瞧瞧这些炫酷的百分比可视化新图形(附代码实现)⛵
- 自动注册实体类到EntityFrameworkCore上下文,并适配ABP及ABPVNext
- 基于Sklearn机器学习代码实战
目的:分析xxl-job执行器的注册过程 。
流程:
@xxlJjob
)修饰的 handler
版本: xxl-job 2.3.1 。
建议:下载 xxl-job 源码,按流程图 debug 调试, 看堆栈信息并按文章内容理解执行流程 .
完整流程图:
部分流程图:
首先启动管理台界面(服务 XxlJobAdminApplication ),然后启动项目中给的执行器实例 (SpringBoot) ,
这个方法是扫描项目中使用 @xxlJob 注解的所有handler方法。接着往下走 。
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
//获取该项目中所有的bean,然后遍历
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
//注意点★
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
//没有跳过本次循环继续
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//获取了当前执行器中所有@xxl-job的方法,获取方法以及对应的初始化和销毁方法
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// regist
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
在 Spring 案例执行器中有5个 handler
XxlJobExecutor.registJobHandler()中部分源码 。
String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
然后进行遍历注册;开始进行名字判断:
loadJobHandler 校验方式会去该方法中查找:当bean注册完成后时存放到 jobHandlerRepository 一个 map 类型中,
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
executeMethod.setAccessible(true); 它实现了修改对象访问权限的功能,参数为true,则表示允许调用方在使用反射时忽略Java语言的访问控制检查. 。
往后走会判断该注解的生命周期方法( init和destroy ) 。
//注意MethodJobHandler,后面会用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加执行器名字及对应的hob方法信息(当前类、方法、init和destroy属性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
首先检查 @XxlJob 注解中的 init 属性是否存在且不为空。如果存在,则尝试获取该类中名为 init 的方法,并将其设置为 可访问状态 ,以便后续调用.
同理,代码接下来也检查了 @XxlJob 注解中的 destroy 属性是否存在且不为空,如果是,则获取该类中名为 destroy 的方法,并设置其为 可访问状态 .
在这个过程中,如果某个方法不存在或者无法被访问,则会抛出 NoSuchMethodException 异常,并且使用 throw new RuntimeException 将其包装并抛出一个运行时异常。这样做的目的是为了提醒开发人员在任务处理器类中正确地设置 init和destroy 属性,并确保方法名称与属性值一致.
部分流程图:
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
在扫描完执行器中所有的任务后,开始进行执行器注册 XxlJobSpringExecutor中的super.start() 方法.
在初始化执行服务器启动之前,进行了四种操作,初始化日志、初始化 adminBizList 地址(可视化管理台地址)、初始化日志清除、初始化回调线程等.
这里需要注意的是第二步初始化地址,在初始化服务器启动的时候需要用到.
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
继续到 initEmbedServer ,开始初始化ip地址和端口等, 需要明白的是,这一步的参数获取方式其实是第一步读取 **XxlJobConfig** 获得的; 进行ip的校验和拼接等操作,开始进行真正的注册.
创建一个 嵌入式的HTTP服务器, 将当前执行器信息(包含应用名称和IP地址端口等)注册到注册中心,注册方式的实现在 ExecutorRegistryThread 中实现.
校验名字和注册中心,如果注册中心不可用,则等待一段时间后重新尝试连接.
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
//心跳检测,默认30s
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
开启一个新线程,首先 构建注册参数(包含执行器分组、执行器名字、执行器本地地址及端口号),遍历注册中心地址,开始进行执行器注册,注册方式通过发送http的post请求.
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
在 debug 的过程中, XxlJobRemotingUtil 执行到 int statusCode = connection.getResponseCode(); 才会跳转到 JobApiController.api 中的注册地址. 。
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
最后进入到 JobRegistryHelper.registry() 方法中完成数据库的入库和更新操作.
通过更新语句判断该执行器是否注册,结果小于1,那么保存注册器信息,并向注册中心发送一个请求,更新当前执行器所属的应用名称、执行器名称和 IP 地址等信息,否则跳过.
public ReturnT<String> registry(RegistryParam registryParam) {
//.......
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
//更新注册表信息
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
//保存执行器注册信息
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh 刷新执行器状态
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
至此执行器的注册流程分析完成.
部分流程图:
执行器中的任务流程比较简单,如果执行器启动的话,那么每次执行任务是通过 JobThread 通过 Cron 表达式进行操作的.
通过 handler.execute() 进行执行,是在框架内部通过反射机制调用作业处理器对象 handler 中的 execute() 方法实现的。在这个过程中,handler 对象表示被加载的作业处理器,并且已经调用了 init() 方法进行初始化.
method.invoke() 方法使用反射机制调用指定对象 target 中的方法 method 。在这个方法中, target 表示作业处理器对象, method 表示作业处理器中的 execute() 方法.
通过上述方法,获取到 SampleXxlJob.demoJobHandler 的任务,然后开始进行任务逻辑操作.
最后此篇关于【源码分析】XXL-JOB的执行器的注册流程的文章就讲到这里了,如果你想了解更多关于【源码分析】XXL-JOB的执行器的注册流程的内容请搜索CFSDN的文章或继续浏览相关文章,希望大家以后支持我的博客! 。
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,因为
首先是一些背景;我们正在开发一个数据仓库,并对我们的 ETL 过程使用哪些工具进行一些研究。该团队非常以开发人员为中心,每个人都熟悉 C#。到目前为止,我已经看过 RhinoETL、Pentaho (
我需要具有管理员权限的进程。从this问题和答案来看,似乎没有比启动单独进程更好的方法了。因为我宁愿有一个专用于该过程的过程,而不是仅为此方法在第二个过程中启动我的原始应用程序–我以为我会在VS201
我有这个函数来压平对象 export function flattenObject(object: Object, prefix: string = "") { return Object.key
我正在开发一个基于java的Web应用程序,它要求我使用来自SIP( session 启动协议(protocol))消息的输入生成序列图。我必须表示不同电话和相应服务器之间的调用流程。我可以利用任何工
这是我的代码: Process p=Runtime.getRuntime().exec("something command"); String s; JFrame frame = new JFram
我对 istio 的 mTLS 流程有点困惑。在bookinginfo 示例中,我看到服务通过http 而不是https 进行调用。如果服务之间有 mTLS 那么服务会进行 http 调用吗? 是否可
很难说出这里问的是什么。这个问题是含糊的、模糊的、不完整的、过于宽泛的或修辞性的,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开它,visit the help center 。 已关
之前做过一个简单的纸牌游戏,对程序的整体流程有自己的想法。我最关心的是卡片触发器。 假设我们有一张名为“Guy”的牌,其效果为“每当你打出另一张牌时,获得 2 点生命”。我将如何将其合并到我的代码中?
我有 4 个 Activity 。 A、B、C 和 D。 用户可以从每个 Activity 开始任何 Activity 。 即 Activity A 有 3 个按钮来启动 B、C 和 D。以同样的方式
我做了一个简单的路由器类,简化后看起来像这样 // @flow import { Container } from 'unstated' type State = { history: Objec
我有两个 Activity ,比如 A1 和 A2。顺序为 A1->A2我从 A1 开始 A2 而没有在 A1 中调用 finish() 。在 A2 中按下后退按钮后,我想在 A1 中触发一个功能。但
我正在考虑在我的下一个项目中使用 BPEL。我试用了 Netbeans BPEL 设计器,我对它很满意。但在我决定使用 BPEL 之前,我想知道它对测试驱动开发的适用程度。不幸的是,我对那个话题知之甚
我需要将两个表格堆叠在一起,前后都有内容。我无法让后面的内容正常流动。堆叠的 table 高度可变。 HTML 结构: ... other content ...
我是 Hibernate 的新手。我无法理解 Hibernate 的流程。请澄清我的疑问。 我有“HibernateUtil.java ”和以下语句 sessionFactory = new Anno
早上好 我开始使用 Ruby,想创建一个小工具来获取我的公共(public) IP 并通过电子邮件发送。我遇到了字符串比较和无法处理的 if/else block 的基本问题。 代码非常简单(见下文)
我目前正尝试在我的团队中建立一个开发流程并阅读有关 GitFlow 的信息。它看起来很有趣,但我可以发现一些问题。 让我们假设以下场景: 我们完成了 F1、F2 和 F3 功能,并将它们 merge
我已经使用 git flow 有一段时间了。我很想了解一个特定的用例。 对于我的一个项目,我有一张新网站功能的门票。此工单取决于许多子任务。我想为主工单创建一个功能分支,然后为每个子任务创建一个脱离父
你怎么知道在一个程序中已经发现并解决了尽可能多的错误? 几年前我读过一篇关于调试的文档(我认为这是某种 HOWTO)。其中,该文档描述了一种技术,其中编程团队故意将错误添加到代码中并将其传递给 QA
我目前正在构建一个微服务架构,并从身份验证服务器和客户端着手。我还想确认使用 token 对用户进行身份验证的最佳流程。 在上图中。第 3 步是我开始感到困惑。我想到了2个解决问题的方法。 每个 ap
我是一名优秀的程序员,十分优秀!