gpt4 book ai didi

apache-spark - 如何理解apache spark中的queueStream API?

转载 作者:行者123 更新时间:2023-12-01 09:22:39 25 4
gpt4 key购买 nike

pyspark 有一个 api queueStream 用于从一系列 rdd 构造 dstream。

queueStream(rdds, oneAtATime=True, default=None)
Create an input stream from an queue of RDDs or list. In each batch, it will process either one or all of the RDDs returned by the queue.

NOTE: changes to the queue after the stream is created will not be recognized.

Parameters:
rdds – Queue of RDDs
oneAtATime – pick one rdd each time or pick all of them once.
default – The default rdd if no more in rdds

问题 1:

在分布式环境中,如果我定义了一个队列对象 q1.我做q1.add(RDD)之类的操作。 q1 对象是否会转移到所有工作节点? q1.add(RDD)这个对象复制到其他节点上会不会有问题?

问题2:

在我运行 dstream = queueStream(q1) 之后。
如果我继续将 RDD 放入队列中。这些 RDDS 会添加到 dstream 中吗?

最佳答案

我相信以下注意事项:

changes to the queue after the stream is created will not be recognized.



几乎可以回答这个问题,但要了解为什么会出现这种情况,您必须在 PySpark 代码中这样做,尤其是 following line :
queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])

如果这还不够,您可以查看 the corresponding Scala code 以查看它需要一个静态列表:
def toRDDQueue(rdds: JArrayList[JavaRDD[Array[Byte]]])

并将其转换为队列:
val queue = new java.util.LinkedList[JavaRDD[Array[Byte]]]
rdds.asScala.foreach(queue.add)

因此,Python 端的任何更改根本无法反射(reflect)在流中。

关于第一个问题,答案是否定的。队列不会被分发,因为 RDD 在 Driver 上下文之外根本没有意义。

注意 :

需要明确的是,Scala queueStream 将反射(reflect)添加到队列中的内容。 There is even an example in the Spark source

关于apache-spark - 如何理解apache spark中的queueStream API?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36324991/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com