gpt4 book ai didi

spring - 如何实现 spring-batch 来逐行处理 csv 文件?

转载 作者:行者123 更新时间:2023-12-04 06:29:29 24 4
gpt4 key购买 nike

我有一个 spring 批处理应用程序,它从 csv 文件读取数据,对所有行进行处理,再把处理后的所有行写入数据库。非常经典。现在我的问题是 csv 文件太大,导致出现 Java 堆空间(heap space)不足的错误,所以我想可以通过每次只处理 x 行来优化,比如每 10000 行处理一次(每处理完 10000 行就释放一次内存,而不是把所有行都加载到内存中)。

有没有办法告诉 spring-batch 以递归方式处理一个步骤?还是有其他方法可以解决我的问题?

任何建议将不胜感激。谢谢

最佳答案

下面是将下面的csv文件处理成bean的例子

headerA,headerB,headerC
col1,col2,col3

第一行(标题)被忽略,其余各行的各列按名称直接映射到一个属性与之对应的对象上。(仅为简洁起见才采用这种方式)。

这是使用 Spring Batch 开箱即用组件的作业配置;

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Job definition for "fileJob": reads a delimited file item by item, maps each
  line to a SimpleEntity and inserts the items into the simple_entity table.
  The "dataSource" bean referenced below is not defined in this file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd">

    <!-- Single-step job; the step is chunk-oriented with a commit interval of 10000 items. -->
    <batch:job id="fileJob">
        <batch:step id="fileJob.step1">
            <batch:tasklet>
                <batch:chunk reader="fileReader"
                             writer="databaseWriter"
                             commit-interval="10000"/>
            </batch:tasklet>
        </batch:step>
        <!-- Rejects a launch whose job parameters do not contain "fileName". -->
        <batch:validator>
            <bean class="org.springframework.batch.core.job.DefaultJobParametersValidator">
                <property name="requiredKeys" value="fileName"/>
            </bean>
        </batch:validator>
    </batch:job>

    <!-- ===== Reading side ===== -->

    <!-- Splits a line on "," and names the resulting fields col1, col2, col3. -->
    <bean id="lineTokenizer"
          class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
        <property name="delimiter" value=","/>
        <property name="names" value="col1,col2,col3"/>
    </bean>

    <!-- Populates a SimpleEntity from the named fields produced by the tokenizer. -->
    <bean id="fieldSetMapper"
          class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
        <property name="targetType" value="de.incompleteco.spring.batch.domain.SimpleEntity"/>
    </bean>

    <!-- Combines tokenizing and field mapping into one line-to-object mapper. -->
    <bean id="lineMapper"
          class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
        <property name="lineTokenizer" ref="lineTokenizer"/>
        <property name="fieldSetMapper" ref="fieldSetMapper"/>
    </bean>

    <!--
      Step-scoped so that the "fileName" job parameter can be resolved through
      the jobParameters expression; the first line of the file is skipped.
    -->
    <bean id="fileReader"
          class="org.springframework.batch.item.file.FlatFileItemReader"
          scope="step">
        <property name="resource" value="file:#{jobParameters['fileName']}"/>
        <property name="linesToSkip" value="1"/>
        <property name="lineMapper" ref="lineMapper"/>
    </bean>

    <!-- ===== Writing side ===== -->

    <!-- Inserts items via JDBC; the named SQL parameters are taken from the item's bean properties. -->
    <bean id="databaseWriter"
          class="org.springframework.batch.item.database.JdbcBatchItemWriter">
        <property name="dataSource" ref="dataSource"/>
        <property name="sql" value="insert into simple_entity (col1,col2,col3) values (:col1,:col2,:col3)"/>
        <property name="itemSqlParameterSourceProvider">
            <bean class="org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider"/>
        </property>
    </bean>

</beans>

有几个注意事项;

  1. 此作业需要一个参数“fileName”来告诉 fileReader 在哪里可以找到文件。
  2. 设置了一个 jobParametersValidator 以确保参数存在

这里是批量资源配置;

<?xml version="1.0" encoding="UTF-8"?>
<!--
  Batch infrastructure: job repository, explorer and launcher, plus a "junit"
  profile that supplies the data source, transaction manager and task executor.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:jdbc="http://www.springframework.org/schema/jdbc"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
           http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc.xsd
           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <!--
      No data-source or transaction-manager attribute is given here.
      NOTE(review): presumably the namespace defaults resolve to the beans named
      "dataSource" and "transactionManager" declared in the profile below; confirm.
    -->
    <batch:job-repository id="jobRepository"/>

    <!-- Launches jobs through the "taskExecutor" bean declared in the profile below. -->
    <bean id="jobLauncher"
          class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository"/>
        <property name="taskExecutor" ref="taskExecutor"/>
    </bean>

    <!-- Read access to job executions, backed by the same data source. -->
    <bean id="jobExplorer"
          class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean">
        <property name="dataSource" ref="dataSource"/>
    </bean>

    <!-- Beans registered only when the "junit" profile is active. -->
    <beans profile="junit">

        <!--
          Embedded H2 database initialised from two scripts, in this order.
          NOTE(review): the first path looks like the Spring Batch metadata schema
          and the second like the application schema (simple_entity); confirm.
        -->
        <jdbc:embedded-database id="dataSource" type="H2">
            <jdbc:script location="classpath:/org/springframework/batch/core/schema-h2.sql"/>
            <jdbc:script location="classpath:/META-INF/sql/schema-h2.sql"/>
        </jdbc:embedded-database>

        <bean id="transactionManager"
              class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <property name="dataSource" ref="dataSource"/>
        </bean>

        <task:executor id="taskExecutor"/>

    </beans>

</beans>

这里也有一个单元测试

package de.incompleteco.spring.batch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import javax.sql.DataSource;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Integration test that generates a large CSV file, launches the autowired batch
 * {@link Job} with that file as its {@code fileName} parameter, waits for the
 * execution to finish and checks how many rows ended up in {@code simple_entity}.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration({"classpath:/META-INF/spring/*-context.xml"})
@ActiveProfiles("junit")
public class FileJobIntegrationTest {

    @Autowired
    private Job job;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobExplorer jobExplorer;

    @Autowired
    private DataSource dataSource;

    /** Number of data rows (header excluded) written to the generated file. */
    private int recordCount = 1000000;

    /** Absolute path of the generated CSV file inside the JVM temp directory. */
    private String fileName = System.getProperty("java.io.tmpdir") + File.separator + "test.csv";

    /**
     * Removes a CSV file left behind by a previous run.
     *
     * @throws Exception declared for parity with the other fixture methods
     */
    @Before
    public void before() throws Exception {
        File previous = new File(fileName);
        if (previous.exists()) {
            previous.delete();
        }
    }

    /**
     * Runs the job against a freshly generated file and verifies that it completes
     * and that every generated data row was inserted.
     *
     * @throws Exception if the file cannot be written or the job cannot be launched
     */
    @Test
    public void test() throws Exception {
        writeCsvFile();

        // Report the file size in whole megabytes.
        long length = new File(fileName).length();
        System.out.println("file size: " + ((length / 1024) / 1024));

        JobParameters jobParameters =
                new JobParametersBuilder().addString("fileName", fileName).toJobParameters();
        JobExecution execution = jobLauncher.run(job, jobParameters);

        // Poll the explorer until the execution is no longer reported as running.
        // NOTE(review): assumes the launcher may return before the job has finished
        // (e.g. when it is wired with a task executor) - confirm against the context.
        while (jobExplorer.getJobExecution(execution.getId()).isRunning()) {
            Thread.sleep(1000);
        }

        // Reload to pick up the final state of the execution.
        execution = jobExplorer.getJobExecution(execution.getId());
        assertEquals(ExitStatus.COMPLETED.getExitCode(), execution.getExitStatus().getExitCode());

        int count = new JdbcTemplate(dataSource)
                .queryForObject("select count(*) from simple_entity", Integer.class);
        // assertEquals reports both values on failure, unlike assertTrue(a == b).
        assertEquals(recordCount, count);
    }

    /**
     * Writes a header line followed by exactly {@code recordCount} data rows of the
     * form {@code i,i+1,i+2}.
     *
     * <p>The header is terminated with its own newline so that it is a separate line
     * from the first data row; previously the two were fused and the row count only
     * matched because one extra row was written.
     */
    private void writeCsvFile() throws Exception {
        // try-with-resources closes the stream even when a write fails; buffering
        // avoids one unbuffered write per row.
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(fileName))) {
            out.write("col1,col2,col3\n".getBytes(StandardCharsets.UTF_8));
            for (int i = 0; i < recordCount; i++) {
                String row = i + "," + (i + 1) + "," + (i + 2) + "\n";
                out.write(row.getBytes(StandardCharsets.UTF_8));
            }
        }
    }

}

关于spring - 如何实现 spring-batch 来逐行处理 csv 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16724904/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com