- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
成功写入后(记录数为 29000),我正在另一个文件中读取与下面相同主题的数据:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。
spark.streaming.backpressure.enabled
andspark.streaming.kafka.maxRatePerPartition
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
上述参数的解释在官方文档中给出为:
Enables or disables Spark Streaming's internal backpressure mechanism(since 1.5). This enables the Spark Streaming to control the receivingrate based on the current batch scheduling delays and processing timesso that the system receives only as fast as the system can process.Internally, this dynamically sets the maximum receiving rate ofreceivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
spark.streaming.backpressure.initialRate
指定一些值吗?
spark.streaming.backpressure.initialRate
的值.
spark.streaming.backpressure.enabled
设置为
true
最大接收速率是动态设置的。
spark.streaming.receiver.maxRate
和
spark.streaming.kafka.maxRatePerPartition
如果
spark.streaming.backpressure.enabled
设置为
true
?
spark.streaming.backpressure.initialRate
没有影响当施加背压时。
最佳答案
配置spark.streaming.[...]
您指的是属于 Direct Streaming(又名 Spark Streaming)和 不是 到结构化流媒体。
如果您不知道其中的区别,我建议您查看单独的编程指南:
maxOffsetsPerTrigger
在每个触发器上设置读取消息的限制。此选项记录在
Structured Streaming and Kafka Integration Guide 中。作为:
"Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."
How is
spark.streaming.kafka.maxRatePerPartition
related tospark.streaming.backpressure.enabled
in case of spark streaming with Kafka?
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
if they are set (see below)."
spark.streaming.kafka.maxRatePerPartition
为最佳估计率的 150% ~ 200%。
spark.streaming.backpressure.enabled
设置为真 spark.streaming.kafka.maxRatePerPartition
设置为 10000 spark.streaming.backpressure.pid.minRate
保持默认值 100 关于apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69162574/
我想知道在 QT 应用程序中存储字符串的选项是什么。我的主要要求之一是不重建整个项目或任何文件,以防万一一个字符串发生变化,并将所有字符串放在一个地方。简而言之,我想将字符串放在一个地方并在应用程序启
我正在尝试将 3 Rails 应用程序的通用功能移动到 gem 中。我已经创建了 gem,在本地对其进行了测试,并将移动到私有(private)存储库。 所以,现在我关心的是如果我更改了 gem 内的
匹配模式foo但如果它出现在模式 bar 之后则不然。基本上给定一个字符串,我“尝试”匹配开始标签 如果匹配位于结束标记 。 注意:我正在“尝试”类似的方法来解决,这可能不是解决方案的实际路径。如果
我在我的 UIImageView 中设置了 Logo 图像并运行并且它工作正常,但如果有人错误地删除/重命名图像然后再次运行并且我的应用程序崩溃,因为 Xcode 找不到该图像。在这里我需要实现,如果
我在 Shiny 的仪表板上有两个 plotly 图。当我点击第一个 plotly 图时,交互式事件工作正常。但是,当我对堆叠条形图的第二个图执行相同的操作时,窗口会自动关闭。 您是否遇到过带有不止一
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。 write_kafka_data.py: read_df = spark.sql("select * from db.table wher
我是一名优秀的程序员,十分优秀!