spring-batch - Processing large files with Spring Batch

Reposted. Author: 行者123. Updated: 2023-12-04 04:53:22

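I have a large file that may contain 100K to 500K records. I plan to use chunk-oriented processing, and my idea is:

1) Split the large file into smaller files by record count, for example 10K records per file.

2) With 100K records, that gives 10 files of 10K records each.

3) I want to partition these 10 files and process them with 5 threads. I am considering a custom MultiResourcePartitioner.

4) The 5 threads should process all 10 files created by the split.

5) I do not want to create as many threads as there are files, because that could cause memory problems. What I am looking for is a way to process any number of files with only 5 threads (a count I can raise as my requirements change).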
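
Step 1 (the split) can run before the job starts, using plain JDK I/O. The following is only a minimal sketch under assumptions that are not in the original post: the class name `FileSplitter`, the `part-0001.txt` naming scheme, and UTF-8 encoding are all hypothetical.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits one large text file into parts of N lines each.
final class FileSplitter {

    /**
     * Copies source line by line into files of at most linesPerFile lines
     * inside targetDir and returns the created part files in order.
     */
    static List<Path> split(Path source, Path targetDir, int linesPerFile) throws IOException {
        if (linesPerFile <= 0) {
            throw new IllegalArgumentException("linesPerFile must be positive");
        }
        Files.createDirectories(targetDir);
        List<Path> parts = new ArrayList<>();
        BufferedWriter writer = null;
        int linesInCurrentPart = 0;
        try (BufferedReader reader = Files.newBufferedReader(source, StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                // Start a new part for the first line and whenever the current part is full.
                if (writer == null || linesInCurrentPart == linesPerFile) {
                    if (writer != null) {
                        writer.close();
                    }
                    Path part = targetDir.resolve(String.format("part-%04d.txt", parts.size() + 1));
                    writer = Files.newBufferedWriter(part, StandardCharsets.UTF_8);
                    parts.add(part);
                    linesInCurrentPart = 0;
                }
                writer.write(line);
                writer.newLine();
                linesInCurrentPart++;
            }
        } finally {
            if (writer != null) {
                writer.close();
            }
        }
        return parts;
    }
}
```

Only one line is held in memory at a time, so the size of the source file should not matter much. Calling `FileSplitter.split(source, targetDir, 10_000)` on a 100K-line file would produce the 10 files described in step 2.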

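Experts, could you tell me whether this can be achieved with Spring Batch? If so, please share pointers or a reference implementation.

Thanks in advance.

Working job configuration XML: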

<description>Spring Batch File Chunk Processing</description>

<import resource="../config/batch-context.xml" />

<batch:job id="file-partition-batch" job-repository="jobRepository" restartable="false">
    <batch:step id="master">
        <batch:partition partitioner="partitioner" handler="partitionHandler" />
    </batch:step>
</batch:job>

<batch:step id="slave">
    <batch:tasklet>
        <batch:chunk reader="reader" processor="compositeProcessor"
                     writer="compositeWriter" commit-interval="5">
        </batch:chunk>
    </batch:tasklet>
</batch:step>

<bean id="partitionHandler" class="org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler">
    <property name="taskExecutor" ref="taskExecutor"/>
    <property name="step" ref="slave" />
    <property name="gridSize" value="5" />
</bean>

<bean id="partitioner" class="com.poc.partitioner.FileMultiResourcePartitioner">
    <property name="resources" value="file:/Users/anupghosh/Documents/Spring_Batch/FilePartitionBatch/*.txt" />
    <property name="threadName" value="feed-processor" />
</bean>

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="5" />
    <property name="maxPoolSize" value="5" />
</bean>

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
    <property name="resource" value="#{stepExecutionContext['fileName']}" />
    <property name="lineMapper">
        <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
            <property name="lineTokenizer">
                <bean class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
                    <property name="delimiter" value="|"/>
                    <property name="names" value="key,docName,docTypCD,itemType,itemNum,launchDate,status" />
                </bean>
            </property>
            <property name="fieldSetMapper">
                <bean class="com.poc.mapper.FileRowMapper" />
            </property>
        </bean>
    </property>
</bean>

<bean id="validatingProcessor" class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <constructor-arg ref="feedRowValidator" />
</bean>

<bean id="feedProcesor" class="com.poc.processor.FeedProcessor" />

<bean id="compositeProcessor" class="org.springframework.batch.item.support.CompositeItemProcessor" scope="step">
    <property name="delegates">
        <list>
            <ref bean="validatingProcessor" />
            <ref bean="feedProcesor" />
        </list>
    </property>
</bean>

<bean id="recordDecWriter" class="com.poc.writer.RecordDecWriter" />

<bean id="reconFlatFileCustomWriter" class="com.poc.writer.ReconFileWriter">
    <property name="reconFlatFileWriter" ref="reconFlatFileWriter" />
</bean>

<bean id="reconFlatFileWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step">
    <property name="resource" value="file:/Users/anupghosh/Documents/Spring_Batch/recon-#{stepExecutionContext['threadName']}.txt" />
    <property name="shouldDeleteIfExists" value="true" />
    <property name="lineAggregator">
        <bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
            <property name="delimiter" value="|" />
            <property name="fieldExtractor">
                <bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
                    <property name="names" value="validationError" />
                </bean>
            </property>
        </bean>
    </property>
</bean>

<bean id="compositeWriter" class="org.springframework.batch.item.support.CompositeItemWriter">
    <property name="delegates">
        <list>
            <ref bean="recordDecWriter" />
            <ref bean="reconFlatFileCustomWriter" />
        </list>
    </property>
</bean>

<bean id="feedRowValidator" class="org.springframework.batch.item.validator.SpringValidator">
    <property name="validator">
        <bean class="com.poc.validator.FeedRowValidator"/>
    </property>
</bean>
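
The reader in this configuration resolves `#{stepExecutionContext['fileName']}`, so each partition has to carry a file location under that key. Spring Batch's stock MultiResourcePartitioner creates one partition per resource, names the partitions `partition0`, `partition1`, and so on, and stores the resource URL under `fileName` by default. The plain-Java sketch below is a simplified stand-in for that mapping, not the library's code, and it says nothing about what the custom `com.poc.partitioner.FileMultiResourcePartitioner` does, since that class is not shown. The class name `PerFilePartitions` and the use of nested maps in place of `ExecutionContext` are assumptions.

```java
import java.net.MalformedURLException;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that mimics a one-partition-per-file mapping.
final class PerFilePartitions {

    /** Returns partition name -> context, where each context holds one "fileName" URL. */
    static Map<String, Map<String, String>> partition(List<Path> files) throws MalformedURLException {
        Map<String, Map<String, String>> partitions = new LinkedHashMap<>();
        int index = 0;
        for (Path file : files) {
            Map<String, String> context = new LinkedHashMap<>();
            // The worker step's reader looks this value up as stepExecutionContext['fileName'].
            context.put("fileName", file.toUri().toURL().toExternalForm());
            partitions.put("partition" + index, context);
            index++;
        }
        return partitions;
    }
}
```

With this kind of mapping, 10 files always yield 10 partitions. How many of them run at the same time is a separate concern that is decided by the task executor.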

Best answer

I was able to solve this using MultiResourcePartitioner. Below is the Java configuration:

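@Bean
public Partitioner partitioner() throws IOException {
    MultiResourcePartitioner partitioner = new MultiResourcePartitioner();
    // Resolve every split file; filePath is a field of the configuration class.
    ClassLoader cl = this.getClass().getClassLoader();
    ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver(cl);
    Resource[] resources = resolver.getResources("file:" + filePath + "/*.csv");
    // One partition is created per resource; each partition's ExecutionContext
    // holds the file URL under the key "fileName".
    partitioner.setResources(resources);
    // No explicit partition(...) call is needed: the partition step invokes it at run time.
    return partitioner;
}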

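@Bean
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
    // With the default unbounded queue the pool never grows beyond corePoolSize
    // (default 1), so the core size must be set as well to get 4 worker threads.
    taskExecutor.setCorePoolSize(4);
    taskExecutor.setMaxPoolSize(4);
    taskExecutor.afterPropertiesSet();
    return taskExecutor;
}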

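@Bean
@Qualifier("masterStep")
public Step masterStep() throws IOException {
    return stepBuilderFactory.get("masterStep")
            // Worker step that runs once per partition (one partition per file).
            .partitioner(processData())
            .partitioner("processData", partitioner())
            // The executor's pool size caps how many files are processed concurrently.
            .taskExecutor(taskExecutor())
            .listener(pcStressStepListener)
            .build();
}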


@Bean
@Qualifier("processData")
public Step processData() {
    return stepBuilderFactory.get("processData")
            .<pojo, pojo>chunk(5000)
            .reader(reader)
            .processor(processor())
            .writer(writer)
            .build();
}



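@Bean(name = "reader")
@StepScope
public FlatFileItemReader<pojo> reader(
        @Value("#{stepExecutionContext['fileName']}") String filename) throws MalformedURLException {
    // "fileName" is the URL that MultiResourcePartitioner stored for this partition.
    FlatFileItemReader<pojo> reader = new FlatFileItemReader<>();
    reader.setResource(new UrlResource(filename));

    DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
    // FILE_HEADER stands for the "FILE HEADER" placeholder in the original post:
    // a String[] with the column names of the input file, which were not given.
    tokenizer.setNames(FILE_HEADER);

    BeanWrapperFieldSetMapper<pojo> fieldSetMapper = new BeanWrapperFieldSetMapper<>();
    fieldSetMapper.setTargetType(pojo.class);

    DefaultLineMapper<pojo> lineMapper = new DefaultLineMapper<>();
    lineMapper.setLineTokenizer(tokenizer);
    lineMapper.setFieldSetMapper(fieldSetMapper);
    reader.setLineMapper(lineMapper);
    return reader;
}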
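
The limit on concurrency in a setup like this comes from the size of the thread pool, not from the number of files: every file becomes a unit of work, and units beyond the pool size wait in a queue. The sketch below illustrates that idea with a plain JDK fixed thread pool instead of Spring Batch. The class name `BoundedFileProcessing` is hypothetical, and counting lines is a placeholder for the real per-file work.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Stream;

// Hypothetical illustration: many files, a fixed number of worker threads.
final class BoundedFileProcessing {

    /** Counts the lines of each file using at most `threads` worker threads. */
    static Map<Path, Long> countLines(List<Path> files, int threads) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Submit one task per file; tasks beyond the pool size wait in the queue.
            Map<Path, Future<Long>> pending = new LinkedHashMap<>();
            for (Path file : files) {
                pending.put(file, pool.submit(() -> {
                    try (Stream<String> lines = Files.lines(file)) {
                        return lines.count();
                    }
                }));
            }
            // Collect the results in submission order, blocking until each task finishes.
            Map<Path, Long> result = new LinkedHashMap<>();
            for (Map.Entry<Path, Future<Long>> entry : pending.entrySet()) {
                result.put(entry.getKey(), entry.getValue().get());
            }
            return result;
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling `countLines(tenFiles, 5)` would process ten files while never running more than five tasks at once, which matches the behavior asked for in the question.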

Regarding "spring-batch - Processing large files with Spring Batch", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/36263965/