gpt4 book ai didi

java - 如何实现 FlinkKafkaPartitioner?

转载 作者:行者123 更新时间:2023-12-01 21:27:48 27 4
gpt4 key购买 nike

我正在开发一项 Flink 服务,从一个 Kafka 读取消息,然后将它们反序列化到 HashMap 并用它们做一些事情,最后将它们写入另一个 Kafka。现在我遇到了一个我不知道如何解决的问题,并且我在网上没有找到有关如何解决它的示例。我想做的是为我的 Flink Kafka Producer 创建一个自定义分区,以便具有相同 id 的事件进入相同的分区,因为按顺序保持来自相同 id 的事件非常重要。但是我不明白如何实现 FlinkKafkaPartitioner 并且文档在这方面没有什么帮助。

到目前为止,我对 Producer 的了解如下(对 FlinkKafkaProducer 使用 null,因为我只是想让它正常工作,但这应该由自定义分区程序替换):

FlinkKafkaProducer010<String> writeToNewPipe = new FlinkKafkaProducer010<String>(
processorConfig.getKafkaDestTopic(),
new SimpleStringSchema(),
producerProps,
(FlinkKafkaPartitioner) null
);

在我的代码中,我执行以下操作:

eventsFromOldPipe
.map(event -> {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
try {
return mapper.writeValueAsString(event);
}
catch (JsonProcessingException e) {
e.printStackTrace();
}
return null;
})
.addSink(writeToNewPipe);

其中 eventsFromOldPipe 发出 HashMap。

现在假设来自 eventsFromOldPipe 的 HashMap 包含一个我想用作分区键的 sessionId 字段,理想情况下我还想从发送的记录中删除该 sessionId如果可能的话,由生产者发送给 Kafka(删除并不重要,但会很好)。

我对 Flink 实现的更多“自定义”部分还很陌生,所以我对此非常迷失,所以感谢任何帮助。

最佳答案

只需实现KafkaSerializationSchema,用键和值定义ProducerRecord。kafka 将根据您定义的键对记录进行分区。

关于java - 如何实现 FlinkKafkaPartitioner?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58824501/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com