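I have a Java application that uses spark-streaming-kafka-0-10_2.11 to consume topics from a Kafka 2.5.1 cluster in EC2. It only works with a Spark cluster or standalone install outside AWS: when Spark is also hosted in EC2, the Kafka consumer group never fully initializes. Of three topics in total, only two consumers ever connect, while the third repeatedly rejects the group coordinator as "unavailable or invalid".
It is always the same third topic whose consumer fails, but the second and third topics are configured identically and are both empty; the only difference between them is the name. Deleting and recreating the third topic changes nothing. Ignoring topic #3 in the application code (the first two are not so easily dispensed with) results in a successful startup.
All of the Spark installations are version 2.4.5, with the Google Guava JAR updated from the shipped 14.0.1 to 19.0 but no other special configuration.
Kafka is a three-node EC2 cluster, with each node hosting a broker, a Zookeeper instance, and a Spark worker. Everything is talking to everything else, and everything can be pinged from everything else. server.properties sets listeners to the internal DNS names, while advertised.listeners uses the external ones: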
listeners=PLAINTEXT://ip-abc-def-ghi-jkl.region.compute.internal:9092
advertised.listeners=PLAINTEXT://ec2-mno-pqr-stu-vwx.region.compute.amazonaws.com:9092
Log from the Spark application launched inside EC2, which fails:
2020-10-08/21:09:32.694/UTC org.apache.kafka.clients.consumer.ConsumerConfig INFO ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [ip-(broker 1 private dns).region.compute.internal:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
2020-10-08/21:09:32.843/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:32.844/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:33.098/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:33.100/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:33.135/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:39.155/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Successfully joined group with generation 1
2020-10-08/21:09:39.159/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Setting newly assigned partitions [partitions here]
2020-10-08/21:09:39.188/UTC org.apache.kafka.clients.consumer.internals.Fetcher INFO [Consumer clientId=consumer-1, groupId=mygroup] Resetting offset for partition station-data-19 to offset 1976.
(more offset resets; consumer 2 has also joined the group successfully between 21:09:32 and 21:09:39. No activity from consumer 3 yet, unlike launches from an external Spark. Consumer 3 spin-up starts next)
2020-10-08/21:09:39.200/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka version : 2.0.0
2020-10-08/21:09:39.205/UTC org.apache.kafka.common.utils.AppInfoParser INFO Kafka commitId : 3402a8361b734732
2020-10-08/21:09:39.213/UTC org.apache.kafka.clients.Metadata INFO Cluster ID: mk34tRyzT1m1VR1ZC9GYnQ
2020-10-08/21:09:39.214/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.ConsumerCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Revoking previously assigned partitions []
2020-10-08/21:09:39.219/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] (Re-)joining group
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:09:42.268/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
(more heartbeat failures for consumers 1 and 2)
2020-10-08/21:10:09.254/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null) is unavailable or invalid, will attempt rediscovery
2020-10-08/21:10:09.330/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-2, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.331/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-1, groupId=mygroup] Attempt to heartbeat failed since group is rebalancing
2020-10-08/21:10:09.377/UTC org.apache.kafka.clients.consumer.internals.AbstractCoordinator INFO [Consumer clientId=consumer-3, groupId=mygroup] Discovered group coordinator ec2-(broker 3 public dns).region.compute.amazonaws.com:9092 (id: 2147483644 rack: null)
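In a Spark outside EC2, this does not happen: all three consumers discover the coordinator and join the group more or less simultaneously, work out their offsets, and are off to the races. But when the application is submitted to a Spark cluster in EC2, only the first two consumers join the group successfully. The third consumer doesn't start to initialize until after the first two have connected and reset their offsets. It then discovers the group coordinator, tries to talk to it (triggering a rebalance that prevents the other consumers from heartbeating), fails and decides the coordinator is invalid, then finds the same coordinator again, and repeats ad nauseam.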
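bootstrap.servers must point to the brokers' external DNS names. However, launches from inside EC2 fail whether it points to the brokers' external or internal names, and whether it lists one broker or several.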
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Empty state. Created a new member id consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,128] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 0 (__consumer_offsets-25) (reason: Adding new member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:33,136] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in PreparingRebalance state. Created a new member id consumer-1-63909784-c821-4903-a08a-98a250d49b19 for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,128] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 1 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,146] INFO [GroupCoordinator 3]: Assignment received from leader for group mygroup for generation 1 (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Dynamic Member with unknown member id joins group mygroup in Stable state. Created a new member id consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac for this member and add to the group. (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:09:39,222] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 1 (__consumer_offsets-25) (reason: Adding new member consumer-3-3a91c573-a90b-4b5c-9707-af285bf9bbac with group instance id None) (kafka.coordinator.group.GroupCoordinator)
... (more unknown member ids joining the group in PreparingRebalance)
[2020-10-08 21:14:37,756] INFO [GroupCoordinator 3]: Member consumer-2-cd4f7a30-d897-4902-81e7-4211b6a1e233 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Member consumer-1-63909784-c821-4903-a08a-98a250d49b19 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:37,757] INFO [GroupCoordinator 3]: Stabilized group mygroup generation 2 (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:39,625] INFO [GroupMetadataManager brokerId=3] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Preparing to rebalance group mygroup in state PreparingRebalance with old generation 2 (__consumer_offsets-25) (reason: removing member consumer-3-057c4229-7183-4733-b973-9f758b9a69d0 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2020-10-08 21:14:47,758] INFO [GroupCoordinator 3]: Member consumer-3-58c8ca8c-2daa-46c9-964b-7be883193287 in group mygroup has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
... (more members failing and being removed)
[2020-10-08 21:14:47,759] INFO [GroupCoordinator 3]: Group mygroup with generation 3 is now empty (__consumer_offsets-25) (kafka.coordinator.group.GroupCoordinator)
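For reference, the non-default consumer settings visible in the ConsumerConfig dump above could be expressed as a parameter map along the following lines. This is only a sketch of how such a map might look, not the application's actual code: the class name, the broker address, and the topic names are hypothetical placeholders, and only the keys and values shown in the log are taken from the question.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

public class ConsumerParamsSketch {

    // Builds a parameter map mirroring the values logged by ConsumerConfig.
    // Deserializers are given as class-name strings so this compiles without Kafka on the classpath.
    static Map<String, Object> kafkaParams(String bootstrapServers) {
        Map<String, Object> params = new HashMap<>();
        params.put("bootstrap.servers", bootstrapServers);
        params.put("group.id", "mygroup");
        params.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        params.put("auto.offset.reset", "latest");
        params.put("enable.auto.commit", false);
        return params;
    }

    public static void main(String[] args) {
        // Hypothetical address and topic names; the real ones are elided in the question.
        Map<String, Object> params = kafkaParams("broker1.example.internal:9092");
        Collection<String> topics = Arrays.asList("topic-1", "topic-2", "topic-3");
        System.out.println("group " + params.get("group.id") + " subscribes to " + topics);
    }
}
```

A map like this is what the direct-stream API of spark-streaming-kafka-0-10 takes alongside the topic collection. All three consumers share the same group.id, which is why they all end up talking to the same group coordinator.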
Best answer
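This turned out to be the key:
The third consumer doesn't start to initialize until after the first two have connected and reset their offsets
When we restricted CPU availability with taskset on our test machine, we were able to reproduce the problem exactly. Allowing spark-submit three cores succeeded.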
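A cheap guard against this failure mode is to compare the cores visible to the JVM with the number of topics at startup. The sketch below assumes one core per topic, which is inferred from the three-topic, three-core result above and is not a documented Spark rule. The class and method names are hypothetical. On Linux the JVM generally reports the CPUs in its affinity mask, so a taskset restriction should show up here, but that is worth verifying on the target machine.

```java
public class CoreCheckSketch {

    // Assumption: each topic's consumer needs its own core to initialize concurrently.
    static boolean hasEnoughCores(int availableCores, int topicCount) {
        return availableCores >= topicCount;
    }

    public static void main(String[] args) {
        int topicCount = 3; // the application in the question consumes three topics
        int cores = Runtime.getRuntime().availableProcessors();
        if (hasEnoughCores(cores, topicCount)) {
            System.out.println(cores + " core(s) visible for " + topicCount + " topics");
        } else {
            // With too few cores, a consumer may start late and trigger endless rebalances.
            System.err.println("Only " + cores + " core(s) visible for "
                    + topicCount + " topics; consumers may not all join the group");
        }
    }
}
```

This only checks the driver-side JVM. The cores actually granted to the application by the Spark cluster are a separate setting and would need to be checked there.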
Source: "apache-spark - One Kafka consumer in a group always rejects the coordinator, but only when Spark and Kafka are both in EC2", a similar question found on Stack Overflow: https://stackoverflow.com/questions/64285670/