gpt4 book ai didi

java - Spring Batch ItemReader + RabbitMQ - 未指定 'queue'

转载 作者:行者123 更新时间:2023-12-02 08:55:56 27 4
gpt4 key购买 nike

我正在运行两个服务:一个连接数据库并把数据发送到消息代理;另一个应当从 RabbitMQ 获取消息,并通过 Spring Batch 批处理将其写入目标数据库。两个服务使用相同的 RabbitConfiguration,但不知为何,消费端服务抛出了如下异常:

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate.
at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:99) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.read(SimpleChunkProvider.java:180) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:126) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:118) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:71) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.TaskExecutorRepeatTemplate$ExecutingRunnable.run(TaskExecutorRepeatTemplate.java:262) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]

2020-03-01 19:50:36.157 INFO 1748 --- [ restartedMain] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 80ms
2020-03-01 19:50:36.183 INFO 1748 --- [ restartedMain] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=importClientJob]] completed with the following parameters: [{run.id=19}] and the following status: [FAILED] in 142ms

配置类:

/**
 * RabbitMQ wiring: declares the queue, the topic exchange, the binding between them,
 * and a RabbitTemplate built on the injected ConnectionFactory.
 */
@Configuration
public class RabbitConfiguration {
// Names of the exchange, queue and routing key used by the bean definitions below.
public static final String MESSAGE_EXCHANGE = "clients-exchange";
public static final String MESSAGE_QUEUE = "clients-queue";
public static final String MESSAGE_ROUTING_KEY = "clients.msg";

// Connection factory supplied through the constructor; used to build the RabbitTemplate.
private final ConnectionFactory connectionFactory;

public RabbitConfiguration(ConnectionFactory connectionFactory) {
this.connectionFactory = connectionFactory;
}

// Declares the queue named MESSAGE_QUEUE; the second constructor argument is Spring AMQP's "durable" flag.
@Bean
Queue queue() {
return new Queue(MESSAGE_QUEUE, true);
}

// Declares the topic exchange named MESSAGE_EXCHANGE.
@Bean
TopicExchange exchange() {
return new TopicExchange(MESSAGE_EXCHANGE);
}

// Binds the queue to the exchange under MESSAGE_ROUTING_KEY.
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(MESSAGE_ROUTING_KEY);
}

// Builds the RabbitTemplate with a default exchange, a default routing key and the "taskExecutor" bean.
// NOTE(review): only the exchange and routing key are configured here; no default receive queue is set.
// That matches the "No 'queue' specified" exception thrown from RabbitTemplate.receiveAndConvert
// (via getRequiredQueue) in the stack trace shown with this code.
@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
rabbitTemplate.setTaskExecutor(taskExecutor);
return rabbitTemplate;
}
}

和批量配置:

/**
 * Spring Batch wiring for the consuming service.
 *
 * <p>Defines a single chunk-oriented step ({@code step1}) that reads {@code Client} items
 * through an {@link AmqpItemReader} backed by the injected {@link RabbitTemplate}, passes them
 * through a {@link ClientLowerCaseProcessor}, and writes them with a JDBC batch writer.
 * The step is wrapped in the {@code importClientJob} job.
 */
@Configuration
public class BatchConfiguration {

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final DataSource dataSource;
    private final RabbitTemplate rabbitTemplate;
    private final NamedParameterJdbcTemplate namedParameterJdbcTemplate;
    private final ClientPreparedStatementSetter clientPreparedStatementSetter;

    /** Chunk size of the step, resolved from the {@code pusher.spring-batch-chunk-size} property. */
    @Value("${pusher.spring-batch-chunk-size}")
    private int chunkSize;

    /**
     * Receives every collaborator through constructor injection.
     */
    @Autowired
    public BatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, DataSource dataSource, RabbitTemplate rabbitTemplate, NamedParameterJdbcTemplate namedParameterJdbcTemplate, ClientPreparedStatementSetter clientPreparedStatementSetter) {
        this.clientPreparedStatementSetter = clientPreparedStatementSetter;
        this.namedParameterJdbcTemplate = namedParameterJdbcTemplate;
        this.rabbitTemplate = rabbitTemplate;
        this.dataSource = dataSource;
        this.stepBuilderFactory = stepBuilderFactory;
        this.jobBuilderFactory = jobBuilderFactory;
    }

    /**
     * Writer that inserts each {@code Client} into the CLIENT table using positional parameters
     * filled in by the injected {@link ClientPreparedStatementSetter}.
     */
    @Bean
    public JdbcBatchItemWriter<Client> cursorItemWriter() {
        JdbcBatchItemWriterBuilder<Client> writerBuilder = new JdbcBatchItemWriterBuilder<>();
        writerBuilder.dataSource(dataSource);
        writerBuilder.namedParametersJdbcTemplate(namedParameterJdbcTemplate);
        writerBuilder.itemPreparedStatementSetter(clientPreparedStatementSetter);
        writerBuilder.sql("INSERT INTO CLIENT (id, firstname, lastname, email, phone) VALUES (?,?,?,?,?)");
        return writerBuilder.build();
    }

    /**
     * Reader that pulls {@code Client} items through the injected {@link RabbitTemplate}.
     */
    @Bean
    public AmqpItemReader<Client> clientAmqpItemReader() {
        AmqpItemReaderBuilder<Client> readerBuilder = new AmqpItemReaderBuilder<>();
        readerBuilder.amqpTemplate(rabbitTemplate);
        return readerBuilder.build();
    }

    /**
     * Item processor applied between reading and writing.
     */
    @Bean
    public ClientLowerCaseProcessor lowerCaseProcessor() {
        return new ClientLowerCaseProcessor();
    }

    /**
     * Job named {@code importClientJob} consisting of the single given step; a
     * {@link RunIdIncrementer} is registered as its parameters incrementer.
     */
    @Bean
    public Job importClientJob(Step step1) {
        RunIdIncrementer runIdIncrementer = new RunIdIncrementer();
        return jobBuilderFactory.get("importClientJob")
                .incrementer(runIdIncrementer)
                .flow(step1)
                .end()
                .build();
    }

    /**
     * Chunk-oriented step {@code step1}: AMQP reader, processor, JDBC writer,
     * executed on the {@code taskExecutor} bean.
     */
    @Bean
    public Step step1(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
        AmqpItemReader<Client> reader = clientAmqpItemReader();
        ClientLowerCaseProcessor processor = lowerCaseProcessor();
        JdbcBatchItemWriter<Client> writer = cursorItemWriter();

        return stepBuilderFactory.get("step1")
                .<Client, Client>chunk(chunkSize)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .taskExecutor(taskExecutor)
                .build();
    }
}

我尝试删除任务执行器并摆弄配置,但没有成功。

最佳答案

org.springframework.amqp.AmqpIllegalStateException: No 'queue' specified. Check configuration of RabbitTemplate.
at org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue(RabbitTemplate.java:2410) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.amqp.rabbit.core.RabbitTemplate.receiveAndConvert(RabbitTemplate.java:1203) ~[spring-rabbit-2.2.2.RELEASE.jar:2.2.2.RELEASE]
at org.springframework.batch.item.amqp.AmqpItemReader.read(AmqpItemReader.java:57) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]

spring-batch 中的 AmqpItemReader 通过 RabbitTemplate 的无参 receive()(本例堆栈中为 receiveAndConvert())从 RabbitMQ 接收消息。这类无参接收方法要求事先在 RabbitTemplate 上设置 defaultReceiveQueue,以指明从哪个队列接收消息,而您的配置中漏掉了这一项。因此,在配置 RabbitTemplate 时必须指定队列名称:

/**
 * Builds the RabbitTemplate shared by the publishing and the consuming side.
 *
 * <p>Besides the default exchange and routing key used when sending, the template is given
 * a default receive queue. The no-argument {@code receive()} / {@code receiveAndConvert()}
 * calls made by Spring Batch's {@code AmqpItemReader} require it; without it they fail with
 * {@code AmqpIllegalStateException: No 'queue' specified}.
 *
 * @param taskExecutor the {@code taskExecutor} bean handed to the template
 * @return the configured template
 */
@Bean
RabbitTemplate rabbitTemplate(@Qualifier("taskExecutor") ThreadPoolTaskExecutor taskExecutor) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setExchange(MESSAGE_EXCHANGE);
    rabbitTemplate.setRoutingKey(MESSAGE_ROUTING_KEY);
    // Receive from the queue this configuration declares and binds, not from a placeholder name.
    rabbitTemplate.setDefaultReceiveQueue(MESSAGE_QUEUE);
    rabbitTemplate.setTaskExecutor(taskExecutor);
    return rabbitTemplate;
}

关于java - Spring Batch ItemReader + RabbitMQ - 未指定 'queue',我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60480400/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com