gpt4 book ai didi

apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?

转载 作者:行者123 更新时间:2023-12-04 13:24:51 28 4
gpt4 key购买 nike

在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。

write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))

final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
成功写入后(记录数为 29000),我正在另一个文件中读取与下面相同主题的数据:
read_kafka_data.py:
    # SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])

# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'

stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')

stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。

spark.streaming.backpressure.enabled and spark.streaming.kafka.maxRatePerPartition


要启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
上述参数的解释在官方文档中给出为:

Enables or disables Spark Streaming's internal backpressure mechanism(since 1.5). This enables the Spark Streaming to control the receivingrate based on the current batch scheduling delays and processing timesso that the system receives only as fast as the system can process.Internally, this dynamically sets the maximum receiving rate ofreceivers. This rate is upper bounded by the valuesspark.streaming.receiver.maxRate andspark.streaming.kafka.maxRatePerPartition


现在我是第一次运行应用程序并且没有以前的微批处理,我应该为: spark.streaming.backpressure.initialRate 指定一些值吗?
如果是这样,我应该如何确定 spark.streaming.backpressure.initialRate 的值.
文档还说如果 spark.streaming.backpressure.enabled设置为 true最大接收速率是动态设置的。
如果是这样,我们是否还需要配置: spark.streaming.receiver.maxRatespark.streaming.kafka.maxRatePerPartition如果 spark.streaming.backpressure.enabled设置为 true ?
link说使用 spark.streaming.backpressure.initialRate 没有影响当施加背压时。
任何有助于消除困惑的帮助将不胜感激。

最佳答案

配置spark.streaming.[...]您指的是属于 Direct Streaming(又名 Spark Streaming)和 不是 到结构化流媒体。
如果您不知道其中的区别,我建议您查看单独的编程指南:

  • Structured Streaming: processing structured data streams with relation queries (using Datasets and DataFrames, newer API than DStreams)
  • Spark Streaming: processing data streams using DStreams (old API)

  • Structured Streaming 不提供背压机制。当您从 Kafka 消费时,您可以使用(正如您已经在做的那样)选项 maxOffsetsPerTrigger在每个触发器上设置读取消息的限制。此选项记录在 Structured Streaming and Kafka Integration Guide 中。作为:

    "Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."



    如果您仍然对标题问题感兴趣

    How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled in case of spark streaming with Kafka?


    此关系在 Spark's Configuration 的文档中进行了解释。 :

    "Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below)."


    有关 Spark Streaming(DStream,而不是 Structured Streaming)中可用的背压机制的所有详细信息都在您已链接的博客中进行了解释 Enable Back Pressure To Make Your Spark Streaming Application Production Ready .
    通常,如果您启用背压,您将设置 spark.streaming.kafka.maxRatePerPartition为最佳估计率的 150% ~ 200%。
    PID的精确计算 Controller 可以在类 PIDRateEstimator 的代码中找到.
    Spark Streaming 的背压示例
    正如您所要求的一个示例,这是我在我的一个生产应用程序中完成的一个示例:
    设置
  • Kafka 主题有 16 个分区
  • Spark 使用 16 个工作内核运行,因此每个分区可以并行使用
  • 使用 Spark 流(非结构化流)
  • 批处理间隔为 10 秒
  • spark.streaming.backpressure.enabled设置为真
  • spark.streaming.kafka.maxRatePerPartition设置为 10000
  • spark.streaming.backpressure.pid.minRate保持默认值 100
  • 该作业每个分区每秒可以处理大约 5000 条消息
  • 在开始流式作业之前,Kafka 主题在每个分区中包含数百万条消息

  • 观察
  • 在第一批中,流式作业获取 16000(= 10 秒 * 16 个分区 * 100 pid.minRate)消息。
  • 作业处理这 16000 条消息的速度非常快,因此 PID Controller 估计的最佳速率大于 10000 的 masRatePerPartition。
  • 因此,在第二批中,流式作业获取 16000(= 10 秒 * 16 个分区 * 10000 maxRatePerPartition)消息。
  • 现在,第二批大约需要 22 秒才能完成
  • 因为我们的批处理间隔设置为 10 秒,10 秒后流作业已经调度了第三个微批处理,再次是 1600000。原因是 PID Controller 只能使用来自已完成微批处理的性能信息。
  • 只有在第六个或第七个微批次中,PID Controller 才能找到每个分区每秒大约 5000 条消息的最佳处理速率。
  • 关于apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69162574/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com