gpt4 book ai didi

elasticsearch - 如何在 kafka-connect 汇合平台的 elasticsearch sink 连接器配置中使用 ca cert?

转载 作者:行者123 更新时间:2023-12-03 02:11:00 28 4
gpt4 key购买 nike

我目前正在尝试在分布式模式下的 kafka-connect 集群上配置 elasticsearch sink 连接器。该集群使用 confluent 提供的 helm 图表部署在 kubernetes 中。
这是属性 json 文件。(我使用用户名 passwd 连接到 elastic-出于安全目的将其从 json 文件中删除)

  "name": "elasticsearch-sink-connector",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"transforms": "dropPrefix",
"config.action.reload": "restart",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"topics": "_audit_log",
"errors.deadletterqueue.topic.name": "_audit_log_dead_letter_queue",
"errors.deadletterqueue.topic.replication.factor": "1",
"transforms.dropPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex": ".*",
"transforms.dropPrefix.replacement": "audit_log",
"connection.url": "https://path_to_elastic_cloud:9200",
"auto.create.indices.at.start": "false",
"type.name": "",
"key.ignore": "true",
"schema.ignore": "true",
"drop.invalid.message": "true",
"elastic.ca.cert.path": "/opt/xyz/elastic/certs/tls.crt",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "false"
}
}```

but here are the error logs from the connect cluster.

[INFO] 2020-11-04 19:13:53,384 [task-thread-Brians-0] io.searchbox.client.AbstractJestClient setServers - Setting server pool to a list of 1 servers: [https://path_to_elastic_clound:9200]
[INFO] 2020-11-04 19:13:53,385 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getConnectionManager - Using multi thread/connection supporting pooling connection manager
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Using default GSON instance
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Node Discovery disabled...
[INFO] 2020-11-04 19:13:53,386 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Idle connection reaping enabled...
[INFO] 2020-11-04 19:13:53,387 [task-thread-Brians-0] io.searchbox.client.JestClientFactory getObject - Authentication cache set for preemptive authentication
[ERROR] 2020-11-04 19:13:53,410 [task-thread-Brians-0] org.apache.kafka.connect.runtime.WorkerTask doRun - WorkerSinkTask{id=Brians-0} Task threw an uncaught and unrecoverable exception
org.apache.kafka.connect.errors.ConnectException: Couldn't start ElasticsearchSinkTask due to connection error:
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:168)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:152)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:74)
at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.start(ElasticsearchSinkTask.java:48)
at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:302)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:269)
at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:264)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1339)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1214)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1157)
at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:183)
at java.base/sun.security.ssl.SSLTransport.decode(SSLTransport.java:171)
at java.base/sun.security.ssl.SSLSocketImpl.decode(SSLSocketImpl.java:1403)
at java.base/sun.security.ssl.SSLSocketImpl.readHandshakeRecord(SSLSocketImpl.java:1309)
at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:440)
at java.base/sun.security.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:411)
at org.apache.http.conn.ssl.SSLConnectionSocketFactory.createLayeredSocket(SSLConnectionSocketFactory.java:396)
at org.apache.http.conn.ssl.SSLConnectionSocketFactory.connectSocket(SSLConnectionSocketFactory.java:355)
at org.apache.http.impl.conn.DefaultHttpClientConnectionOperator.connect(DefaultHttpClientConnectionOperator.java:142)
at org.apache.http.impl.conn.PoolingHttpClientConnectionManager.connect(PoolingHttpClientConnectionManager.java:359)
at org.apache.http.impl.execchain.MainClientExec.establishRoute(MainClientExec.java:381)
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:237)
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185)
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89)
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111)
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83)
at io.searchbox.client.http.JestHttpClient.executeRequest(JestHttpClient.java:133)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:70)
at io.searchbox.client.http.JestHttpClient.execute(JestHttpClient.java:63)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.getServerVersion(JestElasticsearchClient.java:316)
at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.<init>(JestElasticsearchClient.java:161)
... 12 more
Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:439)
at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:306)
at java.base/sun.security.validator.Validator.validate(Validator.java:264)
at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1323)
... 39 more
Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
at java.base/sun.security.provider.certpath.SunCertPathBuilder.build(SunCertPathBuilder.java:141)
at java.base/sun.security.provider.certpath.SunCertPathBuilder.engineBuild(SunCertPathBuilder.java:126)
at java.base/java.security.cert.CertPathBuilder.build(CertPathBuilder.java:297)
at java.base/sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:434)```

any pointers are deeply appreciated. Thank you.

最佳答案

此问题已得到解决。早些时候我们没有使用任何 key 到 keystore 。在遵循 github 上这张票的说明后,终于能够将这些添加到配置中(不是真实值)。很有帮助。
https://github.com/confluentinc/kafka-connect-elasticsearch/issues/432

    "elastic.security.protocol": "SSL",
"elastic.https.ssl.keystore.location": "/mnt/secrets/elastic/keystore.jks",
"elastic.https.ssl.keystore.password": "changeit",
"elastic.https.ssl.truststore.location": "/mnt/secrets//elastic/truststore.jks",
"elastic.https.ssl.truststore.password": "changeit"

关于elasticsearch - 如何在 kafka-connect 汇合平台的 elasticsearch sink 连接器配置中使用 ca cert?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64686425/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com