gpt4 book ai didi

multithreading - 使用 Spring 批处理文件项阅读器进行多线程处理

转载 作者:行者123 更新时间:2023-12-04 23:51:25 25 4
gpt4 key购买 nike

在 Spring Batch 中,我试图读取一个 CSV 文件并希望将每一行分配给一个单独的线程并对其进行处理。我试图通过使用 TaskExecutor 来实现它,但是所有线程都在一次选择同一行。我也尝试使用 Partioner 来实现这个概念,也发生了同样的事情。请参阅下面我的配置 Xml。

步骤说明

    <step id="Step2">
<tasklet task-executor="taskExecutor">
<chunk reader="reader" processor="processor" writer="writer" commit-interval="1" skip-limit="1">
</chunk>
</tasklet>
</step>

<bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader">
<property name="resource" value="file:cvs/user.csv" />

<property name="lineMapper">
<bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
<!-- split it -->
<property name="lineTokenizer">
<bean
class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
<property name="names" value="userid,customerId,ssoId,flag1,flag2" />
</bean>
</property>
<property name="fieldSetMapper">

<!-- map to an object -->
<bean
class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
<property name="prototypeBeanName" value="user" />
</bean>
</property>

</bean>
</property>

</bean>

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor">
<property name="concurrencyLimit" value="4"/>

我尝试过不同类型的任务执行器,但它们的行为方式都相同。如何将每一行分配给单独的线程?

最佳答案

FlatFileItemReader 不是线程安全的。在您的示例中,您可以尝试将 CSV 文件拆分为较小的 CSV 文件,然后使用 MultiResourcePartitioner处理它们中的每一个。这可以分两步完成,一个用于拆分原始文件(如 10 个较小的文件),另一个用于处理拆分的文件。这样您就不会遇到任何问题,因为每个文件都将由一个线程处理。

例子:

<batch:job id="csvsplitandprocess">
<batch:step id="step1" next="step2master">
<batch:tasklet>
<batch:chunk reader="largecsvreader" writer="csvwriter" commit-interval="500">
</batch:chunk>
</batch:tasklet>
</batch:step>
<batch:step id="step2master">
<partition step="step2" partitioner="partitioner">
<handler grid-size="10" task-executor="taskExecutor"/>
</partition>
</batch:step>
</batch:job>

<batch:step id="step2">
<batch:tasklet>
<batch:chunk reader="smallcsvreader" writer="writer" commit-interval="100">
</batch:chunk>
</batch:tasklet>
</batch:step>


<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="10" />
</bean>

<bean id="partitioner"
class="org.springframework.batch.core.partition.support.MultiResourcePartitioner">
<property name="resources" value="file:cvs/extracted/*.csv" />
</bean>

替代分区的替代方案可能是自定义线程安全读取器,它将为每一行创建一个线程,但分区可能是您的最佳选择

关于multithreading - 使用 Spring 批处理文件项阅读器进行多线程处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21279463/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com