- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在开发Spring Batch Partitioning示例
,在这个示例中我尝试创建一个“Partitioner”
作业,它有10个线程
,每个线程都会根据提供的'id'
范围从数据库读取记录。我引用了 http://www.mkyong.com/spring-batch/spring-batch-partitioning-example/
当我第一次运行代码时,它工作正常,但下次当我再次运行代码时,它会显示以下错误,我希望每个运行文件都应该重新创建,那么我需要为此做什么?我需要做哪些额外配置?
org.springframework.batch.item.ItemStreamException: File already exists: [C:\Users\userpc\Documents\workspace-sts-3.6.4.RELEASE\SpringBatch-Partitioner-Example\csv\outputs\users.processed21-30.csv]
at org.springframework.batch.item.util.FileUtils.setUpOutputFile(FileUtils.java:61)
at org.springframework.batch.item.file.FlatFileItemWriter$OutputState.initializeBufferedWriter(FlatFileItemWriter.java:559)
at org.springframework.batch.item.file.FlatFileItemWriter$OutputState.access$000(FlatFileItemWriter.java:399)
at org.springframework.batch.item.file.FlatFileItemWriter.doOpen(FlatFileItemWriter.java:333)
at org.springframework.batch.item.file.FlatFileItemWriter.open(FlatFileItemWriter.java:323)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.doProceed(DelegatingIntroductionInterceptor.java:133)
at org.springframework.aop.support.DelegatingIntroductionInterceptor.invoke(DelegatingIntroductionInterceptor.java:121)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
at com.sun.proxy.$Proxy5.open(Unknown Source)
at org.springframework.batch.item.support.CompositeItemStream.open(CompositeItemStream.java:96)
at org.springframework.batch.core.step.tasklet.TaskletStep.open(TaskletStep.java:310)
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:197)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:139)
at org.springframework.batch.core.partition.support.TaskExecutorPartitionHandler$1.call(TaskExecutorPartitionHandler.java:136)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
Apr 06, 2016 12:07:29 AM org.springframework.batch.core.step.AbstractStep execute
SEVERE: Encountered an error executing step slave in job partitionJob
用户.java
public class User implements Serializable{
private static final long serialVersionUID = 1L;
private int id;
private String username;
private String password;
private int age;
// setters and getters
}
UserRowMapper.java
public class UserRowMapper implements RowMapper<User> {
@Override
public User mapRow(ResultSet rs, int rowNum) throws SQLException {
User user = new User();
user.setId(rs.getInt("id"));
user.setUsername(rs.getString("username"));
user.setPassword(rs.getString("password"));
user.setAge(rs.getInt("age"));
return user;
}
}
RangePartitioner.java
public class RangePartitioner implements Partitioner {
@Override
public Map<String, ExecutionContext> partition(int gridSize) {
Map<String, ExecutionContext> result = new HashMap<String, ExecutionContext>();
int range = 10;
int fromId = 1;
int toId = range;
for (int i = 1; i <= gridSize; i++) {
ExecutionContext value = new ExecutionContext();
System.out.println("\nStarting : Thread" + i);
System.out.println("fromId : " + fromId);
System.out.println("toId : " + toId);
value.putInt("fromId", fromId);
value.putInt("toId", toId);
// give each thread a name
value.putString("name", "Thread" + i);
result.put("partition" + i, value);
fromId = toId + 1;
toId += range;
}
return result;
}
}
UserProcessor.java
@Component("itemProcessor")
@Scope(value = "step")
public class UserProcessor implements ItemProcessor<User, User> {
@Value("#{stepExecutionContext[name]}")
private String threadName;
@Override
public User process(User item) throws Exception {
System.out.println(threadName + " processing : " + item.getId() + " : " + item.getUsername());
return item;
}
public String getThreadName() {
return threadName;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
}
数据库.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:jdbc="http://www.springframework.org/schema/jdbc"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
http://www.springframework.org/schema/jdbc
http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd">
<!-- connect to database -->
<bean id="dataSource" class="org.springframework.jdbc.datasource.DriverManagerDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url" value="jdbc:mysql://localhost:3306/toga" />
<property name="username" value="root" />
<property name="password" value="root" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<!-- create job-meta tables automatically
<jdbc:initialize-database data-source="dataSource">
<jdbc:script location="org/springframework/batch/core/schema-drop-mysql.sql" />
<jdbc:script location="org/springframework/batch/core/schema-mysql.sql" />
</jdbc:initialize-database>
-->
</beans>
上下文.xml
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.2.xsd">
<!-- stored job-meta in memory -->
<bean id="jobRepository"
class="org.springframework.batch.core.repository.support.MapJobRepositoryFactoryBean">
<property name="transactionManager" ref="transactionManager" />
</bean>
<bean id="transactionManager"
class="org.springframework.batch.support.transaction.ResourcelessTransactionManager" />
<bean id="jobLauncher"
class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
<property name="jobRepository" ref="jobRepository" />
</bean>
</beans>
作业分区器.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:batch="http://www.springframework.org/schema/batch" xmlns:util="http://www.springframework.org/schema/util"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
<!-- spring batch core settings -->
<import resource="../config/context.xml" />
<!-- database settings -->
<import resource="../config/database.xml" />
<!-- ============= partitioner job ========== -->
<job id="partitionJob" xmlns="http://www.springframework.org/schema/batch">
<!-- master step, 10 threads (grid-size) -->
<step id="masterStep">
<partition step="slave" partitioner="rangePartitioner">
<handler grid-size="10" task-executor="taskExecutor" />
</partition>
</step>
</job>
<!-- ======= Jobs to run ===== -->
<step id="slave" xmlns="http://www.springframework.org/schema/batch">
<tasklet>
<chunk reader="pagingItemReader" writer="flatFileItemWriter"
processor="itemProcessor" commit-interval="1" />
</tasklet>
</step>
<bean id="rangePartitioner" class="com.mkyong.partition.RangePartitioner" />
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<bean id="itemProcessor" class="com.mkyong.processor.UserProcessor" scope="step">
<property name="threadName" value="#{stepExecutionContext[name]}" />
</bean>
<!-- ========== Paging Item Reader -->
<bean id="pagingItemReader" class="org.springframework.batch.item.database.JdbcPagingItemReader" scope="step">
<property name="dataSource" ref="dataSource" />
<property name="queryProvider">
<bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
<property name="dataSource" ref="dataSource" />
<property name="selectClause" value="select id, username,password, age" />
<property name="fromClause" value="from user" />
<property name="whereClause" value="where id >= :fromId and id <= :toId" />
<property name="sortKey" value="id" />
</bean>
</property>
<!-- Inject via the ExecutionContext in rangePartitioner -->
<property name="parameterValues">
<map>
<entry key="fromId" value="#{stepExecutionContext[fromId]}" />
<entry key="toId" value="#{stepExecutionContext[toId]}" />
</map>
</property>
<property name="pageSize" value="10" />
<property name="rowMapper">
<bean class="com.mkyong.UserRowMapper" />
</property>
</bean>
<!-- ================= csv file writer ============== -->
<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step" >
<property name="resource"
value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" />
<property name="appendAllowed" value="false" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="id, username, password, age" />
</bean>
</property>
</bean>
</property>
</bean>
<!-- ========= Mongo Item Reader ========-->
<bean id="mongoItemReader" class="org.springframework.batch.item.data.MongoItemReader" scope="step">
<property name="template" ref="mongoTemplate" />
<property name="targetType" value="com.mkyong.User" />
<property name="query"
value="{
'id':{$gt:#{stepExecutionContext[fromId]}, $lte:#{stepExecutionContext[toId]}
} }" />
<property name="sort">
<util:map id="sort">
<entry key="id" value="" />
</util:map>
</property>
</bean>
</beans>
请帮我解决这个问题。
应用程序.java
public class App {
public static void main(String[] args) {
App obj = new App();
obj.run();
}
private void run() {
final String[] springConfig = { "spring/batch/jobs/job-partitioner.xml" };
ApplicationContext context = new ClassPathXmlApplicationContext(springConfig);
JobLauncher jobLauncher = (JobLauncher) context.getBean("jobLauncher");
Job job = (Job) context.getBean("partitionJob");
try {
System.out.println("-----------------------------------------------");
JobExecution execution = jobLauncher.run(job, new JobParameters());
System.out.println("Exit Status : " + execution.getStatus());
System.out.println("Exit Status : " + execution.getAllFailureExceptions());
System.out.println("-----------------------------------------------");
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Done");
}
}
最佳答案
去掉appendAllowed配置即可生效(append默认为false,参见source of flatfileitemwriter)
<bean id="flatFileItemWriter" class="org.springframework.batch.item.file.FlatFileItemWriter" scope="step" >
<property name="resource"
value="file:csv/outputs/users.processed#{stepExecutionContext[fromId]}-#{stepExecutionContext[toId]}.csv" />
<!-- ******* remove this line ***** --><property name="appendAllowed" value="false" />
<property name="lineAggregator">
<bean class="org.springframework.batch.item.file.transform.DelimitedLineAggregator">
<property name="delimiter" value="," />
<property name="fieldExtractor">
<bean class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor">
<property name="names" value="id, username, password, age" />
</bean>
</property>
</bean>
</property>
</bean>
至于为什么这样可以解决问题,原因在于 FlatFileItemWriter 内部
/**
* Flag to indicate that the target file should be appended if it already
* exists. If this flag is set then the flag
* {@link #setShouldDeleteIfExists(boolean) shouldDeleteIfExists} is
* automatically set to false, so that flag should not be set explicitly.
* Defaults value is false.
*
* @param append the flag value to set
*/
public void setAppendAllowed(boolean append) {
this.append = append;
this.shouldDeleteIfExists = false;
}
独立于附加值,shouldDeleteIfExists将设置为 false,这会导致您的问题
看起来像一个错误,它是在 BATCH-2495 创建的
关于java - org.springframework.batch.item.ItemStreamException : File already exists: [SpringBatch-Partitioner-Example\csv\outputs\users.processed21-30.csv],我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36434643/
我正在用 C 语言实现一个带有输入和输出重定向的 shell。我可以成功进行输入重定向,但输出重定向不起作用。例如,如果我执行 ls > out.txt,则 out.txt 包含文本“out.txt”
我正在处理创建 AWS API 网关。我正在尝试创建 CloudWatch Log 组并将其命名 API-Gateway-Execution-Logs_${restApiId}/${stageName
我正在修改原作者使用数组构建网页的一些代码: $output[]=$stuff_from_database; $output[]='more stuff'; // etc echo join(
我只想知道它们之间的区别: sort < output 和 sort output 在 Linux 中。它是如何工作的? 最佳答案 这已经在 unix.stackexchange 上讨论过:Perfo
我正在生成外部控制台应用程序并使用异步输出重定向。 as shown in this SO post 我的问题是,在我收到 OutputDataReceived 事件通知之前,生成的进程似乎需要产生一
在 Udemy 上开设类(class)时,我们一直允许使用组件类中的 @Input() 装饰器向组件传递数据。 在阅读 ngBook-2 时,我发现还有另一种方法,即在 @Component 装饰器中
考虑一个 Linux 服务器,它在您的用户的 .bash_profile 中有以下行: echo "Hello world" 因此,每次您通过 ssh 进入它时,您都会看到 Hello world 现
public static void main(String[] args) { String input = new String(JOptionPane.showInputDialog("
我正在使用 MSVS 2008 中的 FFTW3 库对某些数据执行 r2c DFT (n=128)。我已经发现只使用了真实数据 DFT 输出的前半部分……如果我查看我的输出,这似乎是正确的: 0-64
我制作了一个 C 程序,可以从二进制文件中打印出很多值。我相信程序完成它的功能并在它实际显示它吐出的值之前结束。因此,结果我得到了一个可爱的 RUN SUCCESSFUL(总时间:198ms) 突然出
在 hadoop 作业计数器中,“映射输出具体化字节”与“映射输出字节”之间有什么区别?当我禁用映射输出压缩时我没有看到前者所以我猜它是真正的输出字节(压缩)而后者是未压缩的字节? 最佳答案 我认为你
有很多 Stack Overflow 文章与此相关,但没有直接的答案。 这条命令会输出一堆单词 OutputVariable.exe %FILEPATH% 输出: Mary had a little
互联网上的许多文章都使用“标准输入/输出/错误流”术语好像每个术语都与使用的“标准输入/输出/错误设备”术语具有相同的含义在其他文章上。例如,很多文章说标准输出流默认是监视器,但可以重定向到文件、打印
我在 Keras 中使用一些 tensorflow 函数(reduce_sum 和 l2_normalize)在最后一层构建模型时遇到了这个问题。我已经搜索了一个解决方案,但所有这些都与“Keras
我有来自 API 的自定义输出,我想将其格式化为带有一些颜色值的字符串。 最佳答案 输出 channel 可以用 TmLanguage grammar 着色. Output Colorizer扩展扩展
我正在寻找一种方法来查看虚拟机创建过程中发生的情况,因为我使用复杂的集群配置并测试其是否正常工作,我需要能够查看输出,在某些情况下我是不是因为敏感。这与运行remote-exec选项有关 module
当谷歌搜索此错误时没有看到任何相关结果,所以我想发布它。 stack build Building all executables for `gitchapter' once. After a suc
假设module_a里面有register_a,它需要链接到module_b。 register_a 是否应该单独声明并分配给 module_a 的输出: reg register_a; assign
我正在寻找一种方法来查看虚拟机创建过程中发生的情况,因为我使用复杂的集群配置并测试其是否正常工作,我需要能够查看输出,在某些情况下我是不是因为敏感。这与运行remote-exec选项有关 module
输入文件如下 eno::ename::dept::sal 101::emp1::comp1::2800000 201::emp2::comp2::2800000 301::emp3::comp3::3
我是一名优秀的程序员,十分优秀!