gpt4 book ai didi

apache-kafka - Kafka Streams : how to write to a topic?

转载 作者:行者123 更新时间:2023-12-04 22:19:28 26 4
gpt4 key购买 nike

在Kafka Streams中,产生/编写流的规范方法是什么?在Spark中,有一个自定义接收器,它可以作为来自任意数据源的长期运行的适配器。 Kafka Streams中的等效项是什么?

具体来说,我不是在问如何将一个主题转换为另一个主题。该文档对此非常清楚。我想了解如何编写将在对Kafka进行的一系列转换中的第一次编写的工作人员。

我希望能够做到

builder1.<something>(<some intake worker like a spark reciver)
.to(topic1)
.start()

builder2.from(topic1)
.transform(<some transformation function>)
.to(topic2)
.start()

但是现有的文档都没有显示这一点?我想念什么吗?

最佳答案

取决于您使用的是Kafka Streams DSL还是Processor API:

  • Kafka Streams DSL 您可以使用KStream#to()KStream具体化为一个主题。这是将数据具体化为主题的规范方法。另外,您可以使用KStream#through()。这还将使数据具体化为一个主题,而且还返回生成的KStream以供进一步使用。那么,#to()#through()之间的唯一区别是,如果您希望将生成的物化分区作为KStreamBuilder#stream(),它将为您节省一个KStream
  • 处理器API 您可以通过将数据转发到接收器处理器来将数据实例化到分区。

  • 无论哪种方式,要注意的关键是在使用上述方法之一将数据写入分区之前,数据不会具体化为主题。 map()filter()等不会实现数据。数据保留在处理器任务/线程/内存中,直到通过上述方法之一实现为止。

    制作为Kafka Streams:
    Properties producerConfig = new Properties();
    producerConfig.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:2181");
    producerConfig.put(ACKS_CONFIG, "all");
    producerConfig.put(RETRIES_CONFIG, 0);
    Producer<Integer, Integer> producer = new KafkaProducer<>(producerConfig, new IntegerSerializer(), new IntegerSerializer<>());

    进而:
    Arrays.asList(1, 2, 3, 4).forEach(integer -> producer.send(new ProducerRecord<>("integers", integer, integer)))

    你会需要:
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>${version.kafka}</version>
    </dependency>

    关于apache-kafka - Kafka Streams : how to write to a topic?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38106863/

    26 4 0