
elasticsearch - "index_already_exists_exception" - from Kafka to Elasticsearch (SSL)

Reposted · Author: 行者123 · Updated: 2023-12-02 22:35:49

We already have a Kafka Elasticsearch sink connector in place that streams data to Elasticsearch (v5.6.3).

I am using Confluent v5.0.0 and do not see any other errors. I have deleted the index and restarted the Elasticsearch sink connector, but the same error still appears.

The connector configuration is as follows:

{
  "name": "elasticsearch_topic",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "3",
    "topics": "sample_topic",
    "connection.url": "https://127.0.0.1:9200,https://127.0.0.2:9200",
    "connection.username": "elsatic_user",
    "connection.password": "elastic_user",
    "type.name": "log",
    "flush.timeout.ms": "60000",
    "connection.timeout.ms": "60000",
    "read.timeout.ms": "60000",
    "batch.size": "20",
    "topic.index.map": "sample_topic:elastic_search_index_test",
    "transforms": "extract,insertenv,inserttimestamp,convert_current_ts,routeTS",
    "schema.ignore": "true",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "RE_NUM",
    "transforms.insertenv.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.insertenv.static.field": "_env",
    "transforms.insertenv.static.value": "dev",
    "transforms.inserttimestamp.type": "org.apache.kafka.connect.transforms.InsertField$Value",
    "transforms.inserttimestamp.timestamp.field": "date_time",
    "transforms.convert_current_ts.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.convert_current_ts.target.type": "Timestamp",
    "transforms.convert_current_ts.field": "date_time",
    "transforms.convert_current_ts.format": "yyyy-MM-dd HH:mm:ss.SSSSSS",
    "transforms.routeTS.type": "org.apache.kafka.connect.transforms.TimestampRouter",
    "transforms.routeTS.topic.format": "elastic_search_index_test-${timestamp}",
    "transforms.routeTS.timestamp.format": "yyyyMMdd"
  }
}
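For reference, a configuration like the one above is typically submitted to the Kafka Connect REST API. A minimal sketch, assuming the Connect worker's REST interface is on localhost:8083 and the JSON above is saved as elasticsearch_topic.json (both assumptions, not stated in the question):

```shell
# Register (or re-register) the sink connector with the Connect worker.
# Worker address and file name are assumptions for illustration.
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @elasticsearch_topic.json

# Check connector and task status afterwards.
curl -s http://localhost:8083/connectors/elasticsearch_topic/status
```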

So far so good. No problems.

Recently we enabled SSL on Elasticsearch, and for that I added the username, password, and "https" URLs to the configuration above, then restarted the connector and the workers.
Since then, I have been seeing the "index_already_exists_exception" error, as shown below:
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} 
Task threw an uncaught and unrecoverable exception
(org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Could not create index
'elastic_search_index_test': {"root_cause":
[{"type":"index_already_exists_exception","reason":"index
[elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}],"type":"index_already_exists_exception","reason":"index [elastic_search_index_test/QVgWV8E7RmuSArtIJt3m3g] already exists","index_uuid":"QVgWV8E7RmuSArtIJt3m3g","index":"elastic_search_index_test"}
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createIndices(JestElasticsearchClient.java:238)
at io.confluent.connect.elasticsearch.ElasticsearchWriter.createIndicesForTopics(ElasticsearchWriter.java:330)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.open(ElasticsearchSinkTask.java:157)
at org.apache.kafka.connect.runtime.WorkerSinkTask.openPartitions(WorkerSinkTask.java:612)
at org.apache.kafka.connect.runtime.WorkerSinkTask.access$1100(WorkerSinkTask.java:69)
at org.apache.kafka.connect.runtime.WorkerSinkTask$HandleRebalance.onPartitionsAssigned(WorkerSinkTask.java:672)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:283)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:343)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at org.apache.kafka.connect.runtime.WorkerSinkTask.pollConsumer(WorkerSinkTask.java:444)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:317)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-12-06 03:36:21,487] ERROR WorkerSinkTask{id=elasticsearch_topic-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)

Steps I have tried so far:
  • Stopped the Elasticsearch sink connector and the workers
  • Deleted the index "elastic_search_index_test" from Elasticsearch (via Kibana)
  • Restarted the workers and the Elasticsearch sink connector

  • But the same error (shown above) still occurs
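When repeating these steps, it may help to confirm through the Elasticsearch REST API (rather than Kibana alone) that the index is really gone before restarting the connector. A hedged sketch, reusing the host and credentials from the configuration above (`-k` skips certificate verification and is only suitable for a quick test):

```shell
# List any indices matching the target name over HTTPS with basic auth.
curl -sk -u elsatic_user:elastic_user \
  "https://127.0.0.1:9200/_cat/indices/elastic_search_index_test*?v"

# Delete the exact index named in the error, if it still exists.
curl -sk -u elsatic_user:elastic_user -X DELETE \
  "https://127.0.0.1:9200/elastic_search_index_test"
```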

    Can someone give a hint as to what is going wrong?

    Thanks in advance!!

    Best Answer

    This is a very common error when starting a connector with multiple tasks (in the current case, "tasks.max": "3").

    Internally, kafka-connect-elasticsearch performs these steps:

  • kafka-connect-elasticsearch checks whether the index already exists
  • if it is missing from ES, it creates the index

    Problem:

    The connector is running 3 tasks (meaning 3 threads executing the same code), so more than one task can find that the index does not exist and proceed to create it.
    The first task succeeds; the other tasks then throw an index_already_exists_exception, because the index has already been created by the first task.
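The check-then-create race described above can be sketched in a few lines of shell. This only models the behavior (a directory stands in for the cluster state; `mkdir` is atomic and fails if the directory exists, just like index creation) and is not the connector's actual code:

```shell
# Model the cluster state with a temp directory.
STATE=$(mktemp -d)
INDEX="elastic_search_index_test"

index_exists() { [ -d "$STATE/$1" ]; }
create_index() { mkdir "$STATE/$1" 2>/dev/null; }   # atomic: fails if it exists

# Both tasks check first -- both see the index as missing.
index_exists "$INDEX"; A_MISSING=$?
index_exists "$INDEX"; B_MISSING=$?

# Task A creates the index...
[ "$A_MISSING" -ne 0 ] && create_index "$INDEX" && echo "task A: created"
# ...so task B's create now fails, even though its own check had passed.
[ "$B_MISSING" -ne 0 ] && { create_index "$INDEX" || echo "task B: index_already_exists"; }
```

This is exactly why the error appears intermittently: it depends on how the tasks' check and create steps interleave after a restart or rebalance.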

    Solution:
  • start the connector with a single task, "tasks.max": "1" (a poor choice if there is a large volume of data)
  • create the index in ES before running the connector
  • use a distributed lock (such as ZooKeeper)
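The second option (creating the index before the connector runs) is a plain index-creation request. A sketch, reusing the cluster address and credentials from the question's configuration; the shard and replica counts here are illustrative, not prescribed:

```shell
# Pre-create the index so that no sink task ever has to create it;
# the connector's existence check will then find it and skip creation.
curl -sk -u elsatic_user:elastic_user -X PUT \
  "https://127.0.0.1:9200/elastic_search_index_test" \
  -H "Content-Type: application/json" \
  -d '{"settings": {"number_of_shards": 3, "number_of_replicas": 1}}'
```

Note that with the TimestampRouter transform in place, new daily indices (elastic_search_index_test-YYYYMMDD) would also need to exist ahead of time for this approach to cover them.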
    Regarding elasticsearch - "index_already_exists_exception" - from Kafka to Elastic Search (SSL), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53650730/
