gpt4 book ai didi

apache-spark - Jupyter Notebook 上未显示结构化流输出

转载 作者:行者123 更新时间:2023-12-03 17:00:35 24 4
gpt4 key购买 nike

我有两本笔记本。第一个笔记本正在使用 tweepy 从 twitter 读取推文并将其写入套接字。其他笔记本正在使用 spark 结构化流 (Python) 从该套接字读取推文并将其结果写入控制台。不幸的是,我没有在 jupyter 控制台上获得输出。代码在 pycharm 上运行良好。

spark = SparkSession \
.builder \
.appName("StructuredStreaming") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

# This is Spark Structured Streaming Code which is reading streams from twitter and showing them on console.
tweets = spark \
.readStream \
.format("socket") \
.option("host", "127.0.0.1") \
.option("port", 7000) \
.load()

query = tweets \
.writeStream \
.option("truncate", "false") \
.outputMode("append") \
.format("console") \
.start()

query.awaitTermination()

最佳答案

我不确定 Jupyter Notebook 是否可以做到这一点。但是,您可以使用内存输出来实现类似的结果。这在 complete 中很简单模式,但可能需要对 append 进行一些更改.

对于 complete模式

complete输出模式,您的查询应该或多或少如下:

query = tweets \
.writeStream \
.outputMode("complete") \
.format("memory") \
.queryName("your_query_name") \
.start()

请注意,没有 query.awaitTermination()在末尾。
现在,查询 your_query_name另一个单元格中的临时表,并根据需要观看不断更新的结果:
from IPython.display import display, clear_output

while True:
clear_output(wait=True)
display(query.status)
display(spark.sql('SELECT * FROM your_query_name').show())
sleep(1)

对于 append模式

如果您想使用 append输出模式,你必须使用水印。您也将无法使用聚合,因此您的代码可能需要进一步更改。
query = tweets \
.withWatermark("timestampColumn", "3 minutes")
.writeStream \
.outputMode("append") \
.format("memory") \
.queryName("your_query_name") \
.start()

显示代码保持不变。
您也可以显示 query.lastProgress以类似的方式获取更详细的信息。

灵感和引用
  • How to get the output from console streaming sink in Zeppelin?
  • Overwrite previous output in jupyter notebook
  • 关于apache-spark - Jupyter Notebook 上未显示结构化流输出,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61463554/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com