
spring - ElasticsearchItemReader keeps reading the same records

Reposted. Author: 行者123. Last updated: 2023-12-02 23:51:55

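I am a complete beginner with Spring, and I have to develop an application using spring-batch. The application must read from an Elasticsearch index and write all the records to a file.

When I run the program I get no errors, and the application reads the records and writes them to the file correctly. The problem is that the application never stops: it keeps reading, processing and writing data without ever finishing. In the image below you can see the same records being processed multiple times.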

enter image description here

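I think there must be something wrong in my code or in the design of the software, so I have attached the most important parts of my code below.

I developed the following ElasticsearchItemReader: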

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    private final Logger logger;

    private final ElasticsearchOperations elasticsearchOperations;

    private final SearchQuery query;

    private final Class<? extends T> targetType;

    public ElasticsearchItemReader(ElasticsearchOperations elasticsearchOperations, SearchQuery query, Class<? extends T> targetType) {
        setName(getShortName(getClass()));
        logger = getLogger(getClass());
        this.elasticsearchOperations = elasticsearchOperations;
        this.query = query;
        this.targetType = targetType;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        state(elasticsearchOperations != null, "An ElasticsearchOperations implementation is required.");
        state(query != null, "A query is required.");
        state(targetType != null, "A target type to convert the input into is required.");
    }

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {

        logger.debug("executing query {}", query.getQuery());

        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}

I also wrote the following ReadWriterConfig:
@Configuration
public class ReadWriterConfig {

    @Bean
    public ElasticsearchItemReader<AnotherElement> elasticsearchItemReader() {

        return new ElasticsearchItemReader<>(elasticsearchOperations(), query(), AnotherElement.class);
    }

    @Bean
    public SearchQuery query() {

        NativeSearchQueryBuilder builder = new NativeSearchQueryBuilder()
                .withQuery(matchAllQuery());

        return builder.build();
    }

    @Bean
    public ElasticsearchOperations elasticsearchOperations() {

        Client client = null;
        try {
            Settings settings = Settings.builder()
                    .build();

            client = new PreBuiltTransportClient(settings)
                    .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));
            return new ElasticsearchTemplate(client);
        } catch (UnknownHostException e) {
            e.printStackTrace();
            return null;
        }
    }
}

I wrote the batch configuration, in which I call the reader, writer and processor:
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // tag::readerwriterprocessor[]
    @Bean
    public ElasticsearchItemReader<AnotherElement> reader() {
        return new ReadWriterConfig().elasticsearchItemReader();
    }

    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    @Bean
    public FlatFileItemWriter itemWriter() {
        return new FlatFileItemWriterBuilder<AnotherElement>()
                .name("itemWriter")
                .resource(new FileSystemResource("target/output.txt"))
                .lineAggregator(new PassThroughLineAggregator<>())
                .build();
    }

    // end::readerwriterprocessor[]

    // tag::jobstep[]
    @Bean
    public Job importUserJob(JobCompletionNotificationListener listener, Step stepA) {
        return jobBuilderFactory.get("importUserJob")
                .flow(stepA)
                .end()
                .build();
    }

    @Bean
    public Step stepA(FlatFileItemWriter<AnotherElement> writer) {
        return stepBuilderFactory.get("stepA")
                .<AnotherElement, AnotherElement> chunk(10)
                .reader(reader())
                .processor(processor())
                .writer(itemWriter())
                .build();
    }
    // end::jobstep[]

}

Here are some of the sites I used when writing this code:

https://github.com/spring-projects/spring-batch-extensions/blob/master/spring-batch-elasticsearch/README.md

https://spring.io/guides/gs/batch-processing/

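Best answer

Your reader should return an Iterator on every call to doPageRead(), one that iterates over a single page of the dataset. Because you do not split the result of the Elasticsearch query into pages but query the whole set in one step, the first call to doPageRead() returns an iterator over the entire result set. The next call then returns an iterator over exactly the same result set again, so the reader never signals that the data is exhausted.

So you have to keep track of whether you have already returned the iterator, for example: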
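To make the behaviour described above concrete, here is a minimal plain-Java sketch. It is a simplified model of a loop that consumes doPageRead(), not the actual Spring Batch source; the class name `PageReadLoopDemo`, its helper methods and the `maxItems` safety cap are all hypothetical and exist only for this illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PageReadLoopDemo {

    /**
     * Simplified model of the read loop (an assumption, not library code):
     * request a new page whenever the current iterator is exhausted, and stop
     * when a page is null or empty. maxItems is a safety cap so the demo
     * terminates even for a reader that never signals the end of the data.
     */
    static int countItemsRead(Supplier<Iterator<String>> doPageRead, int maxItems) {
        Iterator<String> results = null;
        int count = 0;
        while (count < maxItems) {
            if (results == null || !results.hasNext()) {
                results = doPageRead.get();
                if (results == null || !results.hasNext()) {
                    break; // end of data: the step can finish
                }
            }
            results.next();
            count++;
        }
        return count;
    }

    /** Mirrors the reader from the question: every call returns the full result set. */
    static Supplier<Iterator<String>> alwaysFullResult(List<String> data) {
        return data::iterator;
    }

    /** Mirrors a flag-based reader: the full result set once, then null. */
    static Supplier<Iterator<String>> fullResultOnce(List<String> data) {
        boolean[] called = {false};
        return () -> {
            if (called[0]) {
                return null;
            }
            called[0] = true;
            return data.iterator();
        };
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        // Hits the safety cap: the same three records are served over and over.
        System.out.println(countItemsRead(alwaysFullResult(data), 20)); // prints 20
        // Stops after the three records have been read once.
        System.out.println(countItemsRead(fullResultOnce(data), 20));   // prints 3
    }
}
```

Without the cap, the first reader would loop forever, which matches the never-ending job in the question.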

public class ElasticsearchItemReader<T> extends AbstractPaginatedDataItemReader<T> implements InitializingBean {

    // leaving out irrelevant parts

    // set to true once the complete result set has been handed out
    boolean doPageReadCalled = false;

    @Override
    @SuppressWarnings("unchecked")
    protected Iterator<T> doPageRead() {

        if (doPageReadCalled) {
            // the data was already returned; null signals the end of the input
            return null;
        }

        doPageReadCalled = true;

        return (Iterator<T>) elasticsearchOperations.queryForList(query, targetType).iterator();
    }
}

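On the first call you set the flag to true and then return the iterator; on the next call you see that the data has already been returned, so you return null.

This is a very basic solution. Depending on the amount of data you get from Elasticsearch, it might be better to query with, for example, the scroll API and return pages until everything has been processed.

Regarding "spring - ElasticsearchItemReader keeps reading the same records", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56976893/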
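As a rough illustration of returning pages until everything is processed, the sketch below reads one page per doPageRead() call and stops at the first empty page. It is plain Java and does not call Elasticsearch: `PageSource`, `PagedReader` and `inMemory` are hypothetical names, and the in-memory list stands in for a paged or scrolled query. In a real reader the page number and page size would have to be passed to the query instead; how to do that depends on the Spring Data Elasticsearch version in use.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class PagedReadDemo {

    /** Hypothetical stand-in for a paged Elasticsearch query (not a real API). */
    interface PageSource<T> {
        List<T> fetch(int page, int pageSize);
    }

    /** Reader that asks the source for exactly one page per doPageRead() call. */
    static class PagedReader<T> {
        private final PageSource<T> source;
        private final int pageSize;
        private int page = 0;
        private Iterator<T> results;

        PagedReader(PageSource<T> source, int pageSize) {
            this.source = source;
            this.pageSize = pageSize;
        }

        /** Returns an iterator over the next page; it is empty once the data is exhausted. */
        Iterator<T> doPageRead() {
            List<T> items = source.fetch(page, pageSize);
            page++;
            return items.iterator();
        }

        /** Returns the next item, or null when a page comes back null or empty. */
        T read() {
            if (results == null || !results.hasNext()) {
                results = doPageRead();
                if (results == null || !results.hasNext()) {
                    return null;
                }
            }
            return results.next();
        }
    }

    /** In-memory source that slices a list into pages, standing in for the index. */
    static <T> PageSource<T> inMemory(List<T> all) {
        return (page, pageSize) -> {
            int from = Math.min(page * pageSize, all.size());
            int to = Math.min(from + pageSize, all.size());
            return all.subList(from, to);
        };
    }

    /** Drains the reader until it returns null and collects everything it produced. */
    static <T> List<T> readAll(PagedReader<T> reader) {
        List<T> out = new ArrayList<>();
        T item;
        while ((item = reader.read()) != null) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> index = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            index.add(i);
        }
        // Pages of 10, 10 and 5 items, then an empty page that ends the read.
        List<Integer> read = readAll(new PagedReader<>(inMemory(index), 10));
        System.out.println(read.size()); // prints 25
    }
}
```

Compared with the flag-based approach, this keeps only one page in memory at a time, at the cost of one query per page.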
