
java - Understanding the Kafka consumer API for Java

Reposted. Author: 行者123. Updated: 2023-11-29 04:55:05

I am trying to understand the Kafka receiving API. Below is sample code that works.

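  1. Why does consumerStreamsMap.get(topic) return a List of KafkaStream<> receivers for a single topic?
  2. The current flow iterates over the List of KafkaStream<> and then over the messages of each stream. But a Kafka receiver is supposed to run forever, so I would expect the inner while loop to never exit. That would make the List<KafkaStream<>> redundant.
  3. Some examples also use consumerStreamsMap.get(topic).get(0). Is that the correct way to write the consumer?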

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    // Define single thread for topic
    topicMap.put(topicName, new Integer(1));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreamsMap = consumer.createMessageStreams(topicMap);
    List<KafkaStream<byte[], byte[]>> streamList = consumerStreamsMap.get(topic);

    for (final KafkaStream<byte[], byte[]> stream : streamList)
    {
        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
        while (consumerIte.hasNext())
        {
            counter++;
            String message = new String(consumerIte.next().message());
            String id = topic.hashCode() + "-" + date.getTime() + "-" + counter;
            System.out.println(message);
        }
    }

Best answer

You can find the answers in the Kafka wiki: https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

  1. consumerStreamsMap is a map of (topic, list of KafkaStream) pairs. The number of streams is determined by the following line in the code:

    topicMap.put(topicName, numberOfStreams);

The wiki explains how streams relate to partitions: If you provide more threads than there are partitions on the topic, some threads will never see a message. If you have more partitions than you have threads, some threads will receive data from multiple partitions. If you have multiple partitions per thread there is NO guarantee about the order you receive messages, other than that within the partition the offsets will be sequential. For example, you may receive 5 messages from partition 10 and 6 from partition 11, then 5 more from partition 10 followed by 5 more from partition 10 even if partition 11 has data available. Adding more processes/threads will cause Kafka to re-balance, possibly changing the assignment of a Partition to a Thread.
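The thread-to-partition relationship described in the wiki can be illustrated with a small stand-alone model. This is only a sketch: `PartitionAssignmentSketch` and `assign` are hypothetical names, and the contiguous-range split is an assumed simplification rather than Kafka's actual assignment code. It needs no Kafka dependency.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {

    // Hypothetical helper: spreads partition ids 0..numPartitions-1 over
    // numStreams streams in contiguous ranges. A simplified model, not Kafka's code.
    static List<List<Integer>> assign(int numPartitions, int numStreams) {
        List<List<Integer>> result = new ArrayList<>();
        int perStream = numPartitions / numStreams; // partitions every stream gets
        int extra = numPartitions % numStreams;     // first `extra` streams get one more
        int next = 0;
        for (int s = 0; s < numStreams; s++) {
            int count = perStream + (s < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                owned.add(next++);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // More streams than partitions: the last two streams own nothing.
        System.out.println(assign(2, 4)); // prints [[0], [1], [], []]
        // More partitions than streams: each stream owns several partitions.
        System.out.println(assign(5, 2)); // prints [[0, 1, 2], [3, 4]]
    }
}
```

In the first call two streams stay empty, which corresponds to threads that never see a message. In the second, each stream interleaves data from several partitions, so ordering only holds within a single partition.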

  2. You need to iterate over each stream in its own thread.

    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

        // now launch all the threads
        //
        executor = Executors.newFixedThreadPool(a_numThreads);

        // now create an object to consume the messages
        //
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }

    public class ConsumerTest implements Runnable {
        private KafkaStream m_stream;
        private int m_threadNumber;

        public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
            m_threadNumber = a_threadNumber;
            m_stream = a_stream;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
            while (it.hasNext())
                System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
            System.out.println("Shutting down Thread: " + m_threadNumber);
        }
    }
  3. consumerStreamsMap.get(topic).get(0) is only correct when you have 1 topic and 1 stream.
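The advice to give each stream its own thread follows from the iterator blocking while it waits for messages: a single loop over the list would stay inside the first stream and never reach the others. The stand-alone sketch below models that with `BlockingQueue` objects in place of `KafkaStream`. The class name, the `preloaded` helper, and the `SHUTDOWN` sentinel are hypothetical stand-ins for illustration and are not part of Kafka.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerArray;

class StreamPerThreadSketch {
    // Sentinel that models consumer shutdown; hypothetical, not part of Kafka.
    static final String SHUTDOWN = "__shutdown__";

    // Hypothetical helper: a queue holding `messages` items followed by the sentinel.
    static BlockingQueue<String> preloaded(int messages) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int m = 0; m < messages; m++) {
            queue.add("msg" + m);
        }
        queue.add(SHUTDOWN);
        return queue;
    }

    // Drains every queue on its own thread; returns how many messages each thread saw.
    static int[] consumeAll(List<BlockingQueue<String>> streams) {
        AtomicIntegerArray counts = new AtomicIntegerArray(streams.size());
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        for (int i = 0; i < streams.size(); i++) {
            final int threadNumber = i;
            final BlockingQueue<String> stream = streams.get(i);
            executor.submit(() -> {
                try {
                    // take() blocks while the queue is empty, so each stream
                    // needs a dedicated thread to make progress independently.
                    for (String msg = stream.take(); !msg.equals(SHUTDOWN); msg = stream.take()) {
                        counts.incrementAndGet(threadNumber);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int[] result = new int[streams.size()];
        for (int i = 0; i < result.length; i++) {
            result[i] = counts.get(i);
        }
        return result;
    }

    public static void main(String[] args) {
        int[] counts = consumeAll(Arrays.asList(preloaded(3), preloaded(2)));
        System.out.println(Arrays.toString(counts)); // prints [3, 2]
    }
}
```

With a single stream, as in the question's code, one loop is enough, which is why get(0) works in that case only.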

Regarding java - Understanding the Kafka consumer API for Java, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34125773/
