- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试实现一种机制,其中可运行对象既是生产者又是消费者;
情况是-
我需要从数据库中批量读取记录,并进行处理。我正在尝试使用生产者消费者模式。我得到一批,我处理。获取一批,进行处理。每当它看到队列为空时就会得到一个批处理。其中一个线程去获取东西。但问题是我无法标记为处理而获取的记录,这是我的限制。因此,如果我们在完全提交前一批之前获取下一批,我可能会再次获取相同的记录。因此,我需要能够在拉动另一个之前完全提交前一个。我很困惑我应该在这里做什么。我尝试保留所获取的计数,然后保留我的获取,直到也达到该计数。
处理这种情况的最佳方法是什么?以 block 的形式处理数据库中的记录 - 我这里最大的限制是我无法标记已拾取的记录。所以,我希望批处理按顺序进行。但批处理应该在内部使用多线程。
public class DealStoreEnricher extends AsyncExecutionSupport {
private static final int BATCH_SIZE = 5000;
private static final Log log = LogFactory.getLog(DealStoreEnricher.class);
private final DealEnricher dealEnricher;
private int concurrency = 10;
private final BlockingQueue<QueryDealRecord> dealsToBeEnrichedQueue;
private final BlockingQueue<QueryDealRecord> dealsEnrichedQueue;
private DealStore dealStore;
private ExtractorProcess extractorProcess;
ExecutorService executor;
public DealStoreEnricher(DealEnricher dealEnricher, DealStore dealStore, ExtractorProcess extractorProcess) {
this.dealEnricher = dealEnricher;
this.dealStore = dealStore;
this.extractorProcess = extractorProcess;
dealsToBeEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>();
dealsEnrichedQueue = new LinkedBlockingQueue<QueryDealRecord>(BATCH_SIZE * 3);
}
public ExtractorProcess getExtractorProcess() {
return extractorProcess;
}
public DealEnricher getDealEnricher() {
return dealEnricher;
}
public int getConcurrency() {
return concurrency;
}
public void setConcurrency(int concurrency) {
this.concurrency = concurrency;
}
public DealStore getDealStore() {
return dealStore;
}
public DealStoreEnricher withConcurrency(int concurrency) {
setConcurrency(concurrency);
return this;
}
@Override
public void start() {
super.start();
executor = Executors.newFixedThreadPool(getConcurrency());
for (int i = 0; i < getConcurrency(); i++)
executor.submit(new Runnable() {
public void run() {
try {
QueryDealRecord record = null;
while ((record = get()) != null && !isCancelled()) {
try {
update(getDealEnricher().enrich(record));
processed.incrementAndGet();
} catch (Exception e) {
failures.incrementAndGet();
log.error("Failed to process deal: " + record.getTradeId(), e);
}
}
} catch (InterruptedException e) {
setCancelled();
}
}
});
executor.shutdown();
}
protected void update(QueryDealRecord enrichedRecord) {
dealsEnrichedQueue.add(enrichedRecord);
if (batchComplete()) {
List<QueryDealRecord> enrichedRecordsBatch = new ArrayList<QueryDealRecord>();
synchronized (this) {
dealsEnrichedQueue.drainTo(enrichedRecordsBatch);
}
if (!enrichedRecordsBatch.isEmpty())
updateTheDatabase(enrichedRecordsBatch);
}
}
private void updateTheDatabase(List<QueryDealRecord> enrichedRecordsBatch) {
getDealStore().insertEnrichedData(enrichedRecordsBatch, getExtractorProcess());
}
/**
* @return true if processed records have reached the batch size or there's
* nothing to be processed now.
*/
private boolean batchComplete() {
return dealsEnrichedQueue.size() >= BATCH_SIZE || dealsToBeEnrichedQueue.isEmpty();
}
/**
* Gets an item from the queue of things to be enriched
*
* @return {@linkplain QueryDealRecord} to be enriched
* @throws InterruptedException
*/
protected synchronized QueryDealRecord get() throws InterruptedException {
try {
if (!dealsToBeEnrichedQueue.isEmpty()) {
return dealsToBeEnrichedQueue.take();
} else {
List<QueryDealRecord> records = getNextBatchToBeProcessed();
if (!records.isEmpty()) {
dealsToBeEnrichedQueue.addAll(records);
return dealsToBeEnrichedQueue.take();
}
}
} catch (InterruptedException ie) {
throw new UnRecoverableException("Unable to retrieve QueryDealRecord", ie);
}
return null;
}
private List<QueryDealRecord> getNextBatchToBeProcessed() {
List<QueryDealRecord> recordsThatNeedEnriching = getDealStore().getTheRecordsThatNeedEnriching(getExtractorProcess());
return recordsThatNeedEnriching;
}
@Override
public void stop() {
super.stop();
if (executor != null)
executor.shutdownNow();
}
@Override
public boolean await() throws InterruptedException {
return executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS) && !isCancelled() && complete();
}
@Override
public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
return executor.awaitTermination(timeout, unit) && !isCancelled() && complete();
}
private boolean complete() {
setCompleted();
return true;
}
}
最佳答案
您已经在使用 BlockingQueue
- 它可以为您完成所有工作。
但是,您使用了错误的方法 addAll()
向队列添加新元素。如果队列无法接受元素,该方法将引发异常。相反,您应该使用 put()
,因为这是与您正确使用的 take()
对应的阻塞方法。
关于您在帖子标题中的陈述:
second batch shouldn't come until the previous batch is complete
如果正确使用 BlockingQueue,则无需关心传入批处理与传出批处理的时间。
关于java - 生产者批量消费;在前一批完成之前,第二批不应到来,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27445340/
这是我在阅读了几个关于 jpa 批量插入的主题后创建的简单示例,我有 2 个持久对象用户和站点。一个用户可以有多个站点,所以我们在这里有一对多的关系。假设我想创建用户并将多个站点创建/链接到用户帐户。
我有文档列表(对象),该对象有多个文档,即存在 Json 记录,但是当我尝试上传文档束(记录)时,它没有上传到文档数据库,但当我上传单个文档记录时,它上传成功。 List listObj = ne
我希望进行批量域名查找,看看是否有一些域名可供购买。我找不到 perl 模块,但似乎应该有一种方法可以在 perl 中执行此操作。我正在寻找免费的东西。谢谢! 最佳答案 从这里:http://www.
我制作了一个批处理类来检查 FTP 上的文件、下载它们并在 FTP 上删除它们。 当我手动运行它(不是批量运行)时,它运行完美,下载 FTP 中的所有文件并在下载完成后删除它们。 当我尝试批量运行时,
我有一个 *+* 形式的字符串 base。我想得到+之前的所有内容。例如,如果 base=foo+bar,我想获取 foo。 我尝试过使用字符串替换来实现 set left=%base:+*=% 但这
我需要创建几十个表,并且我需要它们是innodb, 有没有办法做到这一点,而不是将 engine=innodb 附加到每个 create table 语句? 最佳答案 可以在服务器级别指定默认引擎,在
我正在尝试制作显示 unix/linux 提示符的 dos shell。代码是: @echo off :hi set tmpdrv=%cd:~0,2% if %homedrive% == %tmpdr
我有以下代码,基本上是在二维矩阵的每一行上进行一维卷积。卷积核是一样的。所以真的是 SIMD 案例。 a = [ 1,2,3,4,5; 6,7,8,9,7; 7,6
情况: 我尝试在 shell 中的循环内移动文件,但我的代码无法正常工作。 for /D %%F in (*) do ( if "%%F" NEQ "%directoryToPutFilesIn
目录包含 2 个(或更多)任意名称的视频文件。 video1.mkv video2.mkv 需要找出每个视频的持续时间。为此,我们使用 MediaInfo . setlocal EnableDelay
如何在 Windows 中批量删除数千个文件中的空格(而不是替换为下划线)?我可以从 DOS 命令执行此操作吗? 目前: file one.mp3 file two.mp3 所有文件需要变成: fil
我想创建一个批处理文件,它读取 2 个不同的值,并根据它们的比较方式进行相应处理。但是,比较永远不会起作用。代码是: REM string1 and string2 contain the follo
我正在尝试将一个文件夹的子文件夹复制到许多其他名称未知的文件夹中。目的是在所有使用它的员工文件夹中备份程序的源文件。如果在员工文件夹中找不到程序文件夹,则不应执行任何操作。这看起来如下: 来源: F:
我正在寻找一种简单的方法来检测一小段文本(几句话)是否为英语。在我看来,这个问题比尝试检测任意语言要容易得多。有没有可以做到这一点的软件?我正在用 python 编写,并且更喜欢 python 库,但
我们正在尝试向 8k 种不同的设备发送促销推送消息。我们正在成功响应推送通知 URL https://fcm.googleapis.com/fcm/send 但只有部分用户收到此通知,并非全部。那么
基本上我只是用这一段来替换我的 var 中的一个字符串,但我无法让嵌套延迟扩展正常工作。这甚至可能吗? set replace=!replace:!search!=!replaceVal!! 我知道执
如何使用 ffmpeg 对一批视频文件进行编码,使用相同的设置? 我找到了 one-line solution将当前文件夹中的 .avi 文件转换为 .mov。请注意,我要编码 .mov -> .mo
我正在尝试制作一个批处理文件,每次循环时都会将变量增加 1,然后检查变量是否等于 5,如果不是,则再次循环。我知道这可能有一个 while 循环,但我不知道如何做到这一点,我现在只是享受学习 Batc
我正在尝试创建一个循环,读取多个 CSV 文件,这些文件都具有相同类型的气温数据。但是,我想跳过数据上方的行。这些是数据集中的“警报”。每个文件可能有不同数量的警报,因此要跳过不同数量的行。见下文:
因此,我正在批量创建一个Mail程序,而消息传递部分出现了问题。 消息传递部分是无限循环。 当我输入多个单词时,它会崩溃。 这是代码。请帮忙! :rep set line= set /p line=
我是一名优秀的程序员,十分优秀!