gpt4 book ai didi

apache-spark - 将 mqtt 与 pyspark 流结合使用

转载 作者:行者123 更新时间:2023-12-04 04:55:01 27 4
gpt4 key购买 nike

我是 spark 和 mqtt 的新手。我正在尝试使用我在网上获得的名为 wordcount.py 的 MQTTUtils 代码

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.mqtt import MQTTUtils
if __name__ == "__main__":
if len(sys.argv) != 3:
print >> sys.stderr, "Usage: mqtt_wordcount.py <broker url> <topic>"
exit(-1)

sc = SparkContext(appName="PythonStreamingMQTTWordCount")
ssc = StreamingContext(sc, 1)

brokerUrl = sys.argv[1]
topic = sys.argv[2]

lines = MQTTUtils.createStream(ssc, brokerUrl, topic)
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()

ssc.start()
ssc.awaitTermination()

我按照说明安装了 mosquitto 代理(它正在工作),下载 spark-streaming-mqtt-assembly_2.11-1.6.2.jar 并使用以下命令运行 python 脚本:
~$ spark-submit --jars spark-streaming-mqtt-assembly_*.jar wordcount.py

但显示的错误:

从 pyspark.streaming.mqtt 导入 MQTTUtils

ImportError:没有名为 mqtt 的模块

那是我错过了这里的任何东西吗?
谢谢

最佳答案

对于 spark 版本 2.*,我们可以在 Structured Streaming 中使用 MQTT通过包括Bahir Jar。

从 pyspark 连接到 MQTT 代理:

(spark
.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("topic","mytopic")
.load("tcp://{}".format(broker_uri)))

关于apache-spark - 将 mqtt 与 pyspark 流结合使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39349068/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com