gpt4 book ai didi

python - 使用 foreachRDD 和 foreach 迭代 pyspark 中的 rdd

转载 作者:行者123 更新时间:2023-12-01 03:55:44 28 4
gpt4 key购买 nike

Spark 1.6.1、pyspark 的问题

我有流数据传入

{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}

我有一个名为 writeToHBase(rdd) 的写入 HBase 的函数,期望 rdd 具有以下结构的元组:

(rowkey, [rowkey, column-family, key, value])

正如您从输入格式中看到的,我必须获取原始数据集并迭代所有键,通过发送函数调用发送每个键/值对。

来自阅读 Spark Streaming 编程指南的“使用 foreachRDD 的设计模式”部分 http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13

似乎建议在数据集外部执行某些操作时使用 foreachRDD。就我而言,我想通过网络将数据写入 HBase,因此我在流数据上使用 foreachRDD 并调用将处理发送数据的函数:

stream.foreachRDD(lambda k: process(k))

我现在对 Spark 函数的理解非常有限,所以我无法找到一种方法来迭代我的原始数据集以使用我的写入函数。如果它是一个 python 可迭代,我可以这样做:

def process(rdd):
for key, value in my_rdd.iteritems():
writeToHBase(sc.parallelize(rowkey, [rowkey, 'column-family', key, value]))

其中 rowkey 可以通过在 rdd 本身中查找来获得

rdd.map(lambda x: x['rowkey'])

如何完成 process() 在 pyspark 中的用途?我看到一些例子使用了 foreach,但我不太能让它做我想做的事。

最佳答案

为什么你想要迭代 rdd,而你的 writeToHBase 函数需要 rdd 作为争论。只需在流程函数中调用 writeToHBase(rdd) 即可。

如果您需要从rdd中获取每条记录,您可以调用

def processRecord(record):
print(record)
rdd.foreach(processRecord)

在 processRecord 函数中,您将获得要处理的单个记录。

关于python - 使用 foreachRDD 和 foreach 迭代 pyspark 中的 rdd,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37492402/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com