gpt4 book ai didi

java - 多个 JavaDStream 使用相同的 mapWithState 函数

转载 作者:行者123 更新时间:2023-12-02 03:24:54 25 4
gpt4 key购买 nike

我有 2 个 JavaPairDStream。它们具有相同的键(类型和值)和相同的值类型(不同的值)。我需要它们共享相同的状态以返回基于当前状态的结果,因此我使用相同的 mapWithState 函数。

JavaPairDStream<String, String> inputMessagesStream = readFromKafkaStream1();
JavaPairDStream<String, String> inputMessagesStream2 = readFromKafkaStream();
Function3<String, Optional<String>, State<MessageState>, String> messageState = (key, value, state) -> {
if (state.exists()) {
return state.get().process(value.get());
} else {
MessageState ms = new MessageState();
ms.process(value.get());
state.update(ms);
return null;
}
};

JavaMapWithStateDStream<String, String, MessageState, String> message1 = inputMessagesStream.mapWithState(StateSpec.function(messageState));
JavaMapWithStateDStream<String, String, MessageState, String> message2 = inputMessagesStream2.mapWithState(StateSpec.function(messageState));

可以对 2 个不同的流使用相同的函数吗?状态是否正确更新并由每个流共享?

最佳答案

您需要将inputMessagesStreaminputMessagesStream2组合为

JavaPairDStream<String, String> combinedStream = inputMessagesStream.union(inputMessagesStream2);

为了区分上述两个流,您可以定义一些标志并将其添加到MapWithState

JavaMapWithStateDStream<String, String, MessageState, String> message = combinedStream.mapWithState(StateSpec.function(messageState));

它会起作用。

关于java - 多个 JavaDStream 使用相同的 mapWithState 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39096203/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com