gpt4 book ai didi

java - 在 Kafka 连接器中设置分区策略

转载 作者:搜寻专家 更新时间:2023-10-31 20:00:01 24 4
gpt4 key购买 nike

我正在使用自定义 Kafka 连接器(用 Java 编写,使用 Kafka Connect 的 Java API)从外部来源提取数据并存储在主题中。我需要设置自定义分区策略。我了解设置自定义 partitioner通过设置 partitioner.class property 可以在 Kafka Producer 中 .但是,此属性似乎对 Kafka 连接器没有任何作用。如何配置 Kafka Connect(我正在使用 connect-standalone 脚本来运行我的连接器)以使用我编写的自定义 Partitioner

最佳答案

源连接器可以通过SourceRecordpartition 字段控制每个源记录写入的分区。如果这是您自己的连接器,这是最直接的。

但是,如果您想更改源连接器对每条记录的分区方式,您可以使用覆盖源记录的 partition 字段的单消息转换 (SMT)。您可能必须通过实现 org.apache.kafka.connect.transforms.Transformation 并使用您自己的分区逻辑来编写自定义 SMT,但这实际上比编写自定义 Kafka 分区程序要容易一些。

例如,这里有一个概念性的自定义转换,它展示了如何使用配置属性以及如何创建具有所需分区号的新 SourceRecord 实例。该示例不完整,因为它实际上没有任何真正的分区逻辑,但它应该是一个很好的起点。

package io.acme.example;import org.apache.kafka.common.config.AbstractConfig;import org.apache.kafka.common.config.ConfigDef;import org.apache.kafka.common.config.ConfigDef.Importance;import org.apache.kafka.common.config.ConfigDef.Type;import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.transforms.Transformation;import java.util.Map;public class CustomPartitioner implements Transformation {  private static final String MAX_PARTITIONS_CONFIG = "max.partitions";  private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions";  private static final int MAX_PARTITIONS_DEFAULT = 1;  /**    * The definition of the configurations. We just define a single configuration property here,   * but you can chain multiple "define" methods together. Complex configurations may warrant   * pulling all the config-related things into a separate class that extends {@link AbstractConfig}   * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the   * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}.   */  private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC);  private int maxPartitions;  @Override  public void configure(Map configs) {    // store any configuration parameters as fields ...    AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs);    maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG);  }  @Override  public SourceRecord apply(SourceRecord record) {    // Compute the desired partition here    int actualPartition = record.kafkaPartition();    int desiredPartition = ...    // Then create the new record with all of the existing fields except with the new partition ...    return record.newRecord(record.topic(), desiredPartition,                            record.keySchema(), record.key(),                            record.valueSchema(), record.value(),                            record.timestamp());  }  @Override  public ConfigDef config() {    return CONFIG_DEF;  }  @Override  public void close() {    // do nothing  }}

ConfigDefAbstractConfig 功能非常有用,可以做很多更有趣的事情,包括使用自定义 validator 和推荐器,以及具有依赖的配置属性在其他属性上。如果您想了解更多相关信息,请查看一些也使用相同框架的现有 Kafka Connect 连接器。

最后一件事。在运行 Kafka Connect 独立或分布式 worker 时,但一定要将 CLASSPATH 环境变量设置为指向包含自定义 SMT 的 JAR 文件以及您的 SMT 依赖的任何 JAR 文件除了 Kafka 提供的那些。 connect-standalone.shconnect-distributed.sh 命令会自动将 Kafka JAR 添加到类路径中。

关于java - 在 Kafka 连接器中设置分区策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44810221/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com