- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
成功写入后(记录数为 29000),我正在另一个文件中读取与下面相同主题的数据:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。
spark.streaming.backpressure.enabled
andspark.streaming.kafka.maxRatePerPartition
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
上述参数的解释在官方文档中给出为:
Enables or disables Spark Streaming's internal backpressure mechanism(since 1.5). This enables the Spark Streaming to control the receivingrate based on the current batch scheduling delays and processing timesso that the system receives only as fast as the system can process.Internally, this dynamically sets the maximum receiving rate ofreceivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
spark.streaming.backpressure.initialRate
指定一些值吗?
spark.streaming.backpressure.initialRate
的值.
spark.streaming.backpressure.enabled
设置为
true
最大接收速率是动态设置的。
spark.streaming.receiver.maxRate
和
spark.streaming.kafka.maxRatePerPartition
如果
spark.streaming.backpressure.enabled
设置为
true
?
spark.streaming.backpressure.initialRate
没有影响当施加背压时。
最佳答案
配置spark.streaming.[...]
您指的是属于 Direct Streaming(又名 Spark Streaming)和 不是 到结构化流媒体。
如果您不知道其中的区别,我建议您查看单独的编程指南:
maxOffsetsPerTrigger
在每个触发器上设置读取消息的限制。此选项记录在
Structured Streaming and Kafka Integration Guide 中。作为:
"Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."
How is
spark.streaming.kafka.maxRatePerPartition
related tospark.streaming.backpressure.enabled
in case of spark streaming with Kafka?
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
if they are set (see below)."
spark.streaming.kafka.maxRatePerPartition
为最佳估计率的 150% ~ 200%。
spark.streaming.backpressure.enabled
设置为真 spark.streaming.kafka.maxRatePerPartition
设置为 10000 spark.streaming.backpressure.pid.minRate
保持默认值 100 关于apache-spark - spark.streaming.kafka.maxRatePerPartition 如何与 spark.streaming.backpressure.enabled incase 与 Kafka 进行 Spark 流相关?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69162574/
我正在尝试制作带有背压的流动性。我的想法是,在当前项目之一完成处理之前,不会发出新的可流动项目。我正在使用 ResourceSubscriber 和 subscribeWith() 方法来实现这一点。
我一直在阅读一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结说“生产者”太快而“消费者”太慢。 例如像下面的代码: Observable.int
我有一个 PublishSubject,它在某些 UI 事件上调用 onNext()。订阅者通常需要 2 秒来完成其工作。我需要在订阅者忙碌时忽略对 onNext() 的所有调用,但最后一个除外。我尝
有这样的情况,上游传递到下游的数据需要进行处理,然而上游推送的速度又很快,下游由于资源等原因来不及处理;如果这时还是通过不限制上游速度的方式推送数据,就会出问题,因此Reactive Streams有
对于一个运算符,输入流比其输出流快,因此其输入缓冲区将阻塞前一个运算符的输出线程,该输出线程将数据传输到该运算符。对吗? Flink和Spark都是通过阻塞线程来处理背压的吗?那么它们有什么区别呢?
嗯,RxJava 中的背压并不是真正的背压,而只是忽略了一些元素集合。 但是如果我不能释放任何元素并且我需要以某种方式减慢发射速度怎么办? RxJava不能影响元素发射,所以开发者需要自己实现。但是如
在 Apache Camel 2.19.0 中,我想在并发 seda 队列上异步生成消息并使用结果,同时如果 seda 队列上的执行程序已满,则阻塞。它背后的用例:我需要处理包含多行的大文件,并且需要
我有一个关于 Combine 中的 zip 运算符与背压结合的问题。 采用以下代码片段: let sequencePublisher = Publishers.Sequence, Never>(seq
我有一个 flink 作业,它从 Kafka 读取数据,执行某些聚合并将结果写入 elasticsearch 索引。我在源上看到高背压。高背压导致数据从 Kafka 读取缓慢,我看到数据在网络堆栈中排
这是我对这个主题的理解。 有发布者和订阅者。 发布者和订阅者的伪代码是这样的 Publisher{ Subscriber s; subscribe(Subscriber s){
我有一个 Spark 流应用程序,它使用 Spark Direct Streaming(不是接收器)方法从 Kafka 读取消息并按分区处理消息。 在我的 Kafka 分区中,有时我们得到的消息需要
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。 write_kafka_data.py: read_df = spark.sql("select * from db.table wher
我是一名优秀的程序员,十分优秀!