gpt4 book ai didi

java - 轮询 S3 中的对象时等待来自池的连接超时

转载 作者:行者123 更新时间:2023-12-02 10:24:43 27 4
gpt4 key购买 nike

我正在开发一个后端服务,该服务使用 spring aws 集成定期轮询 S3 存储桶,并处理来自 S3 的轮询对象。下面是它的实现

@Configuration
@EnableIntegration
@IntegrationComponentScan
@EnableAsync
public class S3PollerConfiguration {

//private static final Logger log = (Logger) LoggerFactory.getLogger(S3PollerConfiguration.class);

@Value("${amazonProperties.bucketName}")
private String bucketName;

@Bean
@InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5"))
public MessageSource<InputStream> s3InboundStreamingMessageSource() {
S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template());
messageSource.setRemoteDirectory(bucketName);
return messageSource;
}

@Bean
public S3RemoteFileTemplate template() {
return new S3RemoteFileTemplate(new S3SessionFactory(thumbnailGeneratorService.getImagesS3Client()));
}

@Bean
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}

@Bean
IntegrationFlow fileReadingFlow() throws IOException {
return IntegrationFlows
.from(s3InboundStreamingMessageSource(),
e -> e.poller(p -> p.fixedDelay(10, TimeUnit.SECONDS)))
.handle(Message.class, (payload, header) -> processS3Object(payload.getHeaders(), payload.getPayload()))
.get();
}
}

我在对象上传时从 S3 获取消息,并且能够使用作为消息负载的一部分接收到的输入流来处理它。但我在这里面临的问题是,在收到几条消息后,我收到“超时等待来自池的连接”异常

2019-01-06 02:19:06.156 ERROR 11322 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:445)
at org.springframework.integration.file.remote.RemoteFileTemplate.list(RemoteFileTemplate.java:405)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.listFiles(AbstractRemoteFileStreamingMessageSource.java:194)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.poll(AbstractRemoteFileStreamingMessageSource.java:180)
at org.springframework.integration.aws.inbound.S3StreamingMessageSource.poll(S3StreamingMessageSource.java:70)
at org.springframework.integration.file.remote.AbstractRemoteFileStreamingMessageSource.doReceive(AbstractRemoteFileStreamingMessageSource.java:153)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:155)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:236)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:250)

我知道问题与未关闭打开的 S3Object 有关,如此处所述 https://github.com/aws/aws-sdk-java/issues/1405所以我已经实现了关闭作为消息负载的一部分接收的 S3Object 的输入流。但这并不能解决问题,而且我不断收到异常(exception)情况。有人可以帮我解决这个问题吗?

最佳答案

您的问题是您仍然在配置中将消息注释声明与 Java DSL 混合在一起。

看起来像 fileReadingFlow你关闭那些InputStream您的代码中的 s processS3Object()方法,但你对 InputStream 不执行任何操作由 @InboundChannelAdapter(value = "s3FilesChannel", poller = @Poller(fixedDelay = "5")) 制作。为什么你把它放在第一位?如果您不使用该代码,是什么让您保留它?

这个S3StreamingMessageSource始终被轮询两次:由@InboundChannelAdapterIntegrationFlows.from() .

您只需删除 @InboundChannelAdapter来自S3StreamingMessageSource bean 定义仅此而已。

请阅读更多引用手册来确定此类注释的原因以及在使用 Java DSL 时如何不需要它:

https://docs.spring.io/spring-integration/reference/html/configuration.html#_using_the_literal_inboundchanneladapter_literal_annotation

https://docs.spring.io/spring-integration/reference/html/java-dsl.html#java-dsl-inbound-adapters

关于java - 轮询 S3 中的对象时等待来自池的连接超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54056264/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com