gpt4 book ai didi

java - 创建新的 Jet 自定义分区器

转载 作者:行者123 更新时间:2023-12-02 02:21:10 25 4
gpt4 key购买 nike

我的用例需要从 Kafka 主题读取消息,并按照发布到 Kafka 的自然顺序处理消息。

Kafka生产者负责发布在单个kafka主题分区中排序的每组消息,并且我需要在同一个顶点处理器中以相同的顺序处理每组消息。

enter image description here

上图代表了基本思想。有一些 KafkaSource-Processors 从 Kafka 读取数据。

一条边连接到一个顶点以解码kafka消息等等。

我可以使用 kafka 消息 key 作为分区 key ,但我认为我最终会得到不平衡的解码处理器。

鉴于:

  • 如何创建新的分区器?我找不到任何例子来启发我。
  • 在新的分区器上,如何识别发出消息的 KS 处理器?我希望前一个顶点进程和下一个顶点处理器之间存在一对一的关系,例如,KS#0 始终将消息发送到 Decode#0,KS#1 始终将消息发送到 Decode#1,依此类推。
  • 我是否需要一个新的分区器来实现这一点,或者是否有一些现成的功能可以实现这一点?

最佳答案

您不需要为此使用分区程序。 Edge.isolated()相等的局部并行性 就是为此而设计的:

dag.edge(between(kafkaSource, decode).isolated());

在这种情况下,源处理器的一个实例恰好与目标处理器的一个实例绑定(bind),并且将保留项目的顺序。请记住,单个 Kafka 源处理器可以从多个 Kafka 分区获取项目,因此您必须跟踪 Kafka 分区 ID。即使你让 Jet 处理器和 Kafka 分区的总数相等,你也不能依赖它,因为如果其中一个成员发生故障并重新启 Action 业,Jet 处理器的总数会减少,但 Kafka 分区的数量会获胜't。

另请注意,源的默认本地并行度并不相等:对于 Kafka 源,它默认为 2,对于其他源,它通常等于 CPU 计数。您需要手动指定相等的值。

另一个限制是,如果您使用 Processors.mapP 作为 decode 顶点,则映射函数必须是无状态的。因为您需要订购商品,所以我假设您有一些状态需要保留。为了使其正常工作,您必须使用自定义处理器:

Vertex decode = dag.newVertex("decode", MyDecodeP::new);

处理器实现:

private static class MyDecodeP extends AbstractProcessor {
private Object myStateObject;

@Override
protected boolean tryProcess(int ordinal, @Nonnull Object item) {
Object mappedItem = ...;
return tryEmit(mappedItem);
}
}

答案是为 Jet 0.5.1 编写的。

关于java - 创建新的 Jet 自定义分区器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48486511/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com