gpt4 book ai didi

java - 线程池并行处理消息,但保留对话中的顺序

转载 作者:太空狗 更新时间:2023-10-30 19:37:15 26 4
gpt4 key购买 nike

我需要并行处理消息,但保留具有相同对话 ID 的消息的处理顺序。

示例:
让我们像这样定义一个消息:

class Message {
Message(long id, long conversationId, String someData) {...}
}

假设消息按以下顺序到达:
消息(1, 1, "a1"), 消息(2, 2, "a2"), 消息(3, 1, "b1"), 消息(4, 2, "b2") .

我需要在消息 1 之后处理消息 3,因为消息 1 和 3 具有相同的对话 ID(类似地,出于同样的原因,消息 4 应该在 2 之后处理)。
我不关心例如之间的相对顺序。 1 和 2,因为它们具有不同的对话 ID。

我想尽可能重用 java ThreadPoolExecutor 的功能,以避免在我的代码中手动替换死线程等。

更新:可能的“conversation-ids”数量没有限制,对话也没有时间限制。 (我个人不认为这是个问题,因为我可以有一个从 conversationId 到工作人员编号的简单映射,例如 conversationId % totalWorkers)。

更新 2:多队列解决方案存在一个问题,其中队列编号由例如'index = Objects.hash(conversationId) % total':如果处理某些消息需要很长时间,则所有具有相同'index'但不同'< em>conversationId' 将等待,即使其他线程可用于处理它。也就是说,我认为具有单个智能阻塞队列的解决方案会更好,但这只是一个意见,我对任何好的解决方案持开放态度。

你看到这个问题的优雅解决方案了吗?

最佳答案

前段时间我不得不做一些非常相似的事情,所以这是一个改编。

( See it in action online )

这实际上是完全相同的基本需求,但在我的例子中,键是一个字符串,更重要的是键集不会无限增长,所以我必须在这里添加一个“清理调度程序”。除此之外,它基本上是相同的代码,所以我希望我在适应过程中没有丢失任何严重的东西。我测试了它,看起来它有效。不过,它比其他解决方案更长,也许更复杂......

基本思路:

  • MessageTask将消息包装到 Runnable 中, 完成后通知队列
  • ConvoQueue :阻塞消息队列,用于对话。充当保证所需顺序的预排队。特别是这个三重奏:ConvoQueue.runNextIfPossible()MessageTask.run()ConvoQueue.complete() → ……
  • MessageProcessor有一个 Map<Long, ConvoQueue> , 和一个 ExecutorService
  • 消息由执行器中的任何线程处理,ConvoQueue让我们喂 ExecutorService并保证每个 convo 的消息顺序,但不是全局的(因此 “困难”消息不会阻止处理其他对话,这与其他一些解决方案不同,并且该属性在我们的案例中至关重要 - 如果这对你来说不是那么重要,也许更简单的解决方案更好)
  • ScheduledExecutorService清理(占用 1 个线程)

视觉上:

   ConvoQueues              ExecutorService's internal queue
(shared, but has at most 1 MessageTask per convo)
Convo 1 ########
Convo 2 #####
Convo 3 ####### Thread 1
Convo 4 } → #### → {
Convo 5 ### Thread 2
Convo 6 #########
Convo 7 #####

(Convo 4 is about to be deleted)

所有类下面(MessageProcessorTest可以直接执行):

// MessageProcessor.java
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import static java.util.concurrent.TimeUnit.SECONDS;

public class MessageProcessor {

private static final long CLEANUP_PERIOD_S = 10;
private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>();
private final ExecutorService executorService;

public MessageProcessor(int nbThreads) {
executorService = Executors.newFixedThreadPool(nbThreads);
ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1);
cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS);
}

public void addMessageToProcess(Message message) {
ConvoQueue queue = getQueue(message.getConversationId());
queue.addMessage(message);
}

private ConvoQueue getQueue(Long convoId) {
synchronized (queuesByConvo) {
return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService));
}
}

private void removeEmptyQueues() {
synchronized (queuesByConvo) {
queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty());
}
}

}


// ConvoQueue.java
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;

class ConvoQueue {

private Queue<MessageTask> queue;
private MessageTask activeTask;
private ExecutorService executorService;

ConvoQueue(ExecutorService executorService) {
this.executorService = executorService;
this.queue = new LinkedBlockingQueue<>();
}

private void runNextIfPossible() {
synchronized(this) {
if (activeTask == null) {
activeTask = queue.poll();
if (activeTask != null) {
executorService.submit(activeTask);
}
}
}
}

void complete(MessageTask task) {
synchronized(this) {
if (task == activeTask) {
activeTask = null;
runNextIfPossible();
}
else {
throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task);
}
}
}

boolean isEmpty() {
return queue.isEmpty();
}

void addMessage(Message message) {
add(new MessageTask(this, message));
}

private void add(MessageTask task) {
synchronized(this) {
queue.add(task);
runNextIfPossible();
}
}

}

// MessageTask.java
public class MessageTask implements Runnable {

private ConvoQueue convoQueue;
private Message message;

MessageTask(ConvoQueue convoQueue, Message message) {
this.convoQueue = convoQueue;
this.message = message;
}

@Override
public void run() {
try {
processMessage();
}
finally {
convoQueue.complete(this);
}
}

private void processMessage() {
// Dummy processing with random delay to observe reordered messages & preserved convo order
try {
Thread.sleep((long) (50*Math.random()));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(message);
}

}

// Message.java
class Message {

private long id;
private long conversationId;
private String data;

Message(long id, long conversationId, String someData) {
this.id = id;
this.conversationId = conversationId;
this.data = someData;
}

long getConversationId() {
return conversationId;
}

String getData() {
return data;
}

public String toString() {
return "Message{" + id + "," + conversationId + "," + data + "}";
}
}

// MessageProcessorTest.java
public class MessageProcessorTest {
public static void main(String[] args) {
MessageProcessor test = new MessageProcessor(2);
for (int i=1; i<100; i++) {
test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i));
}
}
}

输出(对于每个 convo ID(第二个字段)顺序被保留):

Message{1002,2,hi 2}
Message{1001,1,hi 1}
Message{1004,4,hi 4}
Message{1003,3,hi 3}
Message{1005,5,hi 5}
Message{1006,6,hi 6}
Message{1009,2,hi 9}
Message{1007,0,hi 7}
Message{1008,1,hi 8}
Message{1011,4,hi 11}
Message{1010,3,hi 10}
...
Message{1097,6,hi 97}
Message{1095,4,hi 95}
Message{1098,0,hi 98}
Message{1099,1,hi 99}
Message{1096,5,hi 96}

上面的测试让我有信心分享它,但我有点担心我可能忘记了病理病例的细节。它已经在生产环境中运行多年而没有任何障碍(尽管有更多的代码允许我们在需要查看正在发生的事情时实时检查它,为什么某个队列需要时间等等 - 上面的系统本身从来没有问题,但是有时与特定任务的处理)

编辑:click here在线测试。备选方案:复制 that gistthere ,然后按“编译并执行”。

关于java - 线程池并行处理消息,但保留对话中的顺序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44327619/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com