
java - Proper use of Spring Integration Java DSL plus AmazonS3InboundSynchronizationMessageSource


I am using an AmazonS3InboundSynchronizationMessageSource to read in millions of files spread across an S3 bucket's sub-directories, organized by type >> year >> month >> day >> hour >> {filename}-{uniqueid}.gz. Ideally, I would like to poll and write, and have the synchronizer remember the last spot I read from on a subsequent poll so it retrieves the subsequent batch. But that is not how the above MessageSource was designed.

Anyway, I can work around that by picking a range and reading in the contents.
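For illustration, a hedged sketch of that range workaround, assuming each run is pointed at a single hour's worth of keys (the prefix layout comes from the bucket structure above; the helper name is hypothetical):

// Hypothetical helper: build the S3 key prefix for one hour of the
// type >> year >> month >> day >> hour layout described above.
private String remoteDirectoryFor(String type, LocalDateTime hour) {
    return String.format("%s/%04d/%02d/%02d/%02d",
            type, hour.getYear(), hour.getMonthValue(),
            hour.getDayOfMonth(), hour.getHour());
}

// e.g. messageSource.setRemoteDirectory(remoteDirectoryFor("type", LocalDateTime.now().minusHours(1)));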

Aside from that, if I take a simple approach and read the files from one directory on a first poll, I would like to shut down (System.exit) after that (in effect, after some processing noted in the comments below).

So, similar to the question asked here:

Spring Integration Inbound-channel-adapter: make one poll and exit

I just want to poll once and exit after the first poll. (Maybe there is a different way to approach this? I am open to suggestions.)

Application bootstrapping

@SpringBootApplication
@EnableIntegration
@IntegrationComponentScan
public class DataMigrationApp extends SpringBootServletInitializer {

    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(DataMigrationApp.class);
    }

    public static void main(String[] args) {
        SpringApplication.run(DataMigrationApp.class, args);
    }

}
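One way to get the poll-once-then-exit behavior asked about above (a sketch of my own, not something the adapter provides; the completion signal is left as an assumption) is to hold on to the ConfigurableApplicationContext returned by SpringApplication.run and close it once processing has drained:

public static void main(String[] args) {
    ConfigurableApplicationContext ctx = SpringApplication.run(DataMigrationApp.class, args);
    // ... block here until the single poll and all downstream processing finish,
    // e.g. on a CountDownLatch counted down by the final handler (hypothetical)
    ctx.close();
    System.exit(0);
}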

UPDATED (2015-09-06)

Code sample

@Configuration
public class DataMigrationModule {

    private final Logger log = LoggerFactory.getLogger(getClass());

    @Value("${cloud.aws.credentials.accessKey}")
    private String accessKey;

    @Value("${cloud.aws.credentials.secretKey}")
    private String secretKey;

    @Value("${cloud.aws.s3.bucket}")
    private String bucket;

    @Value("${cloud.aws.s3.max-objects-per-batch:1024}")
    private int maxObjectsPerBatch;

    @Value("${cloud.aws.s3.accept-subfolders:false}")
    private String acceptSubFolders;

    @Value("${cloud.aws.s3.remote-directory}")
    private String remoteDirectory;

    @Value("${cloud.aws.s3.local-directory:target/s3-dump}")
    private String localDirectory;

    @Value("${cloud.aws.s3.filename-wildcard:}")
    private String fileNameWildcard;

    @Value("${app.persistent-type:}")
    private String persistentType;

    @Value("${app.repository-type:}")
    private String repositoryType;

    @Value("${app.persistence-batch-size:2500}")
    private int persistenceBatchSize;

    @Autowired
    private ListableBeanFactory beanFactory;

    private final AtomicBoolean invoked = new AtomicBoolean();

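    // Fires exactly once: the first call returns the current Date, every later
    // call returns null, which tells the poller to stop scheduling.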
    public Date nextExecutionTime(TriggerContext triggerContext) {
        return this.invoked.getAndSet(true) ? null : new Date();
    }

    private FileToInputStreamTransformer unzipTransformer() {
        FileToInputStreamTransformer transformer = new FileToInputStreamTransformer();
        transformer.setDeleteFiles(true);
        return transformer;
    }

    private Class<?> repositoryType() {
        try {
            return Class.forName(repositoryType);
        } catch (ClassNotFoundException cnfe) {
            log.error("DataMigrationModule.failure -- (Unknown repository implementation!)", cnfe);
            System.exit(0);
        }
        return null;
    }

    private Class<?> persistentType() {
        try {
            return Class.forName(persistentType);
        } catch (ClassNotFoundException cnfe) {
            log.error("DataMigrationModule.failure -- (Unsupported type!)", cnfe);
            System.exit(0);
        }
        return null;
    }

    @Bean
    public MessageSource<?> amazonS3InboundSynchronizationMessageSource() {
        AWSCredentials credentials = new BasicAWSCredentials(this.accessKey, this.secretKey);
        AmazonS3InboundSynchronizationMessageSource messageSource = new AmazonS3InboundSynchronizationMessageSource();
        messageSource.setCredentials(credentials);
        messageSource.setBucket(bucket);
        messageSource.setMaxObjectsPerBatch(maxObjectsPerBatch);
        messageSource.setAcceptSubFolders(Boolean.valueOf(acceptSubFolders));
        messageSource.setRemoteDirectory(remoteDirectory);
        if (!fileNameWildcard.isEmpty()) {
            messageSource.setFileNameWildcard(fileNameWildcard);
        }
        String directory = System.getProperty("java.io.tmpdir");
        if (!localDirectory.startsWith("/")) {
            localDirectory = "/" + localDirectory;
        }
        if (!localDirectory.endsWith("/")) {
            localDirectory = localDirectory + "/";
        }
        directory = directory + localDirectory;
        FileUtils.mkdir(directory);
        messageSource.setDirectory(new LiteralExpression(directory));
        return messageSource;
    }

    @Bean
    DirectChannel inputChannel() {
        return new DirectChannel();
    }

    @Bean
    JdbcRepositoryHandler jdbcRepositoryHandler() {
        return new JdbcRepositoryHandler(repositoryType(), beanFactory);
    }

    @Bean
    public IntegrationFlow flow() {
        // @formatter:off
        return IntegrationFlows
                .from(
                        this.amazonS3InboundSynchronizationMessageSource(),
                        e -> e.poller(p -> p.trigger(this::nextExecutionTime))
                )
                .transform(unzipTransformer())
                // TODO add advised PollableChannel to deal with possible decompression issues

                .split(new FileSplitter())
                .channel(MessageChannels.executor(Executors.newCachedThreadPool()))
                .transform(Transformers.fromJson(persistentType()))
                // TODO add advised PollableChannel to deal with possible transform issues

                // @see http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to
                .aggregate(a ->
                        a.releaseStrategy(g -> g.size() == persistenceBatchSize)
                                .expireGroupsUponCompletion(true)
                                .sendPartialResultOnExpiry(true)
                                .groupTimeoutExpression("size() ge 2 ? 10000 : -1"),
                        null
                )
                .handle(jdbcRepositoryHandler())
                // TODO add advised PollableChannel to deal with possible persistence issue and retry with partial batch
                .get();
        // @formatter:on
    }

    public class JdbcRepositoryHandler extends AbstractReplyProducingMessageHandler {

        private final Logger log = LoggerFactory.getLogger(getClass());

        @SuppressWarnings("rawtypes")
        private Insertable repository;

        public JdbcRepositoryHandler(Class<?> repositoryClass, ListableBeanFactory beanFactory) {
            repository = (Insertable<?>) beanFactory.getBean(repositoryClass);
        }

        @Override
        protected Object handleRequestMessage(Message<?> message) {
            List<?> result = null;
            try {
                result = repository.insert((List<?>) message.getPayload());
            } catch (TransactionSystemException | DataAccessException e) {
                // TODO Quite a bit more work to add retry capability for records that didn't cause the failure
                // Pass the exception itself so the stack trace is actually logged.
                log.error("DataMigrationModule.failure -- (Could not persist batch!)", e);
            }
            return result;
        }

    }
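
    // NOTE: Insertable is the question's own repository abstraction and is not
    // shown anywhere in the post. A hypothetical sketch of the shape the handler
    // above assumes (insert a batch, return the persisted records):
    public interface Insertable<T> {

        List<T> insert(List<T> batch);

    }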

    public class FileToInputStreamTransformer extends AbstractFilePayloadTransformer<InputStream> {

        @Override
        protected InputStream transformFile(File payload) throws Exception {
            return new GZIPInputStream(new FileInputStream(payload));
        }
    }

}
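For reference, the @Value placeholders in the configuration above map to external configuration along these lines (a hedged example; every value, and the two app.* class names in particular, is a placeholder):

cloud.aws.credentials.accessKey=...
cloud.aws.credentials.secretKey=...
cloud.aws.s3.bucket=my-bucket
cloud.aws.s3.remote-directory=type/2015/09/06/00
cloud.aws.s3.max-objects-per-batch=1024
cloud.aws.s3.accept-subfolders=false
cloud.aws.s3.local-directory=target/s3-dump
app.persistent-type=com.example.MyDomainType
app.repository-type=com.example.MyDomainTypeRepository
app.persistence-batch-size=2500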

Best Answer

Not quite sure what your question is, actually.

You are on the right path, by the way.

For the OnlyOnceTrigger you can use something similar from my test case:

private final AtomicBoolean invoked = new AtomicBoolean();

public Date nextExecutionTime(TriggerContext triggerContext) {
    return this.invoked.getAndSet(true) ? null : new Date();
}

...

e -> e.poller(p -> p.trigger(this::nextExecutionTime))

To unzip the file you should do something like this:

.<File, InputStream>transform(p -> new GZIPInputStream(new FileInputStream(p)))
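
Both FileInputStream and GZIPInputStream throw checked IOExceptions, which a transform lambda cannot propagate as-is; a compilable variant of the same idea (a sketch; the AbstractFilePayloadTransformer subclass in the updated question is the other way around this, since its transformFile is declared with throws Exception) would be:

.<File, InputStream>transform(p -> {
    try {
        return new GZIPInputStream(new FileInputStream(p));
    }
    catch (IOException e) {
        throw new UncheckedIOException(e);
    }
})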

You have to do this because there is an out-of-the-box FileSplitter component which reads a file line by line and emits a message for each line. And it supports an InputStream as the payload, to avoid loading the entire file into memory.

So the next EIP method in your IntegrationFlow would be:

.split(new FileSplitter())

I'm not sure that you need to aggregate each domain object into a list afterwards for a further batch insert, since you could distribute them one by one via the ExecutorChannel.
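
In flow terms, that one-by-one alternative would look roughly like this (a sketch under the assumption that single-record inserts are acceptable; jdbcRepositoryHandler would then receive a single object per message rather than a List):

.split(new FileSplitter())
.channel(MessageChannels.executor(Executors.newCachedThreadPool()))
.transform(Transformers.fromJson(persistentType()))
.handle(jdbcRepositoryHandler())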

As you can see, there is no reason for the "delete the unzipped file" step.

The same goes for the final "delete all *.gz files" step, simply because you can rely on the AcceptOnceFileListFilter to avoid re-reading the same file on the next polling task.
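
For context, AcceptOnceFileListFilter is the standard spring-integration-file filter that remembers which files it has already returned; a minimal sketch of it on a plain local-directory message source (an illustration of the filter, not of the S3 adapter's internals):

@Bean
public MessageSource<File> localFileSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    // target/s3-dump is the local sync directory used in the question's config
    source.setDirectory(new File("target/s3-dump"));
    source.setFilter(new AcceptOnceFileListFilter<File>());
    return source;
}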

Let me know if I've missed anything.

A similar question on java - proper use of Spring Integration Java DSL plus AmazonS3InboundSynchronizationMessageSource can be found on Stack Overflow: https://stackoverflow.com/questions/32388806/
