gpt4 book ai didi

java - Redis使用Java为每个使用者流化一条消息

转载 作者:行者123 更新时间:2023-12-03 06:42:13 27 4
gpt4 key购买 nike

我正在尝试使用Redis流实现Java应用程序,其中每个consomer都消耗一条消息。就像流水线/队列一样,每个使用者都只接收一条消息,然后对其进行处理,而在使用者结束后,则接收下一条消息,该消息在流中到目前为止尚未处理。
起作用的是,每条消息仅由一个使用者(使用xreadgroup)消耗。
我从this tutorial from redislabs开始
编码:

    RedisClient redisClient = RedisClient.create("redis://pw@host:port");
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();

try {
syncCommands.xgroupCreate(XReadArgs.StreamOffset.from(STREAM_KEY, "0-0"), ID_READ_GROUP);
} catch (RedisBusyException redisBusyException) {
System.out.println(String.format("\t Group '%s' already exists", ID_READ_GROUP));
}

System.out.println("Waiting for new messages ");

while (true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from(ID_READ_GROUP, ID_WORKER), ReadArgs.StreamOffset.lastConsumed(STREAM_KEY));

if (!messages.isEmpty()) {
System.out.println(messages.size()); //
for (StreamMessage<String, String> message : messages) {
System.out.println(message.getId());
Thread.sleep(5000);
syncCommands.xack(STREAM_KEY, ID_READ_GROUP, message.getId());
}
}

}
我当前的问题是,一个使用者从队列中获取一条消息以上,并且在某些情况下,其他使用者正在等待,而一个使用者一次要处理10条消息。
提前致谢!

最佳答案

注意XREADGROUP可以获取COUNT参数。
通过传递XReadArgs,在Lettuce xreadgroup中查看JavaDoc的操作方法。

关于java - Redis使用Java为每个使用者流化一条消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63447993/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com