gpt4 book ai didi

java - 生产者批量消费;在前一批完成之前,第二批不应到来

转载 作者:行者123 更新时间:2023-12-01 22:32:28 24 4
gpt4 key购买 nike

我正在尝试实现一种机制,其中可运行对象既是生产者又是消费者;

情况是-

我需要从数据库中批量读取记录,并进行处理。我正在尝试使用生产者消费者模式:获取一批,处理一批;每当队列为空时,就由某一个线程去数据库再取一批。但问题是,受限于现有系统,我无法把已取出的记录标记为“处理中”。因此,如果在前一批尚未完全提交之前就获取下一批,我可能会再次取到相同的记录。所以我需要保证:在前一批完全提交之前,绝不拉取下一批。我很困惑该怎么做。我曾尝试记录已获取记录的数量,并在已处理数量追平该计数之前暂停下一次获取,但没有成功。

处理这种情况的最佳方法是什么?以 block 的形式处理数据库中的记录 - 我这里最大的限制是我无法标记已拾取的记录。所以,我希望批处理按顺序进行。但批处理应该在内部使用多线程。

/**
 * Enriches deals read from a {@link DealStore} in strictly sequential batches:
 * a batch of records is fetched, enriched concurrently by a pool of worker
 * threads, and written back in chunks before the next batch is pulled.
 *
 * <p>A new batch is fetched only when the work queue is empty, which guarantees
 * the previous batch has been fully picked up before more rows are read — the
 * store cannot mark records as "in progress", so overlapping fetches would
 * re-read the same rows.
 *
 * <p>Thread-safety: {@link #get()} is serialized on {@code this}; the flush in
 * {@link #update(QueryDealRecord)} performs its check-and-drain under the same
 * lock so a batch is drained by exactly one thread.
 */
public class DealStoreEnricher extends AsyncExecutionSupport {
    /** Number of enriched records accumulated before a batch is flushed to the database. */
    private static final int BATCH_SIZE = 5000;
    private static final Log log = LogFactory.getLog(DealStoreEnricher.class);
    private final DealEnricher dealEnricher;
    private int concurrency = 10;
    /** Unbounded queue of records fetched from the store, awaiting enrichment. */
    private final BlockingQueue<QueryDealRecord> dealsToBeEnrichedQueue;
    /** Bounded queue of enriched records awaiting a batched database write. */
    private final BlockingQueue<QueryDealRecord> dealsEnrichedQueue;
    private final DealStore dealStore;
    private final ExtractorProcess extractorProcess;
    ExecutorService executor;

    public DealStoreEnricher(DealEnricher dealEnricher, DealStore dealStore, ExtractorProcess extractorProcess) {
        this.dealEnricher = dealEnricher;
        this.dealStore = dealStore;
        this.extractorProcess = extractorProcess;
        dealsToBeEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>();
        // Bounded: workers block in update() via put() once writes fall behind,
        // providing natural back-pressure on enrichment.
        dealsEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>(BATCH_SIZE * 3);
    }

    public ExtractorProcess getExtractorProcess() {
        return extractorProcess;
    }

    public DealEnricher getDealEnricher() {
        return dealEnricher;
    }

    public int getConcurrency() {
        return concurrency;
    }

    public void setConcurrency(int concurrency) {
        this.concurrency = concurrency;
    }

    public DealStore getDealStore() {
        return dealStore;
    }

    /** Fluent variant of {@link #setConcurrency(int)}. */
    public DealStoreEnricher withConcurrency(int concurrency) {
        setConcurrency(concurrency);
        return this;
    }

    /**
     * Starts {@code concurrency} worker threads. Each worker pulls records via
     * {@link #get()} until the store is exhausted ({@code null}) or the run is
     * cancelled, enriching and handing results to {@link #update(QueryDealRecord)}.
     */
    @Override
    public void start() {
        super.start();
        executor = Executors.newFixedThreadPool(getConcurrency());
        for (int i = 0; i < getConcurrency(); i++) {
            executor.submit(new Runnable() {
                public void run() {
                    try {
                        QueryDealRecord record;
                        while ((record = get()) != null && !isCancelled()) {
                            try {
                                update(getDealEnricher().enrich(record));
                                // processed/failures: counters presumably declared
                                // on AsyncExecutionSupport — TODO confirm.
                                processed.incrementAndGet();
                            } catch (Exception e) {
                                // A single bad record must not kill the worker.
                                failures.incrementAndGet();
                                log.error("Failed to process deal: " + record.getTradeId(), e);
                            }
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // restore interrupt status
                        setCancelled();
                    }
                }
            });
        }
        // No further tasks will be submitted; workers drain until get() returns null.
        executor.shutdown();
    }

    /**
     * Queues an enriched record and, when a batch is complete, flushes it to the
     * database.
     *
     * <p>Fixes two defects of the original implementation: {@code add()} on the
     * bounded queue threw {@link IllegalStateException} when the queue was full
     * (now a blocking {@code put()}), and the batch-complete check happened
     * outside the lock that drains, letting two threads split one batch into two
     * partial drains (check-and-drain is now atomic).
     */
    protected void update(QueryDealRecord enrichedRecord) {
        try {
            // put() blocks until capacity is available instead of throwing.
            dealsEnrichedQueue.put(enrichedRecord);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            setCancelled();
            return;
        }
        List<QueryDealRecord> enrichedRecordsBatch = null;
        synchronized (this) {
            if (batchComplete()) {
                enrichedRecordsBatch = new ArrayList<QueryDealRecord>();
                dealsEnrichedQueue.drainTo(enrichedRecordsBatch);
            }
        }
        // Database write happens outside the lock to keep the critical section short.
        if (enrichedRecordsBatch != null && !enrichedRecordsBatch.isEmpty())
            updateTheDatabase(enrichedRecordsBatch);
    }

    /** Persists one flushed batch of enriched records. */
    private void updateTheDatabase(List<QueryDealRecord> enrichedRecordsBatch) {
        getDealStore().insertEnrichedData(enrichedRecordsBatch, getExtractorProcess());
    }

    /**
     * @return true if processed records have reached the batch size or there's
     *         nothing left to be processed now (tail flush of a partial batch).
     */
    private boolean batchComplete() {
        return dealsEnrichedQueue.size() >= BATCH_SIZE || dealsToBeEnrichedQueue.isEmpty();
    }

    /**
     * Gets an item from the queue of things to be enriched, fetching the next
     * batch from the store only when the queue is empty — i.e. only after the
     * previous batch has been fully picked up.
     *
     * @return {@linkplain QueryDealRecord} to be enriched, or {@code null} when
     *         the store has no more records
     * @throws InterruptedException declared for API compatibility; an interrupt
     *         is re-flagged and surfaced as {@link UnRecoverableException}
     */
    protected synchronized QueryDealRecord get() throws InterruptedException {
        try {
            if (!dealsToBeEnrichedQueue.isEmpty()) {
                return dealsToBeEnrichedQueue.take();
            }
            List<QueryDealRecord> records = getNextBatchToBeProcessed();
            if (!records.isEmpty()) {
                // Unbounded queue: addAll cannot fail on capacity here.
                dealsToBeEnrichedQueue.addAll(records);
                return dealsToBeEnrichedQueue.take();
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt(); // preserve interrupt status for callers
            throw new UnRecoverableException("Unable to retrieve QueryDealRecord", ie);
        }
        return null; // store exhausted
    }

    /** Reads the next batch of un-enriched records from the store. */
    private List<QueryDealRecord> getNextBatchToBeProcessed() {
        return getDealStore().getTheRecordsThatNeedEnriching(getExtractorProcess());
    }

    /** Stops the run, interrupting any in-flight workers. */
    @Override
    public void stop() {
        super.stop();
        if (executor != null)
            executor.shutdownNow();
    }

    @Override
    public boolean await() throws InterruptedException {
        return executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) && !isCancelled() && complete();
    }

    @Override
    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
        return executor.awaitTermination(timeout, unit) && !isCancelled() && complete();
    }

    /** Marks the run completed; always returns true so it can chain in await(). */
    private boolean complete() {
        setCompleted();
        return true;
    }

}

最佳答案

您已经在使用 BlockingQueue - 它可以为您完成所有工作。

但是,您使用了错误的方法向队列添加新元素。addAll()(以及单个元素的 add())在队列已满、无法立即接纳元素时会抛出 IllegalStateException,而不是等待。对于有界队列,您应该使用 put():它在队列满时会阻塞,直到有空间可用——这才是与您正确使用的阻塞方法 take() 相对应的写入操作。

关于您在帖子标题中的陈述:

second batch shouldn't come until the previous batch is complete

如果正确使用 BlockingQueue,则无需关心传入批处理与传出批处理的时间。

关于java - 生产者批量消费;在前一批完成之前,第二批不应到来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27445340/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com