- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在 Kafka 流上接收消息。它们由用户 ID 键入。当它们进来时,会得到一个序列号和时间戳。消息在 15 分钟后“过期”。用户可以根据给定时间(最多 15 分钟)或顺序请求新消息。
我最初拥有的是这样的:
` StreamsBuilder StreamsBuilder = new StreamsBuilder();
KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
messageSupplier = Stores.persistentKeyValueStore("user.messages");
KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.aggregate(this::createMessageCache,
this::addMessageToMessageCache,
Materialized.as(messageSupplier));
// ---> Some other setup stuff, then start the streams
`
MessageCache
保存消息列表(当我们将消息添加到缓存时,会删除过期的消息)。当我收到消息请求时,我会浏览列表并过滤出适当的消息。
我想我可以使用其中一种窗口策略,但找不到实际保留消息列表的示例。
这是最好的方法吗?或者我错过了一些可以让这变得更容易/更好的东西?
最佳答案
Is this the best way to do this? Or am I missing something that would make this easier/better?
我认为您有一个使用 native java 类的简单解决方案,可以有效地将流应用程序与您的代码桥接起来...为了简单起见,有很多话要说!我看到的唯一缺点是,如果您的事件率太高,您的用户缓存可能会超出您的内存大小。此外,如果您需要容错,流应用程序将在发生故障时在另一个应用程序实例上重建状态存储的内容。但如果这不是问题,那就去做吧!
但是,就如何在流应用程序上下文中执行此操作而言,您可以进行一些调整:
定义您想要支持的用户查询的粒度。分钟?秒?为了便于讨论,我们说几分钟。根据该粒度设置流窗口。
定义一个累加器,类似于您所拥有的累加器,它将接受用户记录并将其添加到列表中。类似于 UserRecordGroup
,它具有 UserRecord
的 List
,以及将 UserRecord
附加到 List
的方法 add(UserEvent evt)
。
然后,您可以构建流应用程序,如下所示:
KStream<String, Message> inboundStream = streamsBuilder.stream("incoming.topic");
Materialized<String, UserRecordGroup, WindowStore<Bytes, byte[]>> userStore =
Materialized.<String, UserRecordGroup, WindowStore<Bytes,byte[]>>as("user.messages")
.withValueSerde(/* your serializers here */);
KTable<String, MessageCache> messageTable = inboundStream
.filter(this::userExists)
.peek(this::recordInboundMessage)
.map(this::markMessage) // add sequence/timestamp
.groupByKey()
.windowedBy(TimeWindows.of(ONE_MINUTE_IN_MS))
.aggregate(UserRecordGroup::new,
(key, value, agg) -> { agg.add(value); },
userStore);
最后,当您想要从商店提供查询服务时,您可以
ReadOnlyWindowStore<Integer, UserRecordGroup> store =
streams.store("user.messages", QueryableStoreTypes.windowStore());
WindowStoreIterator<UserRecordGroup> windowIterator =
store.fetch(pathHash, startTimestamp, endTimeStamp);
你的迭代器将包含不同窗口的所有记录的列表;将这些列表合并在一起,您就可以获得 startTimestamp 和 endTimestamp 之间的用户 Activity 的描述。
关于java - Kafka Streams - 按时间戳/序列保存消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50917511/
给定一个带有多个 date_time 戳的字符串,我想 提取第一个戳及其前面的文本 候选字符串可以有一个或多个时间戳 后续的 date_time 戳记将被 sep="-" 隔开 后续date_time
是否可以合并从相机拍摄的文本和照片?我想在照片上标记日期和时间,但我在 Google 上找不到任何内容。 最佳答案 使用下面的代码来实现你所需要的。 Bitmap src = Bitm
有没有办法通过 Graph API 戳另一个用户?基于this post ,并使用 Graph Explorer ,我发布到“/USERID/pokes”,我已经授予它(Graph API 应用程序和
我有两个向左浮动的元素。一个是 body 的第一个 child ,另一个是容器的第一个 child ,容器是 body 的第二个 child 。 ...
我是一名优秀的程序员,十分优秀!