- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在使用自定义 Kafka 连接器(用 Java 编写,使用 Kafka Connect 的 Java API)从外部来源提取数据并存储在主题中。我需要设置自定义分区策略。我了解设置自定义 partitioner通过设置 partitioner.class
property 可以在 Kafka Producer 中 .但是,此属性似乎对 Kafka 连接器没有任何作用。如何配置 Kafka Connect(我正在使用 connect-standalone
脚本来运行我的连接器)以使用我编写的自定义 Partitioner
?
最佳答案
源连接器可以通过SourceRecord
的partition
字段控制每个源记录写入的分区。如果这是您自己的连接器,这是最直接的。
但是,如果您想更改源连接器对每条记录的分区方式,您可以使用覆盖源记录的 partition
字段的单消息转换 (SMT)。您可能必须通过实现 org.apache.kafka.connect.transforms.Transformation
并使用您自己的分区逻辑来编写自定义 SMT,但这实际上比编写自定义 Kafka 分区程序要容易一些。
例如,这里有一个概念性的自定义转换,它展示了如何使用配置属性以及如何创建具有所需分区号的新 SourceRecord
实例。该示例不完整,因为它实际上没有任何真正的分区逻辑,但它应该是一个很好的起点。
package io.acme.example;import org.apache.kafka.common.config.AbstractConfig;import org.apache.kafka.common.config.ConfigDef;import org.apache.kafka.common.config.ConfigDef.Importance;import org.apache.kafka.common.config.ConfigDef.Type;import org.apache.kafka.connect.source.SourceRecord;import org.apache.kafka.connect.transforms.Transformation;import java.util.Map;public class CustomPartitioner implements Transformation { private static final String MAX_PARTITIONS_CONFIG = "max.partitions"; private static final String MAX_PARTITIONS_DOC = "The maximum number of partitions"; private static final int MAX_PARTITIONS_DEFAULT = 1; /** * The definition of the configurations. We just define a single configuration property here, * but you can chain multiple "define" methods together. Complex configurations may warrant * pulling all the config-related things into a separate class that extends {@link AbstractConfig} * and adds helper methods (e.g., "getMaxPartitions()"), and you'd use this class to parse the * parameters in {@link #configure(Map)} rather than {@link AbstractConfig}. */ private static final ConfigDef CONFIG_DEF = new ConfigDef().define(MAX_PARTITIONS_CONFIG, Type.INT, MAX_PARTITIONS_DEFAULT, Importance.HIGH, MAX_PARTITIONS_DOC); private int maxPartitions; @Override public void configure(Map configs) { // store any configuration parameters as fields ... AbstractConfig config = new AbstractConfig(CONFIG_DEF, configs); maxPartitions = config.getInt(MAX_PARTITIONS_CONFIG); } @Override public SourceRecord apply(SourceRecord record) { // Compute the desired partition here int actualPartition = record.kafkaPartition(); int desiredPartition = ... // Then create the new record with all of the existing fields except with the new partition ... return record.newRecord(record.topic(), desiredPartition, record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); } @Override public ConfigDef config() { return CONFIG_DEF; } @Override public void close() { // do nothing }}
ConfigDef
和 AbstractConfig
功能非常有用,可以做很多更有趣的事情,包括使用自定义 validator 和推荐器,以及具有依赖的配置属性在其他属性上。如果您想了解更多相关信息,请查看一些也使用相同框架的现有 Kafka Connect 连接器。
最后一件事。在运行 Kafka Connect 独立或分布式 worker 时,但一定要将 CLASSPATH 环境变量设置为指向包含自定义 SMT 的 JAR 文件以及您的 SMT 依赖的任何 JAR 文件除了 Kafka 提供的那些。 connect-standalone.sh
和 connect-distributed.sh
命令会自动将 Kafka JAR 添加到类路径中。
关于java - 在 Kafka 连接器中设置分区策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44810221/
作者:小林coding 计算机八股文网站:https://xiaolincoding.com 大家好,我是小林。 今天跟大家聊聊,常见的缓存更新策略。 Cache Aside(旁路缓存)策略; Rea
我使用 git 多年,最近为了一个项目改用 mercurial。在过去的 6 个月里,我已经学会了如何通过命令行很好地使用 Mercurial。 这可能是我的想象,但在我看来,mercurial 在
这个问题适合任何熟悉的人 Node.js express Passport 带有 Passport 的 JWT 身份验证(JSON Web token ) Facebook OAuth2.0 或谷歌
在 Coq 中,当试图证明记录的相等性时,是否有一种策略可以将其分解为所有字段的相等性?例如, Record R := {x:nat;y:nat}. Variables a b c d : nat.
我正在处理的项目目前只有一个 Bootstrap 文件,用于初始化应用程序中的所有 javascript 对象。类似于下面的代码 if(document.getElementById('nav'))
我正在考虑使用 OpenLDAP 在首次登录时添加密码到期和强制更改密码。 似乎使用 ppolicy 覆盖来实现这一点。 当我在 ppolicy.schema 中看到这个时,我开始使用 ppolicy
这基本上是我昨天问的一个问题的重新陈述,因为我得到的一个答案似乎没有理解我的问题,所以我一定是不清楚。我的错。 因为 WPF 依赖于 DirectX,所以它对卡和驱动程序的内部非常敏感。我有一个案例,
我是单点登录(SSO)概念的新手。我开始知道 SAML 请求和响应是实现 SSO 流程的最佳方式。然后我开始阅读有关 SAML2.0 的信息。我来了一个术语 NameIdPolicy 在 saml1.
关闭。这个问题需要更多 focused .它目前不接受答案。 想改进这个问题?更新问题,使其仅关注一个问题 editing this post . 5年前关闭。 Improve this questi
关闭。这个问题是opinion-based 。目前不接受答案。 想要改进这个问题吗?更新问题,以便 editing this post 可以用事实和引文来回答它。 . 已关闭 9 年前。 Improv
在 Azure 上创建新的 SQL 数据库时,它将“计算+存储”选项设置为“2 vCore + 32GB 数据最大大小”作为默认配置,但我不想使用 vCore,我可以更改它。但问题是,是否可以通过策略
我希望创建一项策略,防止在未启用身份验证的情况下创建应用服务(仅审核它们是不够的)。 以下策略可以正确识别未启用身份验证的现有资源: { "mode": "All", "policyRule"
我正在尝试从现有 AuditIfNotExists 策略创建 DeployIfNotExists 策略。部署时不会出错,但会错误提示“没有相关资源与策略定义中的效果详细信息匹配”。当评估政策时。当我将
我正在尝试从现有 AuditIfNotExists 策略创建 DeployIfNotExists 策略。部署时不会出错,但会错误提示“没有相关资源与策略定义中的效果详细信息匹配”。当评估政策时。当我将
我正在使用 wunderground 的 json api 来查询我网站上的天气状况。 api 为我提供了一个包含所有必要数据的漂亮 json 对象,但我每天只能进行多次调用。存储这些数据的首选方式是
我有一个名为可视化数据结构的项目。我有这样的 OOP 设计。 Class VisualDataStructures extends JFrame Class ControlPanel extends
这个问题在这里已经有了答案: 关闭 14 年前。 副本: Use javascript to inject script references as needed? Javascript 没有任何指
Android 应用程序遇到了一些 ANR 问题,因此我实现了 StrictMode 策略。以前从未使用过这个,所以希望有人可以帮助解释以下内容: 为什么日志显示 2 个看似相似的违规行为,除了前 4
我目前正在尝试解决一个问题。假设我们在路上行驶,我们知道路上有 10 家酒店。每家酒店都有 0 到 6 星。我的问题是:找到选择星级酒店的最佳解决方案。唯一的问题是:您不能回头去参观您已经决定不去的酒
我正在将我的应用程序迁移到 MVP。从这个 konmik 中获得了有关静态演示者模式的提示 这是我的简要 MVP 策略。为简洁起见,删除了大部分样板和 MVP 监听器。这个策略帮助我改变了方向,证明了
我是一名优秀的程序员,十分优秀!