gpt4 book ai didi

ssl - kafka - ssl握手失败

转载 作者:行者123 更新时间:2023-12-04 22:39:51 24 4
gpt4 key购买 nike

我已经在我的本地 Kafka 实例上设置了 SSL,当我在 SSL 端口上启动 Kafka 控制台生产者/消费者时,它给出了 SSL 握手错误

Karans-MacBook-Pro:keystore karanalang$ $CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
>[2021-11-10 13:15:09,824] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:09,826] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,018] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,019] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9093 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2021-11-10 13:15:10,195] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9093) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
以下是所做的更改:
  • 创建信任库和 keystore

  • 这是用于检查 SSL 连接的 openssl 命令的输出:
    Karans-MacBook-Pro:keystore karanalang$ openssl s_client -debug -connect localhost:9093 -tls1
    CONNECTED(00000005)
    write to 0x13d7bdf90 [0x13e01ea03] (118 bytes => 118 (0x76))
    0000 - 16 03 01 00 71 01 00 00-6d 03 01 81 e8 00 cd c4 ....q...m.......
    0010 - 04 4b 64 86 3e 30 97 32-c3 66 3a 8c ed 05 bf 97 .Kd.>0.2.f:.....
    0020 - ff d5 b2 a4 26 fe 99 c0-7f 94 a1 00 00 2e c0 14 ....&...........
    0030 - c0 0a 00 39 ff 85 00 88-00 81 00 35 00 84 c0 13 ...9.......5....
    ---
    0076 - <SPACES/NULS>
    read from 0x13d7bdf90 [0x13e01a803] (5 bytes => 5 (0x5))
    0005 - <SPACES/NULS>
    4307385836:error:1400410B:SSL routines:CONNECT_CR_SRVR_HELLO:wrong version number:/System/Volumes/Data/SWE/macOS/BuildRoots/e90674e518/Library/Caches/com.apple.xbs/Sources/libressl/libressl-56.60.2/libressl-2.8/ssl/ssl_pkt.c:386:
    ---
    no peer certificate available
    ---
    No client certificate CA names sent
    ---
    SSL handshake has read 5 bytes and written 0 bytes
    ---
    New, (NONE), Cipher is (NONE)
    Secure Renegotiation IS NOT supported
    Compression: NONE
    Expansion: NONE
    No ALPN negotiated
    SSL-Session:
    Protocol : TLSv1
    Cipher : 0000
    Session-ID:
    Session-ID-ctx:
    Master-Key:
    Start Time: 1636579015
    Timeout : 7200 (sec)
    Verify return code: 0 (ok)
    ---

    这是 server.properties :
    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements. See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License. You may obtain a copy of the License at
    #
    # http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.

    # see kafka.server.KafkaConfig for additional details and defaults

    ############################# Server Basics #############################

    # The id of the broker. This must be set to a unique integer for each broker.
    broker.id=0

    ############################# Socket Server Settings #############################

    # The address the socket server listens on. It will get the value returned from
    # java.net.InetAddress.getCanonicalHostName() if not configured.
    # FORMAT:
    # listeners = listener_name://host_name:port
    # EXAMPLE:
    # listeners = PLAINTEXT://your.host.name:9092
    # SSL CHANGE
    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093

    # Hostname and port the broker will advertise to producers and consumers. If not set,
    # it uses the value for "listeners" if configured. Otherwise, it will use the value
    # returned from java.net.InetAddress.getCanonicalHostName().
    # SSL CHANGE
    advertised.listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    ssl.client.auth=none

    # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
    listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

    # The number of threads that the server uses for receiving requests from the network and sending responses to the network
    num.network.threads=3

    # The number of threads that the server uses for processing requests, which may include disk I/O
    num.io.threads=8

    # The send buffer (SO_SNDBUF) used by the socket server
    socket.send.buffer.bytes=102400

    # The receive buffer (SO_RCVBUF) used by the socket server
    socket.receive.buffer.bytes=102400

    # The maximum size of a request that the socket server will accept (protection against OOM)
    socket.request.max.bytes=104857600


    ############################# Log Basics #############################

    # A comma separated list of directories under which to store log files
    log.dirs=/tmp/kafka-logs

    # The default number of log partitions per topic. More partitions allow greater
    # parallelism for consumption, but this will also result in more files across
    # the brokers.
    num.partitions=1

    # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
    # This value is recommended to be increased for installations with data dirs located in RAID array.
    num.recovery.threads.per.data.dir=1

    ############################# Internal Topic Settings #############################
    # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1

    ############################# Log Flush Policy #############################

    # Messages are immediately written to the filesystem but by default we only fsync() to sync
    # the OS cache lazily. The following configurations control the flush of data to disk.
    # There are a few important trade-offs here:
    # 1. Durability: Unflushed data may be lost if you are not using replication.
    # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
    # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
    # The settings below allow one to configure the flush policy to flush data after a period of time or
    # every N messages (or both). This can be done globally and overridden on a per-topic basis.

    # The number of messages to accept before forcing a flush of data to disk
    #log.flush.interval.messages=10000

    # The maximum amount of time a message can sit in a log before we force a flush
    #log.flush.interval.ms=1000

    ############################# Log Retention Policy #############################

    # The following configurations control the disposal of log segments. The policy can
    # be set to delete segments after a period of time, or after a given size has accumulated.
    # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
    # from the end of the log.

    # The minimum age of a log file to be eligible for deletion due to age
    log.retention.hours=168

    # A size-based retention policy for logs. Segments are pruned from the log unless the remaining
    # segments drop below log.retention.bytes. Functions independently of log.retention.hours.
    #log.retention.bytes=1073741824

    # The maximum size of a log segment file. When this size is reached a new log segment will be created.
    log.segment.bytes=1073741824

    # The interval at which log segments are checked to see if they can be deleted according
    # to the retention policies
    log.retention.check.interval.ms=300000

    ############################# Zookeeper #############################

    # Zookeeper connection string (see zookeeper docs for details).
    # This is a comma separated host:port pairs, each corresponding to a zk
    # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
    # You can also append an optional chroot string to the urls to specify the
    # root directory for all kafka znodes.
    zookeeper.connect=localhost:2181

    # Timeout in ms for connecting to zookeeper
    zookeeper.connection.timeout.ms=18000

    ##################### Confluent Metrics Reporter #######################
    # Confluent Control Center and Confluent Auto Data Balancer integration
    #
    # Uncomment the following lines to publish monitoring data for
    # Confluent Control Center and Confluent Auto Data Balancer
    # If you are using a dedicated metrics cluster, also adjust the settings
    # to point to your metrics kakfa cluster.
    #metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
    #confluent.metrics.reporter.bootstrap.servers=localhost:9092
    #
    # Uncomment the following line if the metrics cluster has a single broker
    #confluent.metrics.reporter.topic.replicas=1

    ############################# Group Coordinator Settings #############################

    # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
    # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
    # The default value for this is 3 seconds.
    # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
    # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
    group.initial.rebalance.delay.ms=0


    ############################# Confluent Authorizer Settings #############################

    # Uncomment to enable Confluent Authorizer with support for ACLs, LDAP groups and RBAC
    #authorizer.class.name=io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer
    # Semi-colon separated list of super users in the format <principalType>:<principalName>
    #super.users=
    # Specify a valid Confluent license. By default free-tier license will be used
    #confluent.license=
    # Replication factor for the topic used for licensing. Default is 3.
    confluent.license.topic.replication.factor=1

    # Uncomment the following lines and specify values where required to enable CONFLUENT provider for RBAC and centralized ACLs
    # Enable CONFLUENT provider
    #confluent.authorizer.access.rule.providers=ZK_ACL,CONFLUENT
    # Bootstrap servers for RBAC metadata. Must be provided if this broker is not in the metadata cluster
    #confluent.metadata.bootstrap.servers=PLAINTEXT://127.0.0.1:9092
    # Replication factor for the metadata topic used for authorization. Default is 3.
    confluent.metadata.topic.replication.factor=1

    # Replication factor for the topic used for audit logs. Default is 3.
    confluent.security.event.logger.exporter.kafka.topic.replicas=1

    # Listeners for metadata server
    #confluent.metadata.server.listeners=http://0.0.0.0:8090
    # Advertised listeners for metadata server
    #confluent.metadata.server.advertised.listeners=http://127.0.0.1:8090

    ############################# Confluent Data Balancer Settings #############################

    # The Confluent Data Balancer is used to measure the load across the Kafka cluster and move data
    # around as necessary. Comment out this line to disable the Data Balancer.
    confluent.balancer.enable=true

    # By default, the Data Balancer will only move data when an empty broker (one with no partitions on it)
    # is added to the cluster or a broker failure is detected. Comment out this line to allow the Data
    # Balancer to balance load across the cluster whenever an imbalance is detected.
    #confluent.balancer.heal.uneven.load.trigger=ANY_UNEVEN_LOAD

    # The default time to declare a broker permanently failed is 1 hour (3600000 ms).
    # Uncomment this line to turn off broker failure detection, or adjust the threshold
    # to change the duration before a broker is declared failed.
    #confluent.balancer.heal.broker.failure.threshold.ms=-1

    # Edit and uncomment the following line to limit the network bandwidth used by data balancing operations.
    # This value is in bytes/sec/broker. The default is 10MB/sec.
    #confluent.balancer.throttle.bytes.per.second=10485760

    # Capacity Limits -- when set to positive values, the Data Balancer will attempt to keep
    # resource usage per-broker below these limits.
    # Edit and uncomment this line to limit the maximum number of replicas per broker. Default is unlimited.
    #confluent.balancer.max.replicas=10000

    # Edit and uncomment this line to limit what fraction of the log disk (0-1.0) is used before rebalancing.
    # The default (below) is 85% of the log disk.
    #confluent.balancer.disk.max.load=0.85

    # Edit and uncomment these lines to define a maximum network capacity per broker, in bytes per
    # second. The Data Balancer will attempt to ensure that brokers are using less than this amount
    # of network bandwidth when rebalancing.
    # Here, 10MB/s. The default is unlimited capacity.
    #confluent.balancer.network.in.max.bytes.per.second=10485760
    #confluent.balancer.network.out.max.bytes.per.second=10485760

    # Edit and uncomment this line to identify specific topics that should not be moved by the data balancer.
    # Removal operations always move topics regardless of this setting.
    #confluent.balancer.exclude.topic.names=

    # Edit and uncomment this line to identify topic prefixes that should not be moved by the data balancer.
    # (For example, a "confluent.balancer" prefix will match all of "confluent.balancer.a", "confluent.balancer.b",
    # "confluent.balancer.c", and so on.)
    # Removal operations always move topics regardless of this setting.
    #confluent.balancer.exclude.topic.prefixes=

    # The replication factor for the topics the Data Balancer uses to store internal state.
    # For anything other than development testing, a value greater than 1 is recommended to ensure availability.
    # The default value is 3.
    confluent.balancer.topic.replication.factor=1

    ################################## Confluent Telemetry Settings ##################################

    # To start using Telemetry, first generate a Confluent Cloud API key/secret. This can be done with
    # instructions at https://docs.confluent.io/current/cloud/using/api-keys.html. Note that you should
    # be using the '--resource cloud' flag.
    #
    # After generating an API key/secret, to enable Telemetry uncomment the lines below and paste
    # in your API key/secret.
    #
    #confluent.telemetry.enabled=true
    #confluent.telemetry.api.key=<CLOUD_API_KEY>
    #confluent.telemetry.api.secret=<CCLOUD_API_SECRET>

    ############ SSL #################

    ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
    ssl.truststore.password=test123
    ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
    ssl.keystore.password=test123
    ssl.key.password=test123

    # confluent.metrics.reporter.bootstrap.servers=localhost:9093
    # confluent.metrics.reporter.security.protocol=SSL
    # confluent.metrics.reporter.ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
    # confluent.metrics.reporter.ssl.truststore.password=test123
    # confluent.metrics.reporter.ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
    # confluent.metrics.reporter.ssl.keystore.password=test123
    # confluent.metrics.reporter.ssl.key.password=test123

    客户端-ssl.properties:
    bootstrap.servers=localhost:9093
    security.protocol=SSL
    ssl.truststore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/truststore/kafka.truststore.jks
    ssl.truststore.password=test123
    ssl.keystore.location=/Users/karanalang/Documents/Technology/confluent-6.2.1/ssl_certs/keystore/kafka.keystore.jks
    ssl.keystore.password=test123
    ssl.key.password=test123
    启动控制台生产者/消费者的命令:
    $CONFLUENT_HOME/bin/kafka-console-producer --broker-list localhost:9093 --topic karantest --producer.config $CONFLUENT_HOME/props/client-ssl.properties
    $CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9093 --topic karantest --consumer.config $CONFLUENT_HOME/props/client-ssl.properties --from-beginning
    关于如何解决这个问题的任何想法?
    更新 :
    这是我尝试调试时的错误(使用-export KAFKA_OPTS=-Djavax.net.debug=all)
    javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
    javax.net.ssl|DEBUG|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.107 PST|SSLExtensions.java:173|Ignore unavailable extension: status_request
    javax.net.ssl|ERROR|0E|kafka-producer-network-thread | console-producer|2021-11-10 14:04:26.108 PST|TransportContext.java:341|Fatal (CERTIFICATE_UNKNOWN): No name matching localhost found (
    "throwable" : {
    java.security.cert.CertificateException: No name matching localhost found
    at java.base/sun.security.util.HostnameChecker.matchDNS(HostnameChecker.java:234)
    at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:103)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:455)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:429)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:283)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:141)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1335)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1232)
    at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1175)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:443)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1074)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1061)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1008)
    at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:509)
    at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:601)
    at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:447)
    at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:332)
    at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:229)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:499)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:639)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:327)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:242)
    at java.base/java.lang.Thread.run(Thread.java:829)}

    最佳答案

    client-ssl.properties 中添加以下内容解决了这个问题:

    ssl.endpoint.identification.algorithm=
    此设置意味着证书与您用于运行使用者的机器的主机名不匹配。在这种情况下,这似乎是推荐的方法。
    相关话题:
    Kafka java consumer SSL handshake Error : java.security.cert.CertificateException: No subject alternative names present

    关于ssl - kafka - ssl握手失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69920375/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com