gpt4 book ai didi

java - 在一段时间内使用 kafka-streams 处理和检查事件

转载 作者:行者123 更新时间:2023-12-01 18:18:56 26 4
gpt4 key购买 nike

我有一个 KStream eventsStream,它从主题“事件”获取数据。有两种类型的事件,它们的键:
1. {user_id = X, event_id = 1} {..value, include time_event...}
2. {user_id = X, event_id = 2} {..value, include time_event...}

如果用户在 10 分钟内没有给出 event_id = 2 的事件,我需要将 event_id = 1 的事件迁移到主题“结果”。

例如,
1. 第一种情况:我们获取数据 {user_id = 100, event_id = 1} {.. time_event = xxxx ...} 并且在 10 分钟内没有发生任何事件 {user_id = 100, event_id = 2} {.. time_event = xxxx + 10 分钟...},所以我们将其写入 results-topic
2. 第二种情况:我们获取数据 {user_id = 100, event_id = 1} {.. time_event = xxxx ...} 以及 10 分钟内的事件 {user_id = 100, event_id = 2} {.. time_event = xxxx + 5 分钟...},因此我们不会将其写入 results-topic

如何使用 kafka-streams 在 java 代码中实现此行为?

我的代码:

公共(public)类 ResultStream{

public static KafkaStreams newStream() {

Properties properties = Config.getProperties("ResultStream");

Serde<String> stringSerde = Serdes.String();

StreamsBuilder builder = new StreamsBuilder();

StoreBuilder<KeyValueStore<String, String>> store =
Stores.keyValueStoreBuilder(
Stores.inMemoryKeyValueStore("inmemory"),
stringSerde,
stringSerde
);
builder.addStateStore(store);

KStream<String, String> resourceEventStream = builder.stream(EVENTS.topicName(), Consumed.with(stringSerde, stringSerde));
resourceEventStream.print(Printed.toSysOut());

resourceEventStream.process(() -> new CashProcessor("inmemory"), "inmemory");
resourceEventStream.process(() -> new FilterProcessor("inmemory", resourceEventStream), "inmemory");

Topology topology = builder.build();

return new KafkaStreams(topology, properties);

}

}

公共(public)类 FilterProcessor 实现处理器{

private ProcessorContext context;
private String eventStoreName;
private KeyValueStore<String, String> eventStore;
private KStream<String, String> stream;

public FilterProcessor(String eventStoreName, KStream<String, String> stream) {
this.eventStoreName = eventStoreName;
this.stream = stream;
}

@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
eventStore = (KeyValueStore) processorContext.getStateStore(eventStoreName);
}

@Override
public void process(Object key, Object value) {

this.context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {

System.out.println("Scheduler is working");

stream.filter((k, v) -> {

JsonObject events = new Gson().fromJson(k, JsonObject.class);
if (***condition***) {
return true;
}

return false;
}).to("results");
});
}

@Override
public void close() {

}

}

CashProcessor 的作用只是将事件放入本地存储,并在给定同一用户的 event_id = 2 的情况下由用户删除 event_id = 1 的记录。

FilterProcess 应每分钟使用本地存储过滤事件。但我无法正确调用此处理(正如我实际上所做的那样)...

我真的需要帮助。

最佳答案

为什么要将 KStream 传递到处理器中?这不是 DSL 的工作原理。

当您已经通过 resourceEventStream.process()“连接”处理器时,您的 FilterProcessor#process(key, value) 方法将为自动流 - 但是,KStream#process() 是终端操作,因此不允许您向下游发送任何数据。相反,您可能想要使用 transform() (这基本上与 process() 加上输出 KStream 相同)。

要实际在标点符号中向下游转发数据,您应该使用通过 init() 提供的 ProcessorContext 来使用 context.forward() > 方法。

关于java - 在一段时间内使用 kafka-streams 处理和检查事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60314918/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com