
java - How to use MessageChannel in a transactional context with Spring Cloud Stream?

Reposted. Author: 行者123. Updated: 2023-11-30 05:31:13

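I am developing an application that reads from an IBM MQ queue, processes each message, and then sends it to a Kafka topic. I am trying to handle the case where my Kafka broker is down. In that case I want the application to roll back the transaction and retry the write to the Kafka topic X times, after which I will send the message to a backup queue. However, I cannot get the messageChannel.send call to throw an exception. I cannot make it time out; the application blocks indefinitely on the messageChannel.send call. Here is my code: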

@Component
public class MainQueueListener {

    @Value("${mq.queueName}")
    String queueName;

    private ExecutionFlow executionFlow;

    public MainQueueListener(final ExecutionFlow executionFlow) {
        this.executionFlow = executionFlow;
    }

    /**
     * Receive message from main queue.
     * The containerFactory is defined in infrastructure.jms.JmsConfig
     * @param byteMessage JMSBytesMessage
     */
    @JmsListener(containerFactory = "jmsFactory", destination = "${mq.queueName}")
    public void receiveMessage(JMSBytesMessage byteMessage) {
        executionFlow.execute(byteMessage, queueName);
    }
}

The configuration of the transaction manager and the listener container:

public class JmsConfig {

    private JmsErrorHandler errorHandler = new JmsErrorHandler();

    /**
     * Default JmsListenerContainer could be modified if needed
     * @param connectionFactory
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsFactory(ConnectionFactory connectionFactory,
                                                     DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setErrorHandler(errorHandler);
        configurer.configure(factory, connectionFactory);
        return factory;
    }

    /**
     * Instantiating a JmsTransactionManager: a local transaction manager. It will receive the ConnectionFactory
     */
    @Bean
    public PlatformTransactionManager platformTransactionManager(ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager(connectionFactory);
        jmsTransactionManager.setTransactionSynchronization(AbstractPlatformTransactionManager.SYNCHRONIZATION_ON_ACTUAL_TRANSACTION);
        jmsTransactionManager.setRollbackOnCommitFailure(true);
        return jmsTransactionManager;
    }
}

The execution flow calls messageChannel.send:

public void sendMessage(MessageTarget messageTarget) {

    MessageChannel messageChannel;
    String topicName = messageTarget.getDestination();
    switch (topicName) {
        case "A":
            messageChannel = MessageStreams.outboundMessageA();
            break;
        case "B":
            messageChannel = MessageStreams.outboundMessageB();
            break;
        default:
            throw new RuntimeException("Invalid destination: " + topicName);
    }

    Message message = MessageBuilder
            .withPayload(messageTarget.getResponse())
            .build();
    System.out.println(messageChannel.send(message, 3000)); // stuck here, no timeout
}

Finally, my application configuration file:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          transaction:
            transaction-id-prefix: txn.
            producer:
              sync: true
              configuration:
                acks: all
                enable:
                  idempotence: true
                retries: 10
                max:
                  block:
                    ms: 5000
      bindings:
        AResponseOutputStreamChannel:
          destination: topicA
        BResponseOutputStreamChannel:
          destination: topicB

  mustache:
    check-template-location: false


ibm:
  mq:
    queue-manager: QM1
    conn-name: localhost(1414)
    channel: DEV.ADMIN.SVRCONN
    user: xxx
    password: xxx

mq.queueName: Q1
bo-queue: Q2

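This is the output when the listener receives a message after I killed my local Kafka Docker container, before messageChannel.send is called. Neither the timeout passed to send nor the max.block.ms setting seems to have any effect.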

2019-08-15 20:47:02,365 WARN  [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:04,371 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:06,281 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:08,391 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:10,399 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:12,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:14,419 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:16,425 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:18,434 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:20,342 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:22,556 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:24,565 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:26,470 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:28,377 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:30,386 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:32,289 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:34,397 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:36,408 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available.
2019-08-15 20:47:38,518 WARN [kafka-producer-network-thread | producer-1] org.apache.kafka.clients.NetworkClient: [Producer clientId=producer-1, transactionalId=txn.0] Connection to node 1001 could not be established. Broker may not be available
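
For illustration only, the control flow the question is aiming for (try the Kafka send up to X times, then divert the message to a backup queue) can be sketched in plain Java. This is a minimal sketch under assumptions: `RetryThenBackup`, `deliver`, and the two `Consumer` sinks are hypothetical names that stand in for the Kafka send and the backup-queue write. In the real listener the retries would more likely come from JMS redelivery after a rollback than from an in-process loop, and the sketch only works once a failed send actually throws instead of blocking.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: try the primary sink up to maxAttempts times, then use the backup sink. */
final class RetryThenBackup {

    /** Returns true if the primary sink accepted the payload, false if it went to the backup sink. */
    static <T> boolean deliver(T payload, int maxAttempts, Consumer<T> primary, Consumer<T> backup) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                primary.accept(payload);   // stands in for the Kafka send
                return true;
            } catch (RuntimeException e) {
                // Stands in for a failed send; a transacted listener would roll back here.
            }
        }
        backup.accept(payload);            // stands in for the write to the backup queue
        return false;
    }

    public static void main(String[] args) {
        List<String> backupQueue = new ArrayList<>();
        boolean delivered = deliver(
                "msg-1",
                3,
                p -> { throw new IllegalStateException("broker down"); },
                backupQueue::add);
        System.out.println("delivered to primary: " + delivered + ", backup queue: " + backupQueue);
    }
}
```

The helper swallows the intermediate failures on purpose to keep the sketch short; production code would normally log them and probably wait between attempts.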

Best answer

This is a bug; I see this...

org.apache.kafka.common.errors.TimeoutException: Timeout expired after 5000milliseconds while awaiting EndTxn(COMMIT)

After the failure, we try to close the producer, but we call the close() method without a timeout, so it hangs until the broker comes back.
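
The difference between an unbounded and a bounded close can be shown with a toy model. The sketch below is not the Kafka client or the binder code: `BoundedCloseSketch` and `FakeProducer` are hypothetical, and a `CountDownLatch` stands in for "the broker is reachable again". It only illustrates why a close without a timeout blocks the calling thread while a close with a timeout returns control to it. (The real `KafkaProducer` does offer a `close` overload that takes a timeout.)

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Toy stand-in for a client whose close() needs the broker; not the Kafka API. */
final class BoundedCloseSketch {

    /** Hypothetical resource: a latch models "the broker is available". */
    static final class FakeProducer {
        private final CountDownLatch brokerAvailable = new CountDownLatch(1);

        void brokerCameBack() {
            brokerAvailable.countDown();
        }

        /** Unbounded close: blocks until the broker returns, like the reported hang. */
        void close() {
            try {
                brokerAvailable.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt for the caller
            }
        }

        /** Bounded close: waits at most the given time; true means the close completed cleanly. */
        boolean close(long timeout, TimeUnit unit) {
            try {
                return brokerAvailable.await(timeout, unit);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        FakeProducer producer = new FakeProducer();   // the broker never comes back in this run
        long start = System.nanoTime();
        boolean clean = producer.close(200, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println("clean close: " + clean + ", returned after about " + elapsedMs + " ms");
    }
}
```

With the bounded variant the caller gets control back and can let the exception propagate, which is what a rollback-and-retry flow depends on.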

Regarding "java - How to use MessageChannel in a transactional context with Spring Cloud Stream?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/57517986/
