gpt4 book ai didi

apache-kafka - 创建和使用自定义 kafka 连接配置提供程序

转载 作者:行者123 更新时间:2023-12-03 21:18:13 24 4
gpt4 key购买 nike

我已经在分布式模式下安装并测试了 kafka connect,它现在可以工作,它连接到配置的接收器并从配置的源读取。
在这种情况下,我开始改进我的安装。我认为需要立即关注的一个领域是,要创建连接器,唯一可用的方法是通过 REST 调用,这意味着我需要通过线路发送我的信息,不 protected 。
为了确保这一点,kafka 引入了新的 ConfigProvider,见 here .
这很有帮助,因为它允许在服务器中设置属性,然后在 rest 调用中引用它们,如下所示:

{
.
.
"property":"${file:/path/to/file:nameOfThePropertyInFile}"
.
.
}

这非常有效,只需在服务器上添加属性文件并在 Distributed.properties 文件中添加以下配置:
config.providers=file   # multiple comma-separated provider types can be specified here
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider

虽然这个解决方案有效,但它确实无助于缓解我对安全性的担忧,因为信息现在从通过网络发送到现在位于存储库中,并且每个人都可以看到文本。
kafka 团队预见到了这个问题,并允许客户端生成自己的配置提供程序来实现接口(interface) ConfigProvider。
我已经创建了自己的实现并打包在一个 jar 中,并为其指定最终名称:
META-INF/services/org.apache.kafka.common.config.ConfigProvider

并在分发的文件中添加了以下条目:
config.providers=cust
config.providers.cust.class=com.somename.configproviders.CustConfigProvider

但是我从连接中收到一个错误,说明一个实现 ConfigProvider 的类,名称为:
com.somename.configproviders.CustConfigProvider

找不到。
我现在不知所措,因为他们网站上的文档没有明确说明如何很好地配置自定义配置提供程序。

有没有人研究过类似的问题并且可以提供一些见解?任何帮助,将不胜感激。

最佳答案

我最近刚刚通过这些设置了一个自定义 ConfigProvider。官方文档模棱两可且令人困惑。

I have created my own implementation and packaged in a jar, givin it the sugested final name: META-INF/services/org.apache.kafka.common.config.ConfigProvider



您可以随意命名 jar 的最终名称,但需要打包为具有 .jar 后缀的 jar 格式。

这是完整的一步一步。假设您的自定义 ConfigProvider 完全限定名称是 com.my.CustomConfigProvider.MyClass .
1.在目录下创建一个文件:META-INF/services/org.apache.kafka.common.config.ConfigProvider。文件内容是全限定类名:
com.my.CustomConfigProvider.MyClass
  • 包括你的源代码,以及上面的 META-INF 文件夹来生成一个 Jar 包。如果您使用 Maven,文件结构类似于 this
  • 将您的最终 Jar 文件(例如 custom-config-provider-1.0.jar)放在 Kafka worker 插件文件夹下。默认为/usr/share/java。 Kafka worker 配置文件中的 PLUGIN_PATH。
  • 也将所有依赖项 jar 上传到 PLUGIN_PATH。使用 Jar 文件中的 META-INFO/MANIFEST.MF 文件来配置代码将使用的依赖 jar 的“ClassPath”。
  • 在 kafka worker 配置文件中,创建两个附加属性:
  • CONNECT_CONFIG_PROVIDERS: 'mycustom', // Alias name of your ConfigProvider
    CONNECT_CONFIG_PROVIDERS_MYCUSTOM_CLASS:'com.my.CustomConfigProvider.MyClass',
  • 重启worker
  • 通过将 POST curl 到 Kafka Restful API 来更新您的连接器配置文件。在连接器配置文件中,您可以引用 ConfigData 中的值从 ConfigProvider:get(path, keys) 返回通过使用如下语法:
  • database.password=${mycustom:/path/pass/to/get/method:password}
    ConfigData是一个包含 {password: 123} 的 HashMap
  • 如果您仍然看到 ClassNotFound 异常,可能是您的 ClassPath 设置不正确。

  • 笔记:
    • 如果您使用AWS ECS/EC2,您需要通过设置环境变量来设置工作器配置文件。
    • 工作人员配置和连接器配置文件不同。

    关于apache-kafka - 创建和使用自定义 kafka 连接配置提供程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57995742/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com