gpt4 book ai didi

java - 如何根据来自另一个主题的响应从 Kafka 主题中读取消息

转载 作者:行者123 更新时间:2023-12-04 10:37:46 24 4
gpt4 key购买 nike

我在 Kafka 中有 2 个主题:MetaData 和 MasterData。
我正在使用 Kafka Listners 实时读取数据。有2种情况:

  • 我必须先阅读 MetaData 主题,然后再阅读 MasterData 主题。
  • 也有可能 MetaData 主题中没有新的消息,但 MasterData 主题中插入了一条消息。在这种情况下,MasterData 主题的消费应该继续进行。

  • Ant 建议如何实现这一目标?

    最佳答案

    在评论中澄清之后,这是我所理解的,你想要实现的目标:

    You have a database like system, and the meta data from the kafka topic is used to adjust the table structure. Then the actual data comes into the master kafka topic which you then want to insert into the table. In other words: You want to define the structure of your data using kafka.



    我不确定,如果这就是我所说的 Kafka 的常规用例,因为它旨在成为交换事件(即数据)的接口(interface)。您尝试做的是使用系统作为定义数据结构的手段。但这只是我的意见。

    正如前面在评论中所说,实际上没有办法说在一个主题中不会有消息。你只能说:还没有消息“还”。您“可以”做的可能是:当 master 中的消息时话题到了,你先查看 meta主题,如果有消息,也是。如果有,则应用数据结构的更改,然后从主主题导入消息。

    另一种选择是使用 Kafka Streams 而不是原始消费者
    将这两个主题与 join 结合在一起例如。

    无论如何,要定义数据结构,通常会使用 Avro 之类的东西,它可以为您提供模式演变等等。然后编写您的应用程序以了解架构更改并将它们相应地应用于您的数据库表。

    更新:如何使用 Kafka Streams 解决

    如上所述,使用 Kafka Streams 可能是您的情况的一种解决方案。让我解释一下,我在一些伪 kafka 流代码中的意思是:

    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;

    import java.nio.charset.StandardCharsets;

    /**
    * Just an incomplete Kafka Streams Code Demo to show how the problem could
    * be solved with this framework instead of using the consumers directly.
    * Kafka Streams Boilerplate code not included.
    */
    public class Example {
    private static final String META_TOPIC = "meta";
    private static final String MASTER_TOPIC = "master";

    // You have to make sure, the meta data is stored under this
    // specific key in the meta topic.
    private static final byte[] META_KEY = MetaEvent.class.getName().getBytes(StandardCharsets.UTF_8);

    public static void main(String[] args) {
    new Example().createTopology();
    }

    public void createTopology() {

    final StreamsBuilder builder = new StreamsBuilder();

    final GlobalKTable<byte[], MetaEvent> metaTable = builder.globalTable(META_TOPIC);

    builder.<byte[], MasterEvent>stream(MASTER_TOPIC)
    .leftJoin(
    metaTable,
    (k, v) -> META_KEY,
    MasterWithMeta::new)
    .foreach(this::handleEvent);
    }

    private void handleEvent(byte[] key, MasterWithMeta masterWithMeta) {
    // 1) check if meta has changed, if so, apply changes to database
    // 2) import master data to database
    }
    }

    class MasterWithMeta {
    private final MasterEvent master;
    private final MetaEvent meta;

    public MasterEvent getMaster() {
    return master;
    }

    public MetaEvent getMeta() {
    return meta;
    }

    public MasterWithMeta(MasterEvent master, MetaEvent meta) {
    this.master = master;
    this.meta = meta;
    }

    public static MasterWithMeta create(MasterEvent master, MetaEvent meta) {
    return new MasterWithMeta(master, meta);
    }
    }

    class MetaEvent {
    // ...
    }

    class MasterEvent {
    // ...
    }

    关于java - 如何根据来自另一个主题的响应从 Kafka 主题中读取消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60090897/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com