elasticsearch - Confluent Kafka Connect Elasticsearch document ID creation

Reposted. Author: 行者123. Updated: 2023-12-03 00:32:04
I am using Confluent to connect my database to Elasticsearch, and I get the following exception:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

My kafka-connect-JDBC configuration is:
name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app

And my Kafka Connect Elasticsearch configuration is:
name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id

My table structure is:
employee_master_id | emp_name | modified_date
-------------------+----------+------------------------------------
1                  | Bala     | "2017-05-18 11:51:46.721182+05:30"
2                  | murugan  | "2017-05-21 15:59:11.443901+05:30"

Please help me resolve this issue.

Best answer

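In addition to ValueToKey, you need ExtractField to convert the key from a struct into a plain field. ValueToKey on its own produces a struct key that wraps employee_master_id, and the Elasticsearch sink rejects a STRUCT as the document ID, which is the exception shown in the question: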

transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=employee_master_id
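
To make the effect of the two transforms concrete, here is a minimal Python sketch that imitates them on a single record. It is only a simulation, not Kafka Connect code: plain dicts stand in for Connect Structs, and the helper names value_to_key and extract_field_key are hypothetical. It also assumes the record arrives from the JDBC source with a null key.

```python
# Simplified simulation of the transform chain InsertKey -> ExtractId.
# Plain dicts stand in for Kafka Connect Structs; these helpers are
# illustrative only and are not part of any Kafka Connect API.

def value_to_key(record, fields):
    """Imitate ValueToKey: build a struct-like key from the named value fields."""
    key = {name: record["value"][name] for name in fields}
    return {**record, "key": key}


def extract_field_key(record, field):
    """Imitate ExtractField$Key: replace the struct-like key with one of its fields."""
    return {**record, "key": record["key"][field]}


# A record shaped like a row of employee_master, assumed to have no key yet.
record = {
    "key": None,
    "value": {"employee_master_id": 1, "emp_name": "Bala"},
}

after_insert_key = value_to_key(record, ["employee_master_id"])
after_extract_id = extract_field_key(after_insert_key, "employee_master_id")

print(after_insert_key["key"])  # struct-like key: {'employee_master_id': 1}
print(after_extract_id["key"])  # primitive key: 1
```

After the first step the key is still a struct-like object, which is what the sink refuses to use as a document ID. Only after the second step is the key a primitive value that can serve as the Elasticsearch document ID.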

Regarding elasticsearch - Confluent Kafka Connect Elasticsearch document ID creation, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/44096936/
