gpt4 book ai didi

elasticsearch - 如何从连续的RDD构造DStream?

转载 作者:行者123 更新时间:2023-12-03 01:56:42 26 4
gpt4 key购买 nike

我每隔5分钟从ElasticSearch读取数据到Spark。因此,每5分钟会有一个RDD。

我希望基于这些RDD构造一个DStream,以便我可以获取过去1天,过去1小时,过去5分钟等内的数据报告。

为了构造DStream,我正在考虑创建自己的接收器,但是spark的官方文档仅使用scala或java来提供信息。我使用python。

那你知道有什么办法吗?我知道可以毕竟DStream是一系列RDD,我们当然应该从连续的RDD创建DStream。我只是不知道如何。请给点建议

最佳答案

正如您所提到的,编写自己的接收器将是一种方法,但似乎会产生很多开销。您可以使用QueueReceiver来创建QueueInputDStream,就像在this example中一样。它是Scala,但您也应该能够在Python中执行类似的操作:

val rddQueue = new Queue[RDD[Map[String, Any]]]()
val inputStream = ssc.queueStream(rddQueue)

然后,您只需查询每个 X sec/min/h/day/whatever的ES实例,然后将结果放入该队列中。

使用Python,我想可能是这样的:
rddQueue = []
rddQueue += es_rdd() // method that returns an RDD from ES
inputStream = ssc.queueStream(rddQueue)

// some kind of loop that adds to rddQueue new RDDS

显然,您需要先在队列中放入一些东西,然后才能在 queueStream中使用它(或者,如果它为空,至少我在 pyspark中会遇到异常)。

关于elasticsearch - 如何从连续的RDD构造DStream?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36001822/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com