- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个 Java 应用程序,尝试使用 spark-streaming-kafka-0-10_2.11
从 EC2 中的 Kafka 2.5.1 集群中使用主题。 .它仅适用于 Spark 集群或 AWS 之外的独立安装:当 Spark 也托管在 EC2 中时,Kafka 消费者组永远不会完全初始化。对于总共三个主题,只有两个消费者曾经连接过,而第三个消费者反复拒绝组协调器“不可用或无效”。
消费者失败的总是相同的第三个主题,但第二个和第三个主题配置相同并且都为空;它们之间的唯一区别是名称。删除并重新创建第三个主题不会改变任何内容。忽略应用程序代码中的主题 #3(前两个不容易解脱)会导致成功启动。
所有不同的 Sparks 都是 2.4.5 版本,Google Guava JAR 从发布的 14.0.1 更新到 19.0,但没有特殊配置。
Kafka 是一个三节点 EC2 集群,每个节点托管一个代理、一个 Zookeeper 实例和一个 Spark 工作线程。一切都在说话,从其他一切都可以ping通。 server.properties
配置 listeners
到内部 DNS 名称,而 advertised.listeners
是外部的。
listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092
advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092
从 EC2 内部启动失败的 Spark 应用程序:
2020-10-08/21:09:32.694/UTC org.apache.kafka.clients.consumer.ConsumerConfig INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ip-(broker 1 private dns).region.compute.internal:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-10-08/21:09:32.843/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:32.844/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:33.098/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:33.100/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:39.155/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 1
2020-10-08/21:09:39.159/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions [partitions here]
2020-10-08/21:09:39.188/UTC org.apache.kafka.clients.consumer.internals.Fetcher INFO [Consumer clientId=consumer-1, groupId=mygroup] Resetting offset for partition station-data-19 to offset 1976.
(more offset resets; consumer 2 has also joined the group successfully between 21:09:32 and 21:09:39. No activity from consumer 3 yet, unlike launches from an external Spark. Consumer 3 spin-up starts next)
2020-10-08/21:09:39.200/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:39.205/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:39.213/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:39.214/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
(more heartbeat failures for consumers 1 and 2)
2020-10-08/21:10:09.254/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-10-08/21:10:09.330/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.331/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.377/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
在 EC2 之外的 Spark 中,这不会发生:所有三个消费者都或多或少地同时发现协调器并加入组,计算出它们的偏移量,然后开始比赛。但是当应用程序提交到 EC2 中的 Spark 集群时,只有前两个消费者成功加入该组。第三个消费者直到前两个消费者连接并重置它们的偏移后才开始初始化,随后它发现组协调器,尝试与之交谈(导致重新平衡以防止其他消费者心跳),失败并决定它是无效,然后再次找到相同的协调员,重复令人作呕。
bootstrap.servers
必须指向经纪人的外部 DNS 名称。但是,无论是指向代理的外部名称还是内部名称、一个代理还是多个代理,内部应用程序启动都会失败。
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Empty state. Created a new member id consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 0 (__consumer_offsets-25) (reason: Adding new member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,136] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in PreparingRebalance state. Created a new member id consumer-1-63909784-c821-4903-a08a-98a250d49b19 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,128] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,146] INFO [GroupCoordinator 3]: Assignment received from leader for group mygroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Stable state. Created a new member id consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 1 (__consumer_offsets-25) (reason: Adding new member consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac with group instance id None) (kafka.coordinator.group.GroupCoordinator)
... (more unknown member ids joining the group in PreparingRebalance)
[2020-10-08 21:14:37,756] INFO [GroupCoordinator 3]: Member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Member consumer-1-63909784-c821-4903-a08a-98a250d49b19 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 2 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:39,625] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 2 (__consumer_offsets-25) (reason: removing member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-58c8ca8c-2daa-46c9-964b-7be883193287 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
... (more members failing and being removed)
[2020-10-08 21:14:47,759] INFO [GroupCoordinator 3]: Group mygroup with generation 3 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
最佳答案
结果证明这是关键:
The third consumer doesn't start to initialize until after the first two have connected and reset their offsets
taskset
限制 CPU 可用性时在我们的测试机器上,我们能够准确地重现问题。允许
spark-submit
三核成功。
关于apache-spark - 组中的一个 Kafka 消费者始终拒绝协调器,但仅当 Spark 和 Kafka 都在 EC2 中时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64285670/
当我使用路径文件上的快捷方式在文件之间移动时,似乎我不仅仅是在文件之间移动。 我使用>转到一个文件,在该文件中我更改光标的位置并执行某些操作,然后按 gf noremap 关于vim 通过快捷方式直
我正在尝试使用 Pong P. Chu 的书来学习 Verilog。我有一个关于如何评估和实现始终 block 的问题。作者代码中的风格让我感到困惑。 在此示例中,他编写了一个具有两个输出寄存器“y1
我正在尝试制作一个聊天应用程序,因此我需要它始终接收服务器信息。因此,当请求完成时,在: http.onreadystatechange=function(){ 我再次调用该函数,因此: reques
当您在 always block 敏感度列表中使用通配符 @* 时,我对什么被视为输入有点困惑。例如,在下面的示例中,哪些信号被解释为导致 always block 被重新评估的输入? 据我了解,cl
我有一个充当调试器的程序。我为线程设置了一个 hw bp,将 dr0 设置为我希望 bp 所在的地址,将 dr7 设置为 1,因为我希望 bp 在每次执行该地址时生成一个事件。 它有效,但现在的问题是
如何每次都以管理员身份在 Windows 上运行 git bash。 操作系统 - Windows 10 家庭版 64 位 最佳答案 我在 Google 上找到了这个结果: 将 Git Bash 设置
使用 accept() 时或 getpeername() , sockaddr_storage总是有 ss_family=AF_INET6 : struct sockaddr_storage addr
我在 Cordova 方面还有另一个问题。我想在 Cordova 7.1.0 中使用插件“cordova.custom.plugins.exitapp”和“cordova-plugins-printe
我试图让模块通过 ISE 12.4 中的语法检查,但它给了我一个我不明白的错误。首先是代码片段: parameter ROWBITS = 4; reg [ROWBITS-1:0] temp; genv
我正在使用Cordova开发适用于iOS的应用程序,其中包括地理位置功能(我使用官方插件https://github.com/apache/cordova-plugin-geolocation)。我在
我想知道是否有可能只在敏感列表中的多个信号一起变化时才执行 always block 。 例如,假设我有一个信号“in”和另一个“posedge clk”。我希望在两个信号都发生变化时执行 alway
我需要实现一种算法来访问数据库来检查最后一个元素,以便计算新的元素。当然,第一次这是不可能的,因为数据库是空的,我得到 IndexOutOfBoundsException) index 0 reque
我正在利用我在网上找到的画廊系统,根据鼠标图像的接近程度,它会按比例增长。 链接:Gallery 好吧,我调整了代码以响应(如您所见正在 build 中)并且没有明显的问题。我的问题在更改分辨率时开始
我正在创建一个 kiosk 应用程序,我想确保它无论如何始终位于其他 Windows 应用程序和 Windows 任务栏之上。 我已经阻止了 Windows 键盘命令(alt-tab 等),但仍有可能
我即将开始一个新的 React 项目,并尝试利用我以前的知识来创建一些关于我如何构建应用程序的规则。 有些事情我认为是真的: Redux 保存整个应用程序的“主要”数据 如果需要跨应用程序共享,Red
当你打开 VS Code 时,终端默认是在底部打开的。您可以单击该图标将其向右移动。我想知道是否有办法将右侧打开设置为默认值。 谢谢。 最佳答案 是的 - 在 v1.20 中引入了设置 workb
我有一个Events表,其中包含各种类型的事件。我只关心其中一种类型。因此,我编写的每个查询都以开头 Events.objects.filter(event_type="the_type").\
我在单例中创建了一个Timer,并且我一直在努力解决为什么Timer没有触发。我查看了这里的帖子,但没有找到我认为可以直接回答我的问题的帖子。 class ConnectionStateMonitor
我在 TableViewController 中显示了一组项目。它们在 TVC 中正确显示。下面的代码会继续,但它只会继续到我的 MKMapItem 数组的 indexPath 0,而不是被单击的单元
我的 VC 是这样的: var coins = 50 // coins override func viewDidLoad() { super.viewDidLoad() if(SKP
我是一名优秀的程序员,十分优秀!