
java - How do @Poller-s work in Spring Integration?

Reposted. Author: 行者123. Updated: 2023-12-02 08:39:46

I am building a Spring Integration implementation that uses two PollableChannels:

  1. A regular channel
  2. An error channel

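Messages are polled from the regular channel and processed. If an error occurs during processing (for example, an external service is unavailable), the message is sent to the error channel. From the error channel it is re-queued onto the regular channel, and the cycle repeats until the message is processed successfully.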

The idea is to poll the error channel infrequently, to give the processor some time to (hopefully) recover.
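To make the intended flow concrete, here is a minimal framework-free sketch in plain Java. It is not Spring Integration code: the class `RetryLoopSketch`, its `run` method, and the simulated outage are hypothetical names and assumptions chosen purely for illustration. A failed message is handed back to the regular queue only after a delay, so the gaps between attempts should be at least that delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class RetryLoopSketch {

    /**
     * Processes one message, simulating an outage for the first
     * failuresBeforeSuccess attempts. Returns the gaps (in ms) between attempts.
     */
    static List<Long> run(int failuresBeforeSuccess, long retryDelayMs) {
        BlockingQueue<String> regular = new LinkedBlockingQueue<>();
        ScheduledExecutorService errorPath = Executors.newSingleThreadScheduledExecutor();
        List<Long> attemptNanos = new ArrayList<>();
        try {
            regular.add("Test message");
            while (true) {
                String message = regular.take();      // wait for the next message
                attemptNanos.add(System.nanoTime());  // record the attempt time
                if (attemptNanos.size() > failuresBeforeSuccess) {
                    break;                            // processed successfully
                }
                // "Error channel": re-queue the message only after the delay.
                errorPath.schedule(() -> { regular.add(message); },
                        retryDelayMs, TimeUnit.MILLISECONDS);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for a message", e);
        } finally {
            errorPath.shutdownNow();
        }
        List<Long> gaps = new ArrayList<>();
        for (int i = 1; i < attemptNanos.size(); i++) {
            gaps.add(TimeUnit.NANOSECONDS.toMillis(attemptNanos.get(i) - attemptNanos.get(i - 1)));
        }
        return gaps;
    }

    public static void main(String[] args) {
        System.out.println("Gaps between attempts (in ms): " + run(5, 100));
    }
}
```

This is the timing the question hopes to get from the two pollers: one gap per failed attempt, each no shorter than the retry delay.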

I have simulated this workflow in the following test:

package com.stackoverflow.questions.sipoller;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;

@SpringBootTest
class SiPollerApplicationTests {

    private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);

    private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
    private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

    private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
    private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
    private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    @Qualifier(QUEUE_CHANNEL_REGULAR)
    private PollableChannel channelRegular;

    @Test
    void testTimingOfMessageProcessing() {
        channelRegular.send(MessageBuilder.withPayload("Test message").build());

        await()
            .atMost(FIVE_MINUTES)
            .with()
            .pollInterval(ONE_HUNDRED_MILLISECONDS)
            .until(
                () -> {
                    if (NUMBER_OF_SUCCESSES.intValue() == 1) {
                        reportGaps();
                        return true;
                    }
                    return false;
                }
            );
    }

    private void reportGaps() {
        List<Long> gaps = IntStream
            .range(1, ATTEMPT_INSTANTS.size())
            .mapToObj(
                i -> Duration
                    .between(
                        ATTEMPT_INSTANTS.get(i - 1),
                        ATTEMPT_INSTANTS.get(i)
                    )
                    .toMillis()
            )
            .collect(Collectors.toList());
        LOG.info("Gaps between attempts (in ms): {}", gaps);
    }

    @Configuration
    @EnableIntegration
    @Import(SiPollerApplicationTestEndpoint.class)
    static class SiPollerApplicationTestConfig {

        @Bean(name = QUEUE_CHANNEL_REGULAR)
        public PollableChannel queueChannelRegular() {
            return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
        }

        @Bean(name = QUEUE_CHANNEL_ERROR)
        public PollableChannel queueChannelError() {
            return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
        }

        @Router(
            inputChannel = QUEUE_CHANNEL_ERROR,
            poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
        )
        public String retryProcessing() {
            return QUEUE_CHANNEL_REGULAR;
        }
    }

    @MessageEndpoint
    static class SiPollerApplicationTestEndpoint {

        @Autowired
        @Qualifier(QUEUE_CHANNEL_ERROR)
        private PollableChannel channelError;

        @ServiceActivator(
            inputChannel = QUEUE_CHANNEL_REGULAR,
            poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
        )
        public void handleMessage(Message<String> message) {
            // Count and time attempts
            int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
            ATTEMPT_INSTANTS.add(Instant.now());

            // First few times - refuse to process message and bounce it into
            // error channel
            if (numberOfAttempts < 5) {
                channelError.send(message);
                return;
            }

            // After that - process message
            NUMBER_OF_SUCCESSES.getAndIncrement();
        }
    }

}

The pom.xml dependencies are:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.0.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

Note the poller configuration:

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

The regular channel should be polled every half second, and the error channel every three seconds.

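The test simulates an outage during message processing: the first five attempts to process the message are rejected. The test also records the instant of each processing attempt. In the end, on my machine, the test outputs: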

Gaps between attempts (in ms): [1, 0, 0, 0, 0]

In other words, the message is retried almost immediately after every failure.

It seems to me that I fundamentally misunderstand how pollers work in Spring Integration. So my questions are:

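  1. Why is there such a discrepancy between the poller configuration and the actual polling frequency?
  2. Does Spring Integration provide a way to implement the pattern I have described?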

Best answer

There are two settings that can affect this behavior.

QueueChannel pollers drain the queue by default; use setMaxMessagesPerPoll(1) to receive only one message on each poll.
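The draining behavior can be pictured with a small plain-Java model. This is a simplification for illustration only, not Spring Integration's actual implementation: `PollCycleModel` and `pollOnce` are hypothetical names, and treating a negative limit as "no limit" is an assumption of the sketch.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class PollCycleModel {

    /**
     * Simplified model of a single poll cycle: keeps receiving until the queue
     * is empty or the per-poll limit is reached. A negative limit means
     * "no limit", so the whole queue is drained.
     */
    static List<String> pollOnce(Queue<String> channel, int maxMessagesPerPoll) {
        List<String> handled = new ArrayList<>();
        while (maxMessagesPerPoll < 0 || handled.size() < maxMessagesPerPoll) {
            String message = channel.poll();  // null when the queue is empty
            if (message == null) {
                break;
            }
            handled.add(message);
        }
        return handled;
    }

    public static void main(String[] args) {
        Queue<String> unbounded = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(pollOnce(unbounded, -1)); // prints [a, b, c]

        Queue<String> limited = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        System.out.println(pollOnce(limited, 1));    // prints [a]
    }
}
```

Without a limit, a message that returns to the queue while the cycle is still running is picked up again without waiting for the next trigger. In the question's code the limit would presumably be set through the `maxMessagesPerPoll` attribute of `@Poller`, for example `@Poller(fixedRate = POLLER_PERIOD_REGULAR, maxMessagesPerPoll = "1")`.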

Also, by default, the receive timeout on the QueueChannel is 1 second (1000 ms).

So the first poll may happen sooner than you think; set the timeout to 0 to return immediately when there are no messages in the queue.
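The effect of the receive timeout can be illustrated with a plain `BlockingQueue`. This is only a model of the idea, not Spring Integration code; `ReceiveTimeoutModel` and `receive` are hypothetical names. A receiver that is already waiting with a 1000 ms timeout gets a message as soon as it arrives, while a timeout of 0 returns at once when the queue is empty.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveTimeoutModel {

    /** Waits up to timeoutMs for a message; returns null if none arrives in time. */
    static String receive(BlockingQueue<String> channel, long timeoutMs) {
        try {
            return channel.poll(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();

        // Timeout 0: an empty queue makes the receive return at once.
        System.out.println(receive(channel, 0)); // prints null

        // Timeout 1000 ms: a message arriving about 100 ms into the wait is
        // handed over right away, not at the next scheduled poll.
        Thread sender = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                return;
            }
            channel.add("late message");
        });
        sender.start();
        long start = System.nanoTime();
        String received = receive(channel, 1000);
        long waitedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        System.out.println(received + " after about " + waitedMs + " ms");
    }
}
```

Applied to the question's error-channel poller, both settings together might look like `@Poller(fixedRate = POLLER_PERIOD_ERROR, maxMessagesPerPoll = "1", receiveTimeout = "0")`, assuming a Spring Integration version whose `@Poller` annotation offers the `receiveTimeout` attribute.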

Regarding "java - How do @Poller-s work in Spring Integration?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61448123/
