gpt4 book ai didi

python - Pyspark directStreams foreachRdd 始终有空 RDD

转载 作者:行者123 更新时间:2023-12-01 02:18:47 29 4
gpt4 key购买 nike

我一直在尝试从 Kafka 主题读取数据并将其写入 parquet 文件。到目前为止,除了 foreachRdd 函数之外,一切都正常。当我在dstream上使用map时可以看到数据,但是下一步使用foreachRdd时,Rdd总是空的,我不知道为什么。

我的环境是 Ubuntu,同时独立运行 Kafka 和 Spark。我正在使用 pyspark shell。我是 python 新手,所以我仍然在语法上遇到很多问题,并且不确定这是否是我的问题所在。

任何帮助或见解将不胜感激。

这是我粘贴到 pyspark shell 中的代码副本

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql.types import *
import json

kafkaBroker = 'localhost:9092'
consumer_group = 'spark-streaming'
topic = 'test'
batchTimeDur=5

ssc = StreamingContext(sc, batchTimeDur)
directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": kafkaBroker})

#change string to json string
lines = directKafkaStream.map(lambda v: json.loads(v[1]))

# show what is in the stream
lines.map(lambda x: 'rec in this line: %s\n' % x).pprint()

# save lines to file
lines.foreachRDD(lambda x: saveAsParquet(x))

def saveAsParquet(rdd):
print('in save a parquet')
if not rdd.isEmpty:
df = sqlContext.createDataFrame(rdd, buildSchema())
#df.write.parquet('file:///vagrant/streamed-parquet', mode='overwrite')
print(' writing file')
df.write.parquet('file:///vagrant/streamed-parquet', mode='append')
print('return save as parquet')
return rdd

ssc.start()

最佳答案

RDD.isEmpty 是一种方法,而不是属性,因此根据 language defintion , rdd.isEmpty 在 bool 上下文中将被评估为 true:

the following values are interpreted as false: "False", "None", numeric zero of all types, and empty strings and containers (including strings, tuples, lists, dictionaries, sets and frozensets). All other values are interpreted as true.

随后如果不是 rdd.isEmpty 将为 false。

你应该:

if not rdd.isEmpty(): 
...

关于python - Pyspark directStreams foreachRdd 始终有空 RDD,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48140510/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com