- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我已经设置了一个简单的聚合,将来自多个流的值平均在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的流如下:
// Combine multiple streams together.
KStream<String, IndividualTick> tickerStream =
priceIndexStreamBuilder.stream(exchangeTopics, Consumed.with(...));
// Group by a key & compute average per key
KStream<K, AveragedTick> avgTickerStream = tickStream.selectKey((key,
value) -> value.getK())
.groupByKey(...)
.aggregate(AvgTick::new,
(key, value, aggregate) -> {
aggregate.addTick(value);
return aggregate;
},
Materialized.with(...))
.toStream();
indexTickerStream.to(sinkTopic, Produced.with(...));
我的测试使用 EmbeddedKafka,将一堆记录发布到主题,然后坐在阻塞队列中等待记录到达 sinkTopic
。
我对这种聚合如何随时间变化很感兴趣,所以我希望断言每个输出代码的平均值。我可能会添加一定程度的窗口,但我现在尽量保持简单。
当我运行测试时,我得到了不同的结果。假设我的拓扑中有 10 个输入记录:
AverageTick
序列化器中放置的断点被调用了不同的次数。我认为这是因为 KIP-63 中定义的缓存功能- 记录非常快速地出现在处理节点上,并被最新记录合并/覆盖。 (虽然我不完全确定。)
我的单元测试通过了 ProcessorTopologyTestDriver
,但我正在尝试为包含此逻辑的服务编写一些验收测试。
我还尝试使用我的 commit.interval.ms
配置,以及在发布我的输入记录之间放置 sleep ,以取得不同程度的(不稳定的)成功。
我觉得我在这里做的事情在概念上是错误的——我只是不知道该采取什么其他方法。
最佳答案
您的观察是正确的。缓存使测试变得困难,因为它引入了不确定性。
要编写有用的测试,您有两个选择:
顺便说一句:在即将到来的 1.1 Kafka 中添加了一个公共(public)测试包,我们计划添加更多:https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams
关于apache-kafka - 测试 KafkaStreams 应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48599228/
我正在开发 Kafka 流应用程序,但在弄清楚如何使聚合工作时遇到了一些麻烦。 我有一个 KStream bankTransactions其中键的类型为 String和类型 JsonNode 的值所以
我一直在研究 Kafka Streams 应用程序上的磁盘写入,并将拓扑减少到最低限度,即: KStream stream = builder.stream("input-topic"); 然而,在
我正在研究将现有 Flink 应用程序/拓扑转换为使用 KafkaStreams 的 POC。我的问题是关于部署。 具体来说 - 在 Flink 中,将“工作节点”添加到 flink 安装中,然后向拓
我正在玩 Kafka 和流技术;我已经为 KStream 创建了一个自定义序列化器和反序列化器,我将使用它来接收来自给定主题的消息。 现在,问题是我正在以这种方式创建一个 serde: JsonSer
我有一个 Kafka 消费者类,它监听事件并执行事件(订单、客户)之间的连接并将它们存储在物化 View 中。当收到 REST 调用时,我创建了另一个类来访问状态存储。但我越来越java.lang.I
我有一个 Kafka Streams 应用程序,它使用 Kafka Streams DSL 连接到我们的 Kafka 集群,如下所示: KStreamBuilder builder = new KSt
我是 kafkaStream 的新手,我正在开发一个 Stream,但是当我启动我的应用程序时,很多日志都在记录。 例如,如何将日志级别从 Debbug 更改为 Info。 谢谢你。 6:54:12.
下面是我的代码片段。我想并行kafka流处理。但我不想放入 Runnable,也不想多次启动这个应用程序。 有没有类似streams.parallel()的方法? final
我有一个 Kafka 主题,我希望其中的消息具有两种不同的键类型:旧的和新的。 即 "1-new" , "1-old" , "2-new" , "2-old" .键是唯一的,但有些可能会丢失。 现在使
所以我试图让交互式查询与 Kafka 流一起工作。我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上)。我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和
我已经设置了一个简单的聚合,将来自多个流的值平均在一起,我正在尝试对其进行测试。我已经花了很多时间,但似乎无法直接理解这些概念。我的流如下: // Combine multiple streams t
在 Kafka(0.11.0.1) Streams 中,演示应用程序 Play with a Streams Application // Serializers/deserializers (ser
我正在尝试删除下游变更日志中值为 null 的记录,我知道在状态存储中它们只是通过为 null(逻辑删除)而被删除,但是当您对 KTable 或 Stream 进行聚合时,它们会跳过null 并且不删
是否有可能获得window final result在 Kafka Streams 中通过抑制中间结果。 我无法实现这个目标。我的代码有什么问题? val builder = StreamsB
来自https://micronaut-projects.github.io/micronaut-kafka/latest/guide/#kafkaStream ,我可以看到有一种简单的方法来运行运行
我正在为大容量高速分布式应用程序编写 Kafka Consumer。我只有一个主题,但收到的消息率非常高。为更多消费者提供服务的多个分区将适合此用例。最好的消费方式是拥有多个流阅读器。根据文档或可用示
我有四个使用相同应用程序 ID 运行的 Kafka 流应用程序实例。所有输入主题都属于单个分区。为了实现可扩展性,我通过了一个具有多个分区的中间虚拟主题。我已经设置了request.timeout.m
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。 这是我的代码片段: // defining some values: public s
我用来创建 GlobalKTable 的话题非常活跃。在 KStream-GlobalKTable join 的文档中我读了 The GlobalKTable is fully bootstrappe
我有一个由 KafkaStreams Java api 编写的 Kafka 应用程序。它从 Mysql binlog 读取数据并执行一些与我的问题无关的操作。问题是某一特定行在 avro 反序列化过程
我是一名优秀的程序员,十分优秀!