gpt4 book ai didi

elasticsearch - 自定义 Kafka Connect - ElasticSearch Sink Connector

转载 作者:行者123 更新时间:2023-12-02 22:12:35 25 4
gpt4 key购买 nike

我有 Kafka 主题,其中有多种类型的消息流入并使用 Kafka Connect 写入 Elastic Search。流式传输看起来不错,直到我必须将唯一的消息集分成唯一的索引。 IE。我必须根据字段(是 JSON 消息)获取新数据集的新索引。

如何配置/自定义 Kafka 连接来为我做同样的事情?每条消息都包含一个字段,表示消息的类型和时间戳。

示例 Json 如下所示:示例 1:{"log":{"data":"information", "version":"1.1"}, "type":"xyz", "timestamp":"2019-08-28t10:07:40.370 z", "值":{}} ,

示例 2:{"log":{"data":"information", "version":"1.1", "value":{}}, "type":"abc", "timestamp": “2019-08-28t10:07:40.370z”

我想自定义/配置 Kafka connect ES 接收器,将 Sample1 文档写入索引“xyz.20190828”,将 Sample2 文档写入索引“abc.20190828”。

我正在使用 Kafka-2.2.0 和 confluentinc-kafka-connect-elasticsearch-5.2.1 插件。

感谢您的帮助。

最佳答案

您可以使用您需要的自定义单消息转换 (SMT) 来执行此操作 write yourself .通过根据内容更改消息的主题,您会将其路由到不同的 Elasticsearch 索引。

目前 Apache Kafka 附带一个 SMT,它可以重命名整个主题 (RegExRouter) 或添加时间戳 (TimestampRouter)。您可能会发现这些是编写您自己的有用的起点。

另一种选择是@wardzniak 在他的评论中建议的——在使用 Kafka Connect 将生成的单独主题发送到 Elasticsearch 之前,使用流处理(例如 Kafka Streams、KSQL)预处理源主题。

关于elasticsearch - 自定义 Kafka Connect - ElasticSearch Sink Connector,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57784452/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com