
elasticsearch - ExtractField and Parse JSON in kafka-connect sink


I have a kafka-connect flow of mongodb -> kafka connect -> elasticsearch sending data end to end OK, but the payload document is JSON-encoded. Here's my source mongodb document:

{
  "_id": "1541527535911",
  "enabled": true,
  "price": 15.99,
  "style": {
    "color": "blue"
  },
  "tags": [
    "shirt",
    "summer"
  ]
}

And here's my mongodb source connector config:
{
  "name": "redacted",
  "config": {
    "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",
    "databases": "redacted.redacted",
    "initial.import": "true",
    "topic.prefix": "redacted",
    "tasks.max": "8",
    "batch.size": "1",
    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
    "key.serializer.schemas.enable": false,
    "value.serializer.schemas.enable": false,
    "compression.type": "none",
    "mongo.uri": "mongodb://redacted:27017/redacted",
    "analyze.schema": false,
    "schema.name": "__unused__",
    "transforms": "RenameTopic",
    "transforms.RenameTopic.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.RenameTopic.regex": "redacted.redacted_Redacted",
    "transforms.RenameTopic.replacement": "redacted"
  }
}

In elasticsearch, it ends up looking like this:
{
  "_index" : "redacted",
  "_type" : "kafka-connect",
  "_id" : "{\"schema\":{\"type\":\"string\",\"optional\":true},\"payload\":\"1541527535911\"}",
  "_score" : 1.0,
  "_source" : {
    "ts" : 1541527536,
    "inc" : 2,
    "id" : "1541527535911",
    "database" : "redacted",
    "op" : "i",
    "object" : "{ \"_id\" : \"1541527535911\", \"price\" : 15.99, \"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"], \"style\" : { \"color\" : \"blue\" } }"
  }
}

I'd like to use two single message transforms:
  • ExtractField to grab object, which is a string of JSON
  • something to parse that JSON into an object, or just let the normal JsonConverter handle it, as long as it ends up structured correctly in Elasticsearch

I've attempted to do this with just ExtractField in my sink config, but I see this error logged by Kafka Connect:

    kafka-connect_1       | org.apache.kafka.connect.errors.ConnectException:
    Bulk request failed: [{"type":"mapper_parsing_exception",
    "reason":"failed to parse",
    "caused_by":{"type":"not_x_content_exception",
    "reason":"Compressor detection can only be called on some xcontent bytes or
    compressed xcontent bytes"}}]
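That error is consistent with what ExtractField alone produces: the record value is still a plain string of JSON, so the sink submits a quoted string literal, not an object, as the document body. Roughly the following (illustrative, reconstructed from the object field above), which Elasticsearch rejects with not_x_content_exception because a bare string is not parseable JSON content:

    "{ \"_id\" : \"1541527535911\", \"price\" : 15.99, \"enabled\" : true, \"tags\" : [\"shirt\", \"summer\"], \"style\" : { \"color\" : \"blue\" } }"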

Here's my elasticsearch sink connector config. In this version I have things working, but I had to code a custom ParseJson SMT. It works well, but if there's a better way, or a way to do this with some combination of built-in things (converters, SMTs, whatever works), I'd love to see that. A sketch of the custom SMT follows the config below.
{
  "name": "redacted",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "batch.size": 1,
    "connection.url": "http://redacted:9200",
    "key.converter.schemas.enable": true,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "schema.ignore": true,
    "tasks.max": "1",
    "topics": "redacted",
    "transforms": "ExtractFieldPayload,ExtractFieldObject,ParseJson,ReplaceId",
    "transforms.ExtractFieldPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldPayload.field": "payload",
    "transforms.ExtractFieldObject.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
    "transforms.ExtractFieldObject.field": "object",
    "transforms.ParseJson.type": "reaction.kafka.connect.transforms.ParseJson",
    "transforms.ReplaceId.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
    "transforms.ReplaceId.renames": "_id:id",
    "type.name": "kafka-connect",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}
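The custom reaction.kafka.connect.transforms.ParseJson transform referenced in that config isn't shown in the question. For reference, here is a minimal sketch of what such a schemaless SMT might look like, assuming Jackson is on the classpath and leaving configuration options out; this illustrates the technique and is not the asker's actual code:

package reaction.kafka.connect.transforms;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.transforms.Transformation;

import java.io.IOException;
import java.util.Map;

// Sketch of a schemaless SMT: parses a record value that is a JSON string
// into a Map so downstream converters see a structured object.
public class ParseJson<R extends ConnectRecord<R>> implements Transformation<R> {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public R apply(R record) {
        // Pass through anything that is not a string.
        if (!(record.value() instanceof String)) {
            return record;
        }
        try {
            // Deserialize the JSON text into a schemaless Map value.
            Object parsed = MAPPER.readValue((String) record.value(), Map.class);
            return record.newRecord(
                    record.topic(), record.kafkaPartition(),
                    record.keySchema(), record.key(),
                    null /* no value schema */, parsed,
                    record.timestamp());
        } catch (IOException e) {
            throw new DataException("Record value is not valid JSON", e);
        }
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public void close() {
    }
}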

Best Answer

I'm not sure about your Mongo connector. I don't recognize the class or the configurations... Most people probably use the Debezium Mongo connector.

I would set it up like this:

    "connector.class": "com.teambition.kafka.connect.mongo.source.MongoSourceConnector",

    "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
    "value.serializer": "org.apache.kafka.common.serialization.JSONSerializer",
    "key.serializer.schemas.enable": false,
    "value.serializer.schemas.enable": true,
The schemas.enable is important so the internal Connect data classes know how to convert to/from other formats.
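With schemas enabled, the JsonConverter represents each message as an envelope carrying both schema and payload; that envelope is exactly the shape that leaked into the _id field shown earlier:

    {
      "schema": { "type": "string", "optional": true },
      "payload": "1541527535911"
    }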

Then, in the sink, you again need to use a JSON deserializer (via the converter) so that it creates a full object rather than a plaintext string, as you saw in Elasticsearch ({\"schema\":{\"type\":\"string\"):
    "connector.class":
    "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",

    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true

If that doesn't work, you may have to manually create your index mapping in Elasticsearch ahead of time so it knows how to actually parse the strings you are sending it.
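For example, an index mapping created up front might look like the following sketch. The index name and field types here are assumptions inferred from the sample document, the kafka-connect type comes from the sink's type.name, and this shape applies to pre-7.0 Elasticsearch versions that still use mapping types:

    PUT /redacted
    {
      "mappings": {
        "kafka-connect": {
          "properties": {
            "id":      { "type": "keyword" },
            "enabled": { "type": "boolean" },
            "price":   { "type": "float" },
            "style":   { "properties": { "color": { "type": "keyword" } } },
            "tags":    { "type": "keyword" }
          }
        }
      }
    }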

About elasticsearch - ExtractField and Parse JSON in kafka-connect sink, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53177863/
