gpt4 book ai didi

java - Apache Kafka 根据消息的值对窗口消息进行排序

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:16:12 24 4
gpt4 key购买 nike

我正在尝试找到一种方法来重新排序主题分区内的消息并将排序后的消息发送到新主题。

我有 Kafka 发布者发送以下格式的字符串消息: {system_timestamp}-{event_name}?{parameters}

例如:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

此外,我们为每条消息添加一些消息 key ,以将它们发送到相应的分区。

我想做的是根据消息的 {system-timestamp} 部分并在 1 分钟的窗口内重新排序事件,因为我们的发布者不保证消息将在根据 {system-timestamp} 值。

例如,我们可以向主题传递一条具有更大 {system-timestamp} 值的消息。

我研究了 Kafka Stream API 并找到了一些关于消息窗口和聚合的示例:

Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-sorter");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream("events");
KGroupedStream<String>, String> groupedStream = stream.groupByKey();//grouped events within partion.

/* commented since I think that I don't need any aggregation, but I guess without aggregation I can't use time windowing.
KTable<Windowed<String>, String> windowedEvents = stream.groupByKey().aggregate(
() -> "", // initial value
(aggKey, value, aggregate) -> aggregate + "", // aggregating value
TimeWindows.of(1000), // intervals in milliseconds
Serdes.String(), // serde for aggregated value
"test-store"
);*/

但是接下来我应该如何处理这个分组流?我没有看到任何可用的“sort() (e1,e2) -> e1.compareTo(e2)”方法,也可以将窗口应用于 aggregation() , reduce() ,count() ,但我认为我不需要任何消息数据操作。

如何在 1 分钟窗口内重新排序消息并将它们发送到另一个主题?

最佳答案

这是一个大纲:

创建一个处理器实现:

  • 在 process() 方法中,对于每条消息:

    • 从消息值中读取时间戳
    • 使用 (timestamp, message-key) 对作为键,使用消息值作为值插入到 KeyValueStore 中。注意,这也提供重复数据删除。您需要提供一个自定义 Serde 来序列化 key ,以便时间戳排在第一位,按字节排列,以便范围查询按时间戳排在第一位。
  • 在 punctuate() 方法中:

    • 使用从 0 到时间戳的范围提取读取存储 - 60'000(=1 分钟)
    • 使用 context.forward() 按顺序发送获取的消息并将它们从存储中删除

这种方法的问题在于,如果没有新消息到达以提前“流时间”,则不会触发 punctuate()。如果这对您的情况有风险,您可以创建一个外部调度程序,它会定期向主题的每个(!)分区发送“滴答”消息,您的处理器应该忽略这些消息,但它们会导致标点符号在不存在的情况下触发“真实”的消息。KIP-138 将通过添加对系统时间标点符号的明确支持来解决此限制: https://cwiki.apache.org/confluence/display/KAFKA/KIP-138%3A+Change+punctuate+semantics

关于java - Apache Kafka 根据消息的值对窗口消息进行排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43939534/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com