gpt4 book ai didi

azure - 我是否需要为多个接收器拥有多个事件中心消费者组?

转载 作者:行者123 更新时间:2023-12-03 02:14:14 32 4
gpt4 key购买 nike

我正在从 eventhub 接收流数据,并且有 4 种类型的数据来自 eventhub。

我正在从 databricks 集群上的事件中心读取数据,如下所示:

ehConf = {}
ehConf['eventhubs.connectionString'] = sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(EVENT_HUB_INSTANCE_CONNECTION_STRING)
ehConf['eventhubs.consumerGroup'] = 'ConsumerGroup_1'

spark_df = spark.readStream.format("eventhubs").options(**ehConf).load()

在消息属性中,我有消息类型,因此我使用 Spark where 函数分隔数据,如下所示:

df_type_A = spark_df.select(col("body")).where(spark_df.properties["msgType"]=="TypeA")
df_type_B = spark_df.select(col("body")).where(spark_df.properties["msgType"]=="TypeB")
df_type_C = spark_df.select(col("body")).where(spark_df.properties["msgType"]=="TypeC")
df_type_D = spark_df.select(col("body")).where(spark_df.properties["msgType"]=="TypeD")

然后将数据写入不同的接收器,如下所示:

df_type_A.writeStream\
.format("text")\
.trigger(processingTime='10 seconds')\
.option("checkpointLocation", "/mnt/type_A/Checkpoint")\
.option("path", "/mnt/type_A/Data")\
.start()

df_type_B.writeStream\
.format("text")\
.trigger(processingTime='10 seconds')\
.option("checkpointLocation", "/mnt/type_B/Checkpoint")\
.option("path", "/mnt/type_B/Data")\
.start()

df_type_C.writeStream\
.format("text")\
.trigger(processingTime='10 seconds')\
.option("checkpointLocation", "/mnt/type_C/Checkpoint")\
.option("path", "/mnt/type_C/Data")\
.start()

df_type_D.writeStream\
.format("text")\
.trigger(processingTime='10 seconds')\
.option("checkpointLocation", "/mnt/type_D/Checkpoint")\
.option("path", "/mnt/type_D/Data")\
.start()

据我了解,spark 遵循延迟执行,对于多个接收器,它将创建 4 个不同的 dag 图。 Microsoft 表示“建议每个消费者组的分区上只有一个事件接收器。” ( https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-features )

为了使一切正常运行,我是否需要在事件中心创建 4 个不同的消费者组并为每种类型(A、B、C、D)编写单独的作业,或者一个消费者组就足够了?

如果创建多个消费者组是唯一的选择,是否可以避免它并有效地完成相同的任务?

编辑:我尝试使用单一消费者组来完成我的 4 项工作,但没有成功。它正在抛出我的新接收器,该接收器具有更高的 epoch_id 和更高的纪元“0”,因此当前接收器“spark-driver-14”与纪元“0”正在断开连接。如果您要重新创建接收器,请确保使用更高的纪元

最佳答案

消费者组是相同数据的另一种 View 。它不是一个允许您将消息路由到特定消费者组的概念。每个消费者组处理所有分区的所有消息。我认为下图很好地说明了这一点:

enter image description here

(取自 the docs )

现在,关于这一点:

it's recommended that there's only one active receiver on a partition per consumer group

接收器会锁定它正在为特定消费者组读取的分区。如果多个接收者从同一分区和消费者组读取数据,他们将竞争获取锁。这是低效的,这就是为什么建议每个消费者组的每个分区有一个事件接收器的原因。

对我来说,你的方法似乎是有效的。不需要多个消费者组。

关于azure - 我是否需要为多个接收器拥有多个事件中心消费者组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/72001958/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com