gpt4 book ai didi

java - 在java中实现微批处理

转载 作者:行者123 更新时间:2023-11-30 02:10:38 25 4
gpt4 key购买 nike

我正在开发一个基于kafka的应用程序,其中kafka监听器将监听记录;一旦kafka收到记录,我可能需要将记录写入文件。在这里,为了将记录写入文件,我们希望使用带有批量大小和超时设置的微批处理。例如,批量大小为 10,超时设置为 1000 毫秒,这意味着在写入文件之前等待 10 条记录,等待时间为 1000 毫秒。如果在任何情况下 Kafka 在 1000 毫秒内仅收到 5 条记录,则在该批处理中仅写入 5 条记录。

我在 Java 中做到这一点的效率如何。

最佳答案

在这种情况下,常见的方法之一是将所有记录放入队列中。当队列大小达到 10 或 1000 毫秒后,有一个线程将获取此记录,具体取决于先发生的情况。

消费者代码:

 CountDownLatch countDownLatch = new CountDownLatch(10);
countDownLatch.await(1000, TimeUnit.MILLISECONDS);
int queueSize = queue.size();
for(int i = 0; i < queueSize; ++i) {
... do your work here or put in a batch a do it right after loop
}

生产者代码:

 Record record = ...receive new record...
queue.put(record);
consumer.getCountDownLatch().countDown();

作为队列,我建议使用未绑定(bind)的队列,例如 LinkedTransferQueue,因为您不想在达到 10 个任务时停止生产者,但您仍然需要使用来自 kafka 的结果。

另一个选项是 reactive streams .

关于java - 在java中实现微批处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50191797/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com