gpt4 book ai didi

java - Kafka Streams - 按时间戳/序列保存消息?

转载 作者:太空宇宙 更新时间:2023-11-04 10:22:44 24 4
gpt4 key购买 nike

我正在 Kafka 流上接收消息。它们由用户 ID 键入。当它们进来时,会得到一个序列号和时间戳。消息在 15 分钟后“过期”。用户可以根据给定时间(最多 15 分钟)或顺序请求新消息。

我最初拥有的是这样的:

` StreamsBuilder StreamsBuilder = new StreamsBuilder();

  KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
messageSupplier = Stores.persistentKeyValueStore("user.messages");

KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.aggregate(this::createMessageCache,
this::addMessageToMessageCache,
Materialized.as(messageSupplier));

// ---> Some other setup stuff, then start the streams

`

MessageCache 保存消息列表(当我们将消息添加到缓存时,会删除过期的消息)。当我收到消息请求时,我会浏览列表并过滤出适当的消息。

我想我可以使用其中一种窗口策略,但找不到实际保留消息列表的示例。

这是最好的方法吗?或者我错过了一些可以让这变得更容易/更好的东西?

最佳答案

Is this the best way to do this? Or am I missing something that would make this easier/better?

我认为您有一个使用 native java 类的简单解决方案,可以有效地将流应用程序与您的代码桥接起来...为了简单起见,有很多话要说!我看到的唯一缺点是,如果您的事件率太高,您的用户缓存可能会超出您的内存大小。此外,如果您需要容错,流应用程序将在发生故障时在另一个应用程序实例上重建状态存储的内容。但如果这不是问题,那就去做吧!

但是,就如何在流应用程序上下文中执行此操作而言,您可以进行一些调整:

  1. 定义您想要支持的用户查询的粒度。分钟?秒?为了便于讨论,我们说几分钟。根据该粒度设置流窗口。

  2. 定义一个累加器,类似于您所拥有的累加器,它将接受用户记录并将其添加到列表中。类似于 UserRecordGroup,它具有 UserRecordList,以及将 UserRecord 附加到 List 的方法 add(UserEvent evt)

然后,您可以构建流应用程序,如下所示:

KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
Materialized<String, UserRecordGroup, WindowStore<Bytes, byte[]>> userStore =
Materialized.<String, UserRecordGroup, WindowStore<Bytes,byte[]>>as("user.messages")
.withValueSerde(/* your serializers here */);


KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.windowedBy(TimeWindows.of(ONE_MINUTE_IN_MS))
.aggregate(UserRecordGroup::new,
(key, value, agg) -> { agg.add(value); },
userStore);

最后,当您想要从商店提供查询服务时,您可以

ReadOnlyWindowStore<Integer, UserRecordGroup> store =
streams.store("user.messages", QueryableStoreTypes.windowStore());
WindowStoreIterator<UserRecordGroup> windowIterator =
store.fetch(pathHash, startTimestamp, endTimeStamp);

你的迭代器将包含不同窗口的所有记录的列表;将这些列表合并在一起,您就可以获得 startTimestamp 和 endTimestamp 之间的用户 Activity 的描述。

关于java - Kafka Streams - 按时间戳/序列保存消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50917511/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com