gpt4 book ai didi

apache-spark - pyspark 中的转换 DStream 在调用 pprint 时出错

转载 作者:行者123 更新时间:2023-12-04 05:03:40 24 4
gpt4 key购买 nike

我正在通过 PySpark 探索 Spark Streaming,并在尝试使用 transform 时遇到错误功能与 take .

我可以成功使用sortBy对抗DStream通过 transformpprint结果。

author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()

但是如果我使用 take遵循相同的模式并尝试 pprint它:

top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()

工作失败

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'


您可以在 the notebook here 中看到完整的代码和输出。 .

我究竟做错了什么?

最佳答案

您传递给 transform 的函数应该从 RDD 转换而来至 RDD .如果您使用操作,例如 take ,您必须将结果转换回 RDD :

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
lambda rdd: sc.parallelize(rdd.take(5))
)

相比之下 RDD.sortBy used 是一个转换(返回一个 RDD)所以不需要进一步的并行化。

附带说明以下功能:

lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)

没有多大意义。请记住,Spark 按 shuffle 排序,因此它不稳定。如果要按多个字段排序,则应使用复合键,例如:

lambda x: (x[0].lower(), -x[1])

关于apache-spark - pyspark 中的转换 DStream 在调用 pprint 时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41483746/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com