gpt4 book ai didi

java - 允许所有工作人员使用全局消费率的队列服务器

转载 作者:塔克拉玛干 更新时间:2023-11-03 06:36:37 24 4
gpt4 key购买 nike

我的服务器需要处理许多任务。由于工作人员需要满足的 API 调用速率限制,这些任务必须以特定的给定速率处理。

为了保证这些任务不会以高于 API 速率限制的速率执行,我希望能够配置队列发送消息以进行处理的速率。

此外,该队列必须保持推送消息的顺序并以 FIFO 顺序释放它们以提供公平性。

最后,如果编码明智,这在使用时将是透明的,那么客户端将调用 API 将消息发送到队列,并且同一客户端将在消息被释放后接收回消息,这将是很棒的根据工作进度和相关顺序排队。例如使用 RxJava

waitForMessageToBeReleased(message, queue)
.subscribe(message -> // do some stuff) // message received to the same
client after it was released by the queue according to the defined work rate.

我目前正在使用 Redis 通过创建一个具有特定数量的 TTL 的变量来控制执行速率,其他调用会一直等到该变量过期。但是,它不处理订单,并可能导致客户在高负载的情况下饿死。

最佳答案

Cadence Workflow能够以最小的努力支持您的用例。

这里有一个稻草人设计可以满足你的要求:

  • 使用 userID 作为工作流 ID 向用户工作流发送 signalWithStart 请求。它要么将信号传递给工作流,要么首先启动工作流并向其传递信号。
  • 对该工作流的所有请求都由它缓冲。 Cadence 提供了硬性保证,即只有一个具有给定 ID 的工作流才能处于打开状态。因此,所有信号(事件)都保证在属于用户的工作流中进行缓冲。
  • 内部工作流事件循环逐一分派(dispatch)这些请求。
  • 当缓冲区为空时,工作流可以完成。

下面是用Java实现的工作流代码(也支持Go客户端):

public interface SerializedExecutionWorkflow {

@WorkflowMethod
void execute();

@SignalMethod
void addTask(Task t);
}

public interface TaskProcessorActivity {
@ActivityMethod
void process(Task poll);
}

public class SerializedExecutionWorkflowImpl implements SerializedExecutionWorkflow {

private final Queue<Task> taskQueue = new ArrayDeque<>();
private final TaskProcesorActivity processor = Workflow.newActivityStub(TaskProcesorActivity.class);

@Override
public void execute() {
while(!taskQueue.isEmpty()) {
processor.process(taskQueue.poll());
}
}

@Override
public void addTask(Task t) {
taskQueue.add(t);
}
}

然后是通过信号方法将该任务排入工作流的代码:

private void addTask(WorkflowClient cadenceClient, Task task) {
// Set workflowId to userId
WorkflowOptions options = new WorkflowOptions.Builder().setWorkflowId(task.getUserId()).build();
// Use workflow interface stub to start/signal workflow instance
SerializedExecutionWorkflow workflow = cadenceClient.newWorkflowStub(SerializedExecutionWorkflow.class, options);
BatchRequest request = cadenceClient.newSignalWithStartRequest();
request.add(workflow::execute);
request.add(workflow::addTask, task);
cadenceClient.signalWithStart(request);
}

与使用队列进行任务处理相比,Cadence 提供了许多其他优势。

  • 构建了具有无限到期间隔的指数重试
  • 故障处理。例如,如果在配置的时间间隔内两次更新均未成功,它允许执行通知另一项服务的任务。
  • 支持长时间运行的心跳操作
  • 能够实现复杂的任务依赖关系。例如,在发生不可恢复的故障时实现调用链或补偿逻辑 (SAGA)
  • 提供对当前更新状态的完整可见性。例如,当使用队列时,您知道队列中是否有一些消息,并且您需要额外的数据库来跟踪整体进度。使用 Cadence,每个事件都会被记录下来。
  • 能够在飞行中取消更新。
  • 分布式 CRON 支持

参见 the presentation这涵盖了 Cadence 编程模型。

关于java - 允许所有工作人员使用全局消费率的队列服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56725284/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com