gpt4 book ai didi

java - 有没有办法直接从处理器内将数据发送到 Kafka 主题?

转载 作者:行者123 更新时间:2023-12-02 01:59:26 25 4
gpt4 key购买 nike

我正在尝试在 Kafka Streams 的帮助下实现以下逻辑:

  1. 聆听主题中的一些引用数据,例如。 ref-data-topic 并从中创建一个全局 StateStore

  2. 监听来自另一个主题data-topic的消息,该消息必须根据引用数据进行验证,并且发送到成功错误 主题

这里是示例伪代码:

class SomeProcessor implements Processor<String, String> {

private KeyValueStore<String, String> refDataStore;

@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
}

@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");

// business logic here

if(ok) {
sendValueToTopic("success");
} else {
sendValueToTopic("errors");
}
}
}

或者实现这种期望行为的规范方法是什么?

就像我现在想到的另一种选择是用验证信息丰富处理器中的数据,然后将所有内容发送到一个主题中,使客户端能够处理例如收到的消息中的 validationStatus

尽管如此,我确实希望有一个包含两个主题的解决方案,因为例如在这种情况下,我可以使用 Kafka Connect 将成功主题直接与某些数据存储链接并处理错误topic 有所不同。在只有一个主题的方法中,我再次不知道如何实现这一“store_only_successively_validated_entities”用例。

有什么想法和建议吗?

最佳答案

如果您使用处理器API,您可以按名称将数据转发到不同的处理器:

class SomeProcessor implements Processor<String, String> {

private KeyValueStore<String, String> refDataStore;
private ProcessorContext processorContext;

@Override
public void init(final ProcessorContext context) {
refDataStore = (KeyValueStore) context.getStateStore("ref-data-store");
processorContext = context;
}

@Override
public void process(String key String value) {
Object refData = refDataStore.get("some_key");

// business logic here

if(ok) {
processorContext.forward(key, value, To.child("success"));
} else {
processorContext.forward(key, value, To.child("error"));
}
}
}

插入拓扑时,您将添加两个接收器节点,名称为“success”“error”,分别写入成功和错误主题。

或者您将数据转发到单个接收器节点,并使用TopicNameExtractor而不是硬编码的主题名称添加接收器。 (需要2.0版本。)

如果您使用 DSL,则可以使用 KStream#branch() 来拆分流,并通过 KStream#to(...) 将不同的数据堆积到不同的主题中(或者您通过 KStream#to(TopicNameExtractor) 使用动态路由——需要 2.0 版本)

关于java - 有没有办法直接从处理器内将数据发送到 Kafka 主题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51860481/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com