- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "streams-plaintext-input");
KTable<String, Long> wordCounts = textLines
// Split each text line, by whitespace, into words.
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
// Group the text words as message keys
.groupBy((key, value) -> value)
// Count the occurrences of each word (message key).
.count("Counts")
// Store the running counts as a changelog stream to the output topic.
wordCounts.to(stringSerde, longSerde, "streams-wordcount-output");
第5步,处理一些数据后,我们可以在接收器主题streams-wordcount-output中看到压缩的KV对(例如streams 2),
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092
--topic streams-wordcount-output \
--from-beginning \
--formatter kafka.tools.DefaultMessageFormatter \
--property print.key=true \
--property print.value=true \
--property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
--property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
all 1
streams 1
lead 1
to 1
kafka 1
hello 1
kafka 2
streams 2
问题是上面数据中的KTable wordCounts如何以Key-Value方式将数据写入主题streams-wordcount-output?
主题streams-wordcount-output的选项cleanup.policy似乎是默认值,delete
,而不是compact
(通过 bin/kafka-configs.sh)
最佳答案
所有输入和输出主题均“超出 Kafka Streams 的范围”。创建和配置这些主题是用户的责任。
因此,您的主题“streams-wordcount-output”
将具有您在创建主题时指定的配置。
关于java - KafkaStream.KTable如何以(压缩)KV风格将数据写入kafka主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46342526/
我有两个流来自两个主题 orders 和 fsource。主要订单是静态的,很少更新,而 fsource 的更新速度为每秒 1000 个。这里我使用了 KTable-KTabke join 因为它们具
我对以下拓扑的行为方式有一些问题: String topic = config.topic(); KTable myTable = topology.builder().table(UUIDSerde
documentation来自 Confluent 网站提到以下内容: The left KTable can have multiple records which map to the same
KTable table1 KTable table2 我正在尝试加入两个 KTables (无窗口) 键并将结果写为 到输出主题。 谁能帮我提供一些如何执行此操作的示例。 最佳答案 因为在 KTab
请参阅下面的更新以显示潜在的解决方法 我们的应用程序使用 2 个主题作为 KTables,执行左连接,并输出到一个主题。在测试期间,我们发现当我们的输出主题只有 1 个分区时,这可以正常工作。当我们增
我最近开始试验 kafka 流。我有一个场景需要加入 KStream与 KTable . KTable 可能是这种情况不包含某些键。在那种情况下,我得到一个 NullPointerException
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我正在尝试测试一个拓扑,该拓扑作为最后一个节点,具有 KTable。我的测试使用的是成熟的 Kafka 集群(通过 confluence 的 Docker 镜像),因此我不使用TopologyTest
我有一个 kafka 主题和一个听它的 KTable。 我想写一个 http POST 请求,它将遍历 ktable 中的当前项目,对它们执行一些操作并写回主题 所以基本上我有: private va
我们有以下高级 DSL 处理拓扑: TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceM
用例 - 有一个包含消息的主题(空,元数据)。我需要从主题创建一个 Ktable,其中键 (metadata.entity_id) 和值作为元数据。该表稍后将用于与具有相同键的流进行连接。 p
我在 Kafka 中创建带有时间窗口的 KTable 时遇到了一些问题。 我想创建一个表来计算流中 ID 的数量,就像这样。 ID (String) | Count (Long) X
假设我有一个包含多个分区的主题。我在其中写入 K/V 数据,并希望通过按键在 Tumbling Windows 中聚合所述数据。 假设我已经启动了与我有分区一样多的工作实例,并且每个工作实例都在单独的
我正在尝试在 Kafka Streams 中加入一个类似于许多文章的简单外键(例如: https://www.confluent.io/blog/data-enrichment-with-kafka-
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我有一个窗口KTable按预期工作,但每次收到新值时它都会输出。我找到了 .suppress 运算符,它完全符合我的要求:仅在时间窗口结束时输出结果。我已向 TimeWindow 添加了 grace
我是 Kafka Streams API 的新手,我正在尝试创建一个 KTable。我有一个输入主题:s-order-topic,它是一个json格式的消息,如下所示。 { "current_ts":
我想编写一个小型 Kafka Streams 应用程序,它会减少输入流的时间窗口,对值进行一些映射,然后将生成的 toStream() 更改日志发送到另一个主题。使用我的代码,我在 toStream(
有没有办法计算 Java/scala 应用程序中 KTable/KStream 随着时间的推移大约将使用多少堆(或任何其他)内存? 我有一些具体的假设,我想知道它们是否正确: Kafka 流仅使用内部
如何确定主题的 KTable 物化何时完成? 例如假设 KTable 有几百万行。伪代码如下: KTable kt = kgroupedStream.groupByKey(..).reduce(..)
我是一名优秀的程序员,十分优秀!