- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我一直在研究 Kafka Streams 应用程序上的磁盘写入,并将拓扑减少到最低限度,即:
KStream<String, JsonElement> stream = builder.stream("input-topic");
然而,在 docker stats
上,我可以观察到我的应用程序一直在向磁盘写入内容。我检查了容器,没有看到任何可疑的文件句柄。
如果我评论上述行,这个问题就消失了,所以我得出的结论是,仅仅从一个主题中读取就会在磁盘上写入一些东西,但我猜不出是什么以及为什么。
此外,我注意到它与提交间隔设置密切相关;增加它会使写入频率降低。每次写入大约130kB的数据。
这是预期的行为吗?
最佳答案
Kafka Streams 将检查点文件写入本地磁盘。这些检查点文件实际上只用于有状态任务,但即使没有有状态操作也会创建(对于这种情况,检查点文件几乎是空的)。
这是一个已知问题,已在即将发布的 1.1 版本中修复:https://issues.apache.org/jira/browse/KAFKA-6499
关于java - KafkaStreams 在读取主题时写入磁盘,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49049700/
我正在开发 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。 我有一个 KStream bankTransactions其中键的类型为 String和类型 JsonNode 的值所以
我一直在研究 Kafka Streams 应用程序上的磁盘写入,并将拓扑减少到最低限度,即: KStream stream = builder.stream("input-topic"); 然而,在
我正在研究将现有 Flink 应用程序/拓扑转换为使用 KafkaStreams 的 POC。我的问题是关于部署。 具体来说 - 在 Flink 中,将“工作节点”添加到 flink 安装中,然后向拓
我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。 现在,问题是我正在以这种方式创建一个 serde: JsonSer
我有一个 Kafka 消费者类,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化 View 中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越java.lang.I
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示: KStreamBuilder builder = new KSt
我是 kafkaStream 的新手,我正在开发一个 Stream,但是当我启动我的应用程序时,很多日志都在记录。 例如,如何将日志级别从 Debbug 更改为 Info。 谢谢你。 6:54:12.
下面是我的代码片段。我想并行kafka流处理。但我不想放入 Runnable,也不想多次启动这个应用程序。 有没有类似streams.parallel()的方法? final
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。 即 "1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。 现在使
所以我试图让交互式查询与 Kafka 流一起工作。我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上)。我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和
我已经设置了一个简单的聚合,将来自多个流的值平均在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的流如下: // Combine multiple streams t
在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application // Serializers/deserializers (ser
我正在尝试删除下游变更日志中值为 null 的记录,我知道在状态存储中它们只是通过为 null(逻辑删除)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不删
是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。 我无法实现这个目标。我的代码有什么问题? val builder = StreamsB
来自https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStream ,我可以看到有一种简单的方法来运行运行
我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。为更多消费者提供服务的多个分区将适合此用例。最好的消费方式是拥有多个流阅读器。根据文档或可用示
我有四个使用相同应用程序 ID 运行的 Kafka 流应用程序实例。所有输入主题都属于单个分区。为了实现可扩展性,我通过了一个具有多个分区的中间虚拟主题。我已经设置了request.timeout.m
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。 这是我的代码片段: // defining some values: public s
我用来创建 GlobalKTable 的话题非常活跃。在 KStream-GlobalKTable join 的文档中我读了 The GlobalKTable is fully bootstrappe
我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并执行一些与我的问题无关的操作。问题是某一特定行在 avro 反序列化过程
我是一名优秀的程序员,十分优秀!