gpt4 book ai didi

java - KafkaStream.KTable如何以(压缩)KV风格将数据写入kafka主题

转载 作者:行者123 更新时间:2023-12-02 02:35:51 25 4
gpt4 key购买 nike

在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application

// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");

KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

// Group the text words as message keys
.groupBy((key, value) -> value)

// Count the occurrences of each word (message key).
.count("Counts")

// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");

第5步,处理一些数据后,我们可以在接收器主题streams-wordcount-output中看到压缩的KV对(例如streams 2),

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2

问题是上面数据中的KTable wordCounts如何以Key-Value方式将数据写入主题streams-wordcount-output

主题streams-wordcount-output的选项cleanup.policy似乎是默认值,delete,而不是compact (通过 bin/kafka-configs.sh)

最佳答案

所有输入和输出主题均“超出 Kafka Streams 的范围”。创建和配置这些主题是用户的责任。

因此,您的主题“streams-wordcount-output”将具有您在创建主题时指定的配置。

比照。 https://docs.confluent.io/current/streams/developer-guide.html#managing-topics-of-a-kafka-streams-application

关于java - KafkaStream.KTable如何以(压缩)KV风格将数据写入kafka主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46342526/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com