gpt4 book ai didi

azure - 从 PySpark 写入大型 DataFrame 到 Kafka 遇到超时

转载 作者:行者123 更新时间:2023-12-01 03:09:15 25 4
gpt4 key购买 nike

我正在尝试将一个包含大约 2.3 亿条记录的数据帧写入 Kafka。更具体地说是Kafka-enable Azure Event Hub ,但我不确定这是否真的是我的问题的根源。

EH_SASL = 'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://myeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=****";'

dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("topic", "mytopic") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

启动正常,并成功(而且速度相当快)将大约 3-4 百万条记录写入队列。但几分钟后工作就会停止,并显示如下消息:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 7.0 failed 4 times, most recent failure: Lost task 6.3 in stage 7.0 (TID 248, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: Expiring 61 record(s) for mytopic-18: 32839 ms has passed since last append

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 8.0 failed 4 times, most recent failure: Lost task 13.3 in stage 8.0 (TID 348, 10.139.64.5, executor 1): kafkashaded.org.apache.kafka.common.errors.TimeoutException: The request timed out.

此外,我从未看到检查点文件被创建/写入。我还尝试了 .option("kafka.delivery.timeout.ms", 30000) 和不同的值,但这似乎没有任何效果。

我在 Azure Databricks 群集版本 5.0 中运行此程序(包括 Apache Spark 2.4.0、Scala 2.11)

我在事件中心上没有看到任何诸如限制之类的错误,所以应该没问题。

最佳答案

终于弄清楚了(大部分):

事实证明,大约 16000 条消息的默认批量大小对于端点来说太大了。在我将 batch.size 参数设置为 5000 后,它开始工作,并且以每分钟约 700k 条消息写入事件中心。另外,上面的超时参数是错误的并且被忽略了。它是kafka.request.timeout.ms

唯一的问题是,它仍然会随机地超时运行,并且显然会再次从头开始,因此我最终会得到重复的结果。即将开通another question为此。

dfKafka \
.write \
.format("kafka") \
.option("kafka.sasl.mechanism", "PLAIN") \
.option("kafka.security.protocol", "SASL_SSL") \
.option("kafka.sasl.jaas.config", EH_SASL) \
.option("kafka.batch.size", 5000) \
.option("kafka.bootstrap.servers", "myeventhub.servicebus.windows.net:9093") \
.option("kafka.request.timeout.ms", 120000) \
.option("topic", "raw") \
.option("checkpointLocation", "/mnt/telemetry/cp.txt") \
.save()

关于azure - 从 PySpark 写入大型 DataFrame 到 Kafka 遇到超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53765133/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com