gpt4 book ai didi

spring - 如何在 Spring Batch 中使用 ClassifierCompositeItemProcessor 并将数据写入同一个表以进行 Insert 和 Upsert?

转载 作者:行者123 更新时间:2023-12-04 08:40:52 24 4
gpt4 key购买 nike

我浏览了链接 - https://github.com/spring-projects/spring-batch/blob/master/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/support/ClassifierCompositeItemProcessorTests.java ,但并没有受到太大影响。
我正在尝试将 ETL Informatica 映射逻辑替换为批处理。我想分开Status=IStatus=U进入单独的(个人)处理器,然后进一步执行查找和数据处理,然后将这些记录直接写入表中,状态 = I 和状态 = U,执行另一个复杂的逻辑(如查找、按摩和匹配和合并逻辑)和然后将这些记录再次插入同一个表中。
我试图做 POC,我希望在处理器中隔离记录
客户分类器.java

public class CustomerClassifier implements Classifier<Customer, ItemProcessor<Customer, Customer>> {

private ItemProcessor<Customer, Customer> insertCustomerProcessor;
private ItemProcessor<Customer, Customer> updateCustomerProcessor;

public CustomerClassifier(ItemProcessor<Customer, Customer> evenCustomerProcessor, ItemProcessor<Customer, Customer> oddCustomerProcessor) {
this.insertCustomerProcessor= insertCustomerProcessor;
this.updateCustomerProcessor= updateCustomerProcessor;
}

@Override
public ItemProcessor<Customer, Customer> classify(Customer customer) {
return customer.getStatus().equals("I") ? insertCustomerProcessor : updateCustomerProcessor;
}
}
奇怪的客户处理器.java
public class OddCustomerProcessor implements ItemProcessor<Customer, Customer> {

@Override
public Customer process(Customer item) throws Exception {
Customer customer = new Customer();
// Perform some msaaging and lookups here
customer.setId(item.getId());
customer.setFirstName(item.getFirstName());
customer.setLastName(item.getLastName());
customer.setBirthdate(item.getBirthdate());
customer.setStatus(item.getStatus());
return customer;
}
}
EvenCustomerProcessor.java
public class EvenCustomerProcessor implements ItemProcessor<Customer, Customer> {

@Override
public Customer process(Customer item) throws Exception {
Customer customer = new Customer();
// Perform some msaaging and lookups here
customer.setId(item.getId());
customer.setFirstName(item.getFirstName());
customer.setLastName(item.getLastName());
customer.setBirthdate(item.getBirthdate());
customer.setStatus(item.getStatus());
return customer;
}
}
自定义线聚合器.java
public class CustomLineAggregator implements LineAggregator<Customer> {
private ObjectMapper objectMapper = new ObjectMapper();

@Override
public String aggregate(Customer item) {
try {
return objectMapper.writeValueAsString(item);
} catch (Exception e) {
throw new RuntimeException("Unable to serialize Customer", e);
}
}
}
客户.java
@Data
@AllArgsConstructor
@Builder
@NoArgsConstructor
public class Customer {
private Long id;
private String firstName;
private String lastName;
private String birthdate;
private String status;
}
错误-

The method setClassifier(Classifier<? super Customer,ItemProcessor<?,? extends Customer>>) in the type ClassifierCompositeItemProcessor<Customer,Customer> is not applicable for thearguments (CustomerClassifier)


配置
@Configuration
public class JobConfiguration {
@Autowired
private JobBuilderFactory jobBuilderFactory;

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
private DataSource dataSource;

@Bean
public JdbcPagingItemReader<Customer> customerPagingItemReader(){
// reading database records using JDBC in a paging fashion
JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
reader.setDataSource(this.dataSource);
reader.setFetchSize(1000);
reader.setRowMapper(new CustomerRowMapper());

// Sort Keys
Map<String, Order> sortKeys = new HashMap<>();
sortKeys.put("id", Order.ASCENDING);

// MySQL implementation of a PagingQueryProvider using database specific features.
MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
queryProvider.setSelectClause("id, firstName, lastName, birthdate");
queryProvider.setFromClause("from customer");
queryProvider.setSortKeys(sortKeys);

reader.setQueryProvider(queryProvider);

return reader;
}

@Bean
public EvenCustomerProcessor evenCustomerProcessor() {
return new EvenCustomerProcessor();
}

@Bean
public OddCustomerProcessor oddCustomerProcessor() {
return new OddCustomerProcessor();
}

@Bean
public JdbcBatchItemWriter<Customer> customerItemWriter(){
JdbcBatchItemWriter<Customer> batchItemWriter = new JdbcBatchItemWriter<>();
batchItemWriter.setDataSource(dataSource);
batchItemWriter.setSql(""); // Query Goes here
return batchItemWriter;
}

@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor() throws Exception{
ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
itemProcessor.setClassifier(new CustomerClassifier(evenCustomerProcessor(), oddCustomerProcessor()));
}

@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer> chunk(10)
.reader(customerPagingItemReader())
.processor(classifierCustomerCompositeItemProcessor())
.writer(customerItemWriter())
.build();
}

@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.start(step1())
.build();
}
}

最佳答案

您可以删除 CustomerClassifier并定义复合项目处理器如下:

@Bean
public ClassifierCompositeItemProcessor<Customer, Customer> classifierCustomerCompositeItemProcessor(
EvenCustomerProcessor evenCustomerProcessor,
OddCustomerProcessor oddCustomerProcessor
) {
ClassifierCompositeItemProcessor<Customer, Customer> itemProcessor = new ClassifierCompositeItemProcessor<>();
itemProcessor.setClassifier(new Classifier<Customer, ItemProcessor<?, ? extends Customer>>() {
@Override
public ItemProcessor<?, ? extends Customer> classify(Customer customer) {
return customer.getStatus().equals("I") ? evenCustomerProcessor : oddCustomerProcessor;
}
});
return itemProcessor;
}
然后按如下方式更新您的步骤定义:
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer> chunk(10)
.reader(customerPagingItemReader())
.processor(classifierCustomerCompositeItemProcessor(evenCustomerProcessor(), oddCustomerProcessor()))
.writer(customerItemWriter())
.build();
}

关于spring - 如何在 Spring Batch 中使用 ClassifierCompositeItemProcessor 并将数据写入同一个表以进行 Insert 和 Upsert?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64575208/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com