gpt4 book ai didi

java - 使用 kafka-streams 有条件地对 json 输入流进行排序

转载 作者:行者123 更新时间:2023-11-30 07:54:43 25 4
gpt4 key购买 nike

我是开发 kafka-streams 应用程序的新手。我的流处理器旨在根据输入的 json 消息中的用户键值对 json 消息进行排序。

Message 1: {"UserID": "1", "Score":"123", "meta":"qwert"}
Message 2: {"UserID": "5", "Score":"780", "meta":"mnbvs"}
Message 3: {"UserID": "2", "Score":"0", "meta":"fghjk"}

我在这里读过Dynamically connecting a Kafka input stream to multiple output streams没有动态解决方案。

在我的用例中,我知道对输入流进行排序所需的用户键和输出主题。因此,我正在编写特定于每个用户的单独处理器应用程序,其中每个处理器应用程序都匹配不同的用户 ID。

所有不同的流处理器应用程序都从 kafka 中的同一个 json 输入主题读取,但如果满足预设的用户条件,每个应用程序只会将消息写入特定用户的输出主题。

public class SwitchStream extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
HashMap<String, String> message = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
message = mapper.readValue(value, HashMap.class);
} catch (IOException e){}

// User condition UserID = 1
if(message.get("UserID").equals("1")) {
context().forward(key, value);
context().commit();
}
}

public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "sort-stream-processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
TopologyBuilder builder = new TopologyBuilder();
builder.addSource("Source", "INPUT_TOPIC");
builder.addProcessor("Process", SwitchStream::new, "Source");
builder.addSink("Sink", "OUTPUT_TOPIC", "Process");

KafkaStreams streams = new KafkaStreams(builder, props);
streams.start();
}
}

问题一:如果使用低级处理器 API,是否可以使用高级流 DSL 轻松实现相同的功能? (我承认我发现很难理解和遵循高级流 DSL 的其他在线示例)

问题二:输入 json 主题正在以 20K-25K EPS 的高速率获取输入。我的处理器应用程序似乎跟不上这个输入流。我已经尝试部署每个进程的多个实例,但结果与我想要的结果相去甚远。理想情况下,每个处理器实例应该能够处理 3-5K EPS。

有没有办法改进我的处理器逻辑或使用高级流 DSL 编写相同的处理器逻辑?这会有什么不同吗?

最佳答案

您可以通过 filter() 在高级 DSL 中执行此操作(您有效地实现了一个过滤器,因为您只返回一条消息,如果它是 userID==1)。您可以使用 KStream#branch() 来概括此过滤器模式(有关详细信息,请参阅文档:http://docs.confluent.io/current/streams/developer-guide.html#stateless-transformations)。另请阅读 JavaDocs:http://kafka.apache.org/0102/javadoc/index.html?org/apache/kafka/streams

KStreamBuilder builder = new KStreamBuilder();
builder.stream("INPUT_TOPIC")
.filter(new Predicate() {
@Overwrite
boolean test(String key, String value) {
// put you processor logic here
return message.get("UserID").equals("1")
}
})
.to("OUTPUT_TOPIC");

关于性能。单个实例应该能够处理 10K+ 条记录。如果没有任何进一步的信息,很难判断可能是什么问题。我建议在 Kafka 用户列表中询问(参见 http://kafka.apache.org/contact )

关于java - 使用 kafka-streams 有条件地对 json 输入流进行排序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43830457/

25 4 0