
java - Why doesn't my Spring Integration channel sort my messages correctly?

Reposted. Author: 行者123. Updated: 2023-12-02 09:26:09

I have this simple pipeline configuration. I just wanted to try out Spring Integration, but the output is strange.

Here is my code:

@Slf4j
@Configuration
@RequiredArgsConstructor
public class FileToConsoleIntegration {

    @Bean
    public IntegrationFlow fileToConsoleIntegrationFlow() {
        return IntegrationFlows.from(sameSourceDirectory(), spec -> spec.poller(this::getPollerSpec))
                .log(LoggingHandler.Level.WARN, "before.sort", m -> m.getHeaders().get("file_name"))
                .channel(MessageChannels.queue(5))
                .log(LoggingHandler.Level.INFO, "after.sort", m -> m.getHeaders().get("file_name"))
                .channel(alphabeticallyReversed())
                .log(LoggingHandler.Level.ERROR, "after.sort", m -> m.getHeaders().get("file_name"))
                .handle(logInfoMessageHandler())
                .get();
    }

    private PollerSpec getPollerSpec(PollerFactory p) {
        return p
                .fixedRate(1000)
                .maxMessagesPerPoll(10);
    }

    @Bean
    public MessageSource<File> sameSourceDirectory() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File("input_dir"));
        return messageSource;
    }

    @Bean
    public MessageHandler logInfoMessageHandler() {
        return message ->
                log.info("Handling message with headers: {} and payload: {}",
                        message.getHeaders(), message.getPayload());
    }

    @Bean
    public PriorityChannel alphabeticallyReversed() {
        return MessageChannels.priority()
                .capacity(5)
                .comparator(Comparator.comparing(getFilename(), Comparator.reverseOrder()))
                .get();
    }

    private Function<Message<?>, String> getFilename() {
        return a -> (String) a.getHeaders().get("file_name");
    }

}

Here is my input:

/input_dir
    01_zhasdfha.txt
    02_usfhahjf.txt
    05_bsdfasdf.txt
    06_asdfasdf.txt

Here is my output:

2019-10-10 17:24:48.604  WARN 46024 --- [ask-scheduler-1] before.sort                              : 01_zhasdfha.txt
2019-10-10 17:24:48.605 INFO 46024 --- [ask-scheduler-1] after.sort : 01_zhasdfha.txt
2019-10-10 17:24:48.605 ERROR 46024 --- [ask-scheduler-2] after.sort : 01_zhasdfha.txt
2019-10-10 17:24:48.605 WARN 46024 --- [ask-scheduler-1] before.sort : 02_usfhahjf.txt
2019-10-10 17:24:48.605 INFO 46024 --- [ask-scheduler-1] after.sort : 02_usfhahjf.txt
2019-10-10 17:24:48.606 WARN 46024 --- [ask-scheduler-1] before.sort : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort : 02_usfhahjf.txt
2019-10-10 17:24:48.606 INFO 46024 --- [ask-scheduler-1] after.sort : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort : 05_bsdfasdf.txt
2019-10-10 17:24:48.606 WARN 46024 --- [ask-scheduler-1] before.sort : 06_asdfasdf.txt
2019-10-10 17:24:48.606 INFO 46024 --- [ask-scheduler-1] after.sort : 06_asdfasdf.txt
2019-10-10 17:24:48.606 ERROR 46024 --- [ask-scheduler-2] after.sort : 06_asdfasdf.txt
2019-10-10 17:24:48.605 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt
2019-10-10 17:24:48.607 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.609 INFO 46024 --- [ main] c.s.i.siexample.SiExampleApplication : Started SiExampleApplication in 0.743 seconds (JVM running for 1.232)

Expected output:

2019-10-10 17:24:48.607  INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration       : Handling message with headers: {file_originalFile=input_dir/06_asdfasdf.txt, id=cf1a978b-c577-bb14-cf45-cf5b7ce8f2a7, file_name=06_asdfasdf.txt, file_relativePath=06_asdfasdf.txt, timestamp=1570728288606} and payload: input_dir/06_asdfasdf.txt
2019-10-10 17:24:48.607 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/05_bsdfasdf.txt, id=c81dbcdc-6dd0-b549-4923-fe567ff6ca23, file_name=05_bsdfasdf.txt, file_relativePath=05_bsdfasdf.txt, timestamp=1570728288606} and payload: input_dir/05_bsdfasdf.txt
2019-10-10 17:24:48.607 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/02_usfhahjf.txt, id=3371143b-a097-a9ee-e303-d4d359875188, file_name=02_usfhahjf.txt, file_relativePath=02_usfhahjf.txt, timestamp=1570728288605} and payload: input_dir/02_usfhahjf.txt
2019-10-10 17:24:48.605 INFO 46024 --- [ask-scheduler-3] c.s.i.s.i.FileToConsoleIntegration : Handling message with headers: {file_originalFile=input_dir/01_zhasdfha.txt, id=5fcfb1cc-7187-86a8-dcf4-23e9b31b2376, file_name=01_zhasdfha.txt, file_relativePath=01_zhasdfha.txt, timestamp=1570728288603} and payload: input_dir/01_zhasdfha.txt

The only difference between the expected output and the actual output is the order of the messages. How can I fix this?

Best answer

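Just because you set the queue size to 5 doesn't mean the framework will wait for all 5 messages before the downstream poller pulls them. If a poller is already waiting for a message (by default, a poller waits up to 1 second on each poll), it receives the message immediately. You can try setting the poller's receive timeout to 0, but there will still be a (small) race condition in which a message can be picked up sooner than you expect.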
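
The timing effect can be reproduced without Spring at all. The following is a minimal single-threaded sketch, not the framework's implementation: a plain `java.util.concurrent.PriorityBlockingQueue` stands in for the `PriorityChannel`, and the class and method names (`PriorityQueueTimingDemo`, `consumeEagerly`, `consumeAfterBatch`) are made up for illustration. It shows that a priority queue can only reorder what is actually sitting in it at the moment the consumer polls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class: a plain PriorityBlockingQueue stands in for the PriorityChannel.
class PriorityQueueTimingDemo {

    // Same ordering as in the question: file names in reverse alphabetical order.
    private static PriorityBlockingQueue<String> newQueue() {
        return new PriorityBlockingQueue<>(5, Comparator.<String>reverseOrder());
    }

    // Models a consumer that is already waiting: each message is taken as soon as it arrives,
    // so the queue never holds more than one element and has nothing to reorder.
    static List<String> consumeEagerly(List<String> arrivals) {
        PriorityBlockingQueue<String> queue = newQueue();
        List<String> handled = new ArrayList<>();
        for (String name : arrivals) {
            queue.offer(name);
            handled.add(queue.poll());
        }
        return handled;
    }

    // Models a consumer that starts polling only after the whole batch has been queued.
    static List<String> consumeAfterBatch(List<String> arrivals) {
        PriorityBlockingQueue<String> queue = newQueue();
        queue.addAll(arrivals);
        List<String> handled = new ArrayList<>();
        String next;
        while ((next = queue.poll()) != null) {
            handled.add(next);
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> files = List.of(
                "01_zhasdfha.txt", "02_usfhahjf.txt", "05_bsdfasdf.txt", "06_asdfasdf.txt");
        System.out.println("eager:   " + consumeEagerly(files));
        System.out.println("batched: " + consumeAfterBatch(files));
    }
}
```

With the eager consumer the files come out in arrival order; with the batched consumer they come out reversed. The log in the question looks like a mix of the two: `01_zhasdfha.txt` was apparently taken before anything else had arrived, and the remaining three were then delivered in reverse order.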

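A better approach is to use an aggregator followed by a transformer that sorts the List<> (or a custom output processor on the aggregator), and then a splitter.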
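
As a rough sketch of what the aggregator, transformer, and splitter chain might look like with the Java DSL used in the question (Spring Integration 5.x style), consider the configuration below. Several things in it are assumptions made for illustration, not part of the original answer: the class name `SortedFileFlowSketch`, the constant correlation key `"files"`, the batch size of 4 (the number of files in the question), and the 2000 ms group timeout. It also sorts on the `File` payload's name rather than the `file_name` header, on the assumption that headers whose values differ between the grouped messages are not carried over to the aggregated message.

```java
import java.io.File;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;

// Hypothetical configuration class; names and numeric values are illustrative assumptions.
@Configuration
public class SortedFileFlowSketch {

    @Bean
    public IntegrationFlow sortedFileFlow() {
        return IntegrationFlows
                .from(Files.inboundAdapter(new File("input_dir")),
                        e -> e.poller(Pollers.fixedRate(1000).maxMessagesPerPoll(10)))
                // Collect incoming files into one group instead of relying on channel timing.
                .aggregate(a -> a
                        .correlationStrategy(m -> "files")     // assumed: one group for everything
                        .releaseStrategy(g -> g.size() >= 4)   // assumed batch size
                        .groupTimeout(2000)                    // assumed: release a partial batch after 2 s
                        .sendPartialResultOnExpiry(true)
                        .expireGroupsUponCompletion(true))     // let the next batch start a fresh group
                // The aggregated payload is a list of the File payloads; sort it in reverse name order.
                .<List<File>, List<File>>transform(files -> files.stream()
                        .sorted(Comparator.comparing(File::getName).reversed())
                        .collect(Collectors.toList()))
                // Emit one message per file again, now in sorted order.
                .split()
                .handle(m -> System.out.println("Handling " + m.getPayload()))
                .get();
    }
}
```

The key design difference from the question's flow is that ordering is decided inside a single message (the sorted list) before splitting, so it no longer depends on when a downstream poller happens to wake up.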

Regarding "java - Why doesn't my Spring Integration channel sort my messages correctly?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58328382/
