- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。
这是我的代码片段:
// defining some values:
public static final Integer SESSION_TIMEOUT_MS = 6000000;
public static final String INTOPIC = "input";
public static final String HOST = "host";
// setting up serdes:
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
// some more code to build up the streams
KStreamBuilder builder = new KStreamBuilder();
KStream<String, JsonNode> dataStream = builder.stream(Serdes.String(), jsonSerde, INTOPIC);
// constructing the initalMessage ObjectNode:
ObjectNode initialMessage = JsonNodeFactory.instance.objectNode();
initialMessage.put("count", 0);
initialMessage.put("endTime", "");
// transforming data to KGroupedStream<String,JsonNode>
KGroupedStream<String, JsonNode> data = dataStream.map((key, value) ->{return new KeyValue<>(value.get(HOST).asText(), value); }).groupByKey(Serdes.String(), jsonSerde);
// finally aggregate the data usind SessionWindows
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");
private static JsonNode countData(JsonNode incomingMessage, JsonNode initialMessage){
// some dataprocessing
}
当我改变
KTable<Windowed<String>,JsonNode>
至
KTable<String, JsonNode>
并删除
SessionWindows.with(SESSION_TIMEOUT_MS)
从聚合函数来看,一切正常。
如果我不这样做,Eclipse 会告诉我行
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate( [...])
The method aggregate(Initializer, Aggregator, Windows, Serde, String) in the type KGroupedStream is not applicable for the arguments (() -> {}, ( key, incomingMessage, initialMessage) -> {}, SessionWindows, Serde, String)
对于该行
() -> initialMessage
Type mismatch: cannot convert from ObjectNode to VR
和:
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
The method countData(JsonNode, JsonNode) in the type DataWindowed is not applicable for the arguments (JsonNode, VR)
我真的不明白,类型在哪里丢失了!任何提示都会很棒!
谢谢:D
最佳答案
我确实需要实现合并:
Merger<? super String, JsonNode>tmpMerger = new MergerClass<String, JsonNode>();
并将其添加到聚合函数中:
KTable<Windowed<String>, JsonNode> aggregatedData = data.aggregate(
() -> initialMessage,
(key, incomingMessage, initialMessage) -> countData(incomingMessage, initialMessage),
tmpMerger,
SessionWindows.with(SESSION_TIMEOUT_MS),
jsonSerde,
"aggregated-data");
关于java - 在 KafkaStreams 中的聚合数据上使用 SessionWindows (0.11),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46662688/
默认情况下,.windowsedBy(SessionWindows.with(...)) 将返回每个新的传入记录。那么,我如何才能等待至少 1 秒才能返回当前 session 窗口的最后一个结果? 我
假设我有一个带有 SessionWindowing 的 Kafka Stream,例如: windowedBy(SessionWindows.with(inactivity_time).until(a
如果没有恒定的输入记录流,则似乎带有宽限期和抑制的 session 窗口的 Kafka 流无法输出最终事件。 上下文:我们正在使用变更数据捕获 (CDC) 来监控对旧数据库的更改。当用户使用 UI 进
我尝试在 Kafka (0.11) 的聚合函数中使用 SessionWindows,但无法理解为什么会出现错误。 这是我的代码片段: // defining some values: public s
默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60)) 为每个传入记录返回一条记录。 与 .count() 和 .filter()
我是一名优秀的程序员,十分优秀!