
apache-spark - Explain the mapping between Spark Structured Streaming executors and Kafka partitions


I have deployed a Structured Streaming job with 4 workers against a Kafka topic with 4 partitions.

I assumed the 4 workers would be deployed against the 4 partitions, with a one-to-one worker <-> partition mapping.

However, that is not the case: all partitions are being served by the same executor. I confirmed this by checking the thread IDs and logging inside the executors.

Is there any documentation that explains the correlation between Kafka partitions and Spark Structured Streaming? Also, are there any knobs we can tune?
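For reference, a minimal sketch of the kind of job described above (the broker address, topic name, and checkpoint path are placeholders):

import org.apache.spark.sql.SparkSession

object KafkaMappingTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("kafka-mapping-test").getOrCreate()

    // Read from a 4-partition topic; "broker:9092" and "topic4p" are placeholders.
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "topic4p")
      .load()

    df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/kafka-mapping-test") // placeholder path
      .start()
      .awaitTermination()
  }
}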

Best Answer

The correlation is "1:n (executor:partitions)": a Kafka partition can only be consumed by one executor, while one executor may consume multiple Kafka partitions.

This is consistent with Spark Streaming.

For Structured Streaming, the default model is the "micro-batch processing model"; the "continuous processing model" is still in an "experimental" state.
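As a side note, the model is chosen at the sink via the trigger. A minimal sketch, assuming Spark 2.3+ (where Trigger.Continuous is available) and a streaming DataFrame df read from the Kafka source:

import org.apache.spark.sql.streaming.Trigger

// Micro-batch is the default; the experimental continuous mode is opt-in.
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second")) // checkpoint interval, not a batch interval
  .start()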

For the "micro-batch processing model", in "KafkaSource.scala" there is:

 *   - The DF returned is based on [[KafkaSourceRDD]] which is constructed such that the
 *     data from Kafka topic + partition is consistently read by the same executors across
 *     batches, and cached KafkaConsumers in the executors can be reused efficiently. See the
 *     docs on [[KafkaSourceRDD]] for more details.

And in "KafkaSourceRDD.scala":

/**
* An RDD that reads data from Kafka based on offset ranges across multiple partitions.
* Additionally, it allows preferred locations to be set for each topic + partition, so that
* the [[KafkaSource]] can ensure the same executor always reads the same topic + partition
* and cached KafkaConsumers (see [[KafkaDataConsumer]] can be used read data efficiently.
*
* ...
*/
private[kafka010] class KafkaSourceRDD(

We know the default location strategy is LocationStrategies.PreferConsistent.
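That strategy is named explicitly in the DStream API (spark-streaming-kafka-0-10), where the caller passes it in. A sketch, with the broker address, group id, and topic name as placeholders, and ssc an existing StreamingContext:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._

val kafkaParams = Map[String, Object](
  "bootstrap.servers"  -> "broker:9092",
  "key.deserializer"   -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id"           -> "mapping-test"
)

// PreferConsistent distributes partitions evenly across available executors,
// then keeps each partition on the same executor across batches.
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Seq("topic4p"), kafkaParams)
)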

For the "continuous processing model", in "KafkaContinuousReader.scala":

  override def createUnsafeRowReaderFactories(): ju.List[DataReaderFactory[UnsafeRow]] = {
    ...
    startOffsets.toSeq.map {
      case (topicPartition, start) =>
        KafkaContinuousDataReaderFactory(
          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
          .asInstanceOf[DataReaderFactory[UnsafeRow]]
    }.asJava
  }

/**
 * A data reader factory for continuous Kafka processing. This will be serialized and transformed
 * into a full reader on executors.
 *
 * @param topicPartition The (topic, partition) pair this task is responsible for.
 * ...
 */
case class KafkaContinuousDataReaderFactory(
    topicPartition: TopicPartition,
    startOffset: Long,
    kafkaParams: ju.Map[String, Object],
    pollTimeoutMs: Long,
    failOnDataLoss: Boolean) extends DataReaderFactory[UnsafeRow] {

  override def createDataReader(): KafkaContinuousDataReader = {
    new KafkaContinuousDataReader(
      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
  }
}

From this we can see that each (topic, partition) pair is handled by one factory, and hence read by one executor.
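To check this mapping empirically (as the question did with thread IDs), one option is to tag each record with the executor and thread that read it. A rough sketch, assuming df is the streaming DataFrame from the Kafka source and spark is the active SparkSession:

import org.apache.spark.SparkEnv
import spark.implicits._ // provides the String encoder for the typed map below

// "partition" is part of the Kafka source's fixed output schema.
val tagged = df.selectExpr("partition").map { row =>
  s"kafka-partition=${row.getInt(0)} " +
    s"executor=${SparkEnv.get.executorId} " +
    s"thread=${Thread.currentThread().getName}"
}

tagged.writeStream.format("console").start()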

Regarding "apache-spark - Explain the mapping between Spark Structured Streaming executors and Kafka partitions", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46639824/
