gpt4 book ai didi

apache-spark - 组中的一个 Kafka 消费者始终拒绝协调器,但仅当 Spark 和 Kafka 都在 EC2 中时

转载 作者:行者123 更新时间:2023-12-03 14:52:41 25 4
gpt4 key购买 nike

我有一个 Java 应用程序,尝试使用 spark-streaming-kafka-0-10_2.11 从 EC2 中的 Kafka 2.5.1 集群中使用主题。 .它仅适用于 Spark 集群或 AWS 之外的独立安装:当 Spark 也托管在 EC2 中时,Kafka 消费者组永远不会完全初始化。对于总共三个主题,只有两个消费者曾经连接过,而第三个消费者反复拒绝组协调器“不可用或无效”。
消费者失败的总是相同的第三个主题,但第二个和第三个主题配置相同并且都为空;它们之间的唯一区别是名称。删除并重新创建第三个主题不会改变任何内容。忽略应用程序代码中的主题 #3(前两个不容易解脱)会导致成功启动。
所有不同的 Sparks 都是 2.4.5 版本,Google Guava JAR 从发布的 14.0.1 更新到 19.0,但没有特殊配置。
Kafka 是一个三节点 EC2 集群,每个节点托管一个代理、一个 Zookeeper 实例和一个 Spark 工作线程。一切都在说话,从其他一切都可以ping通。 server.properties配置 listeners到内部 DNS 名称,而 advertised.listeners是外部的。

listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092
advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092
从 EC2 内部启动失败的 Spark 应用程序:
2020-10-08/21:09:32.694/UTC org.apache.kafka.clients.consumer.ConsumerConfig INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ip-(broker 1 private dns).region.compute.internal:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-10-08/21:09:32.843/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:32.844/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:33.098/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:33.100/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:39.155/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 1
2020-10-08/21:09:39.159/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions [partitions here]
2020-10-08/21:09:39.188/UTC org.apache.kafka.clients.consumer.internals.Fetcher INFO [Consumer clientId=consumer-1, groupId=mygroup] Resetting offset for partition station-data-19 to offset 1976.

(more offset resets; consumer 2 has also joined the group successfully between 21:09:32 and 21:09:39. No activity from consumer 3 yet, unlike launches from an external Spark. Consumer 3 spin-up starts next)

2020-10-08/21:09:39.200/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:39.205/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:39.213/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:39.214/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing

(more heartbeat failures for consumers 1 and 2)

2020-10-08/21:10:09.254/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-10-08/21:10:09.330/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.331/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.377/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
在 EC2 之外的 Spark 中,这不会发生:所有三个消费者都或多或少地同时发现协调器并加入组,计算出它们的偏移量,然后开始比赛。但是当应用程序提交到 EC2 中的 Spark 集群时,只有前两个消费者成功加入该组。第三个消费者直到前两个消费者连接并重置它们的偏移后才开始初始化,随后它发现组协调器,尝试与之交谈(导致重新平衡以防止其他消费者心跳),失败并决定它是无效,然后再次找到相同的协调员,重复令人作呕。
像这样失败的 Spark 提交与来自 EC2 外部的成功 Spark 提交之间唯一重要的配置区别是后者 bootstrap.servers必须指向经纪人的外部 DNS 名称。但是,无论是指向代理的外部名称还是内部名称、一个代理还是多个代理,内部应用程序启动都会失败。
这是来自 broker 3 的 Kafka server.log,上面标识为组协调器:
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Empty state. Created a new member id consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 0 (__consumer_offsets-25) (reason: Adding new member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,136] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in PreparingRebalance state. Created a new member id consumer-1-63909784-c821-4903-a08a-98a250d49b19 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,128] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,146] INFO [GroupCoordinator 3]: Assignment received from leader for group mygroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Stable state. Created a new member id consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 1 (__consumer_offsets-25) (reason: Adding new member consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac with group instance id None) (kafka.coordinator.group.GroupCoordinator)

... (more unknown member ids joining the group in PreparingRebalance)

[2020-10-08 21:14:37,756] INFO [GroupCoordinator 3]: Member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Member consumer-1-63909784-c821-4903-a08a-98a250d49b19 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 2 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:39,625] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 2 (__consumer_offsets-25) (reason: removing member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-58c8ca8c-2daa-46c9-964b-7be883193287 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

... (more members failing and being removed)

[2020-10-08 21:14:47,759] INFO [GroupCoordinator 3]: Group mygroup with generation 3 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)

最佳答案

结果证明这是关键:

The third consumer doesn't start to initialize until after the first two have connected and reset their offsets


前两个消费者在消费者 3 启动时几秒钟没有事件,之后心跳开始失败。
Spark 驱动程序是具有两个 vCPU 内核的旧实例 (m3.large)。我们成功的测试来自具有更多内核的较新机器,以及当我们使用 taskset 限制 CPU 可用性时在我们的测试机器上,我们能够准确地重现问题。允许 spark-submit三核成功。
以前使用“简单”的 Kafka 0.8+ API 对我们来说这不是问题,但是开始使用 0.10+ 的"new"消费者 API 似乎需要组中的每个消费者都有一个核心。

关于apache-spark - 组中的一个 Kafka 消费者始终拒绝协调器,但仅当 Spark 和 Kafka 都在 EC2 中时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64285670/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com