gpt4 book ai didi

apache-zookeeper - 卡夫卡 : Get broker host from ZooKeeper

转载 作者:行者123 更新时间:2023-12-03 10:21:18 26 4
gpt4 key购买 nike

出于特殊原因,我需要同时使用两者 - ConsumerGroup (又名高级消费者)和 SimpleConsumer (又名低级消费者)从 Kafka 读取。对于 ConsumerGroup我使用基于 ZooKeeper 的配置并且对它非常满意,但是 SimpleConsumer需要实例化种子代理。

我不想同时保留 ZooKeeper 和代理主机的列表。因此,我正在寻找一种方法来 自动发现经纪人 对于特定主题 来自 ZooKeeper .

由于一些间接信息,我相信这些数据存储在 ZooKeeper 中的以下路径之一:

  • /brokers/topics/<topic>/partitions/<partition-id>/state
  • /brokers/ids/

  • 但是,当我尝试从这些节点读取数据时,出现序列化错误(为此我使用了 com.101tec.zkclient):

    org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:37) at org.I0Itec.zkclient.ZkClient.derializable(ZkClient.java:740) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:773) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:761) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:750) at org.I0Itec.zkclient.ZkClient.readData(ZkClient.java:744) ... 64 elided Caused by: java.io.StreamCorruptedException: invalid stream header: 7B226A6D at java.io.ObjectInputStream.readStreamHeader(ObjectInputStream.java:804) at java.io.ObjectInputStream.(ObjectInputStream.java:299) at org.I0Itec.zkclient.serialize.TcclAwareObjectIputStream.(TcclAwareObjectIputStream.java:30) at org.I0Itec.zkclient.serialize.SerializableSerializer.deserialize(SerializableSerializer.java:31) ... 69 more



    我可以毫无问题地编写和读取自定义 Java 对象(例如字符串),所以我相信这不是客户端的问题,而是棘手的编码问题。因此,我想知道:
  • 如果这是正确的方法,如何正确读取这些节点 ?
  • 如果整个方法是错误的,什么是正确的 ?
  • 最佳答案

    这就是我的一位同事获取 Kafka 经纪人列表的方式。我认为当您想动态获取经纪人列表时,这是一种正确的方法。

    这是显示如何获取列表的示例代码。

    public class KafkaBrokerInfoFetcher {

    public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ZooKeeper("localhost:2181", 10000, null);
    List<String> ids = zk.getChildren("/brokers/ids", false);
    for (String id : ids) {
    String brokerInfo = new String(zk.getData("/brokers/ids/" + id, false, null));
    System.out.println(id + ": " + brokerInfo);
    }
    }
    }

    在由三个代理组成的集群上运行代码会导致
    1: {"jmx_port":-1,"timestamp":"1428512949385","host":"192.168.0.11","version":1,"port":9093}
    2: {"jmx_port":-1,"timestamp":"1428512955512","host":"192.168.0.11","version":1,"port":9094}
    3: {"jmx_port":-1,"timestamp":"1428512961043","host":"192.168.0.11","version":1,"port":9095}

    关于apache-zookeeper - 卡夫卡 : Get broker host from ZooKeeper,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29490113/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com