gpt4 book ai didi

elasticsearch - 从MSSQL vis kafka连接器到具有嵌套类型的elasticsearch的批量数据更新失败

转载 作者:行者123 更新时间:2023-12-03 02:23:58 25 4
gpt4 key购买 nike

MSSQL值:列名称= prop->值= 100,列名称=角色->值= [{“role”:“actor”},{“role”:“director”}]
注意:column:role以json格式保存。

从kafka主题阅读:

{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"prop"
},
{
"type":"string",
"optional":true,
"field":"roles"
}
],
"optional":false
},
"payload":{ "prop":100, "roles":"[{"role":"actor"},{"role":"director"}]"}

失败,原因如下:
Error was [{"type":"mapper_parsing_exception","reason":"object mapping for [roles] tried to parse field [roles] as object, but found a concrete value"}

失败的原因是连接器无法将架构创建为角色的数组

上面的输入消息是由融合的JdbcSourceConnector创建的,并且使用的接收器连接器是融合的ElasticsearchSinkConnector

配置详细信息:

接收器配置:
name=prop-test
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
connection.url=<elasticseach url>

tasks.max=1

topics=test_prop
type.name=prop

#transforms=InsertKey, ExtractId

transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=prop

transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=prop

源配置:
name=test_prop_source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:sqlserver://*.*.*.*:1433;instance=databaseName=test;
connection.user=*****
connection.password=*****
query=EXEC <store proc>
mode=bulk
batch.max.rows=2000000
topic.prefix=test_prop
transforms=createKey,extractInt
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=prop

transforms.extractInt.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractInt.field=prop

connect-standalone.properties:
    bootstrap.servers=localhost:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

需要了解如何明确地使模式成为角色而不是字符串的阵列。

最佳答案

jdbc Source连接器将始终将列名视为字段名并将列值视为值的事实。现有的jdbc源连接器支持无法实现从字符串到数组的转换,它必须是自定义转换或自定义插件才能启用。

用于从MSSQL传输数据并将其插入到Elastic Search中的最佳选择是使用logstash。它具有丰富的过滤器插件,可以使来自MSSQL的数据以任何所需的格式流到任何所需的JSON输出环境(logstash / kafka主题)

流程:MSSQL-> logstash-> Kakfa主题-> Kafka flex 接收器连接器-> Elasticsearch

关于elasticsearch - 从MSSQL vis kafka连接器到具有嵌套类型的elasticsearch的批量数据更新失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61618861/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com