gpt4 book ai didi

java - Camel : stop the route when the jdbc connection loss is detected

转载 作者:可可西里 更新时间:2023-11-01 02:39:46 27 4
gpt4 key购买 nike

我有一个具有以下路径的应用程序:

from("netty:tcp://localhost:5150?sync=false&keepAlive=true")
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");

此路由每 59 毫秒接收一次新消息。我想在与数据库的连接丢失时停止路由在第二条消息到达之前。主要是,我希望永远不会丢失消息

我是这样进行的:

我添加了一个errorHandler:

errorHandler(deadLetterChannel("direct:backup")
.redeliveryDelay(5L)
.maximumRedeliveries(1)
.retryAttemptedLogLevel(LoggingLevel.WARN)
.logExhausted(false));

我的 errorHandler 尝试重新传递消息,如果再次失败,它会将消息重定向到 deadLetterChannel

以下 deadLetterChannel 将停止 tcp.input 路由并尝试将消息重新传送到数据库:

RoutePolicy policy = new StopRoutePolicy();
from("direct:backup")
.routePolicy(policy)
.errorHandler(
defaultErrorHandler()
.redeliveryDelay(1000L)
.maximumRedeliveries(-1)
.retryAttemptedLogLevel(LoggingLevel.ERROR)
)
.to("jdbc:mydb");

这是routePolicy的代码:

public class StopRoutePolicy extends RoutePolicySupport {

private static final Logger LOG = LoggerFactory.getLogger(String.class);

@Override
public void onExchangeDone(Route route, Exchange exchange) {
String stop = "tcp.input";
CamelContext context = exchange.getContext();
if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
try {
exchange.getContext().getInflightRepository().remove(exchange);
LOG.info("STOP ROUTE: {}", stop);
context.stopRoute(stop);
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
}
}

}

我使用这种方法的问题是:

  • 在我的 "direct:backup" 路由中,如果我将 maximumRedeliveries 设置为 -1,则路由 tcp.input 将永远不会停止
  • 我在 parking 期间丢失了消息
  • 这种检测连接丢失和停止路由的方法太长了

请问,有没有人想过让它更快或者为了不丢失消息而做出不同的想法?

最佳答案

我终于找到了解决问题的方法。为了使应用程序更快,我在 seda 中添加了异步进程多线程

from("netty:tcp://localhost:5150?sync=false&keepAlive=true").to("seda:break");


from("seda:break").threads(5)
.routeId("tcp.input")
.transform()
.simple("insert into tamponems (AVIS) values (\"${in.body}\");")
.to("jdbc:mydb");

我对备用路线做了同样的事情。

from("seda:backup")
.routePolicy(policy)
.errorHandler(
defaultErrorHandler()
.redeliveryDelay(1000L)
.maximumRedeliveries(-1)
.retryAttemptedLogLevel(LoggingLevel.ERROR)
).threads(2).to("jdbc:mydb");

然后我像这样修改了 routePolicy:

public class StopRoutePolicy extends RoutePolicySupport {

private static final Logger LOG = LoggerFactory.getLogger(String.class);

@Override
public void onExchangeBegin(Route route, Exchange exchange) {
String stop = "tcp.input";
CamelContext context = exchange.getContext();
if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStarted()) {
try {
exchange.getContext().getInflightRepository().remove(exchange);
LOG.info("STOP ROUTE: {}", stop);
context.stopRoute(stop);
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
}
}

@Override
public void onExchangeDone(Route route, Exchange exchange) {
String stop = "tcp.input";
CamelContext context = exchange.getContext();
if (context.getRouteStatus(stop) != null && context.getRouteStatus(stop).isStopped()) {
try {
LOG.info("RESTART ROUTE: {}", stop);
context.startRoute(stop);
} catch (Exception e) {
getExceptionHandler().handleException(e);
}
}
}
}

有了这些更新,TCP 路由会在处理备份路由之前停止。并且路由在 jdbc 连接恢复时重新启动

现在,多亏了 Camel,应用程序能够处理数据库故障,而不会丢失消息,也无需人工干预。

希望对您有所帮助。

关于java - Camel : stop the route when the jdbc connection loss is detected,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17705318/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com