I am trying to use MQTT together with PySpark Structured Streaming.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession \
    .builder \
    .appName("Test") \
    .master("local[4]") \
    .getOrCreate()

# Custom Structured Streaming receiver
lines = spark \
    .readStream \
    .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider") \
    .option("topic", "uwb/distances") \
    .option("brokerUrl", "tcp://127.0.0.1:1883") \
    .load()

# Split the lines into words
words = lines.select(explode(split(lines.value, ' ')).alias('word'))

# Generate running word count
wordCounts = words.groupBy('word').count()

# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode('complete') \
    .format('console') \
    .start()

query.awaitTermination()
Error message:
Logical Plan:
Aggregate [word#7], [word#7, count(1) AS count#11L]
+- Project [word#7]
+- Generate explode(split(value#2, )), false, [word#7]
+- StreamingExecutionRelation org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1, [value#2, timestamp#3]
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@383ccec1 did not have isStreaming=true
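I don't understand what is wrong with my code. Moreover, according to this post, Bahir's MQTT connector does in fact support Structured Streaming in 2.1.0. I also tried Spark 2.2.1 and ran into the same problem.
This is how I run the code: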
spark-submit \
--jars lib/spark-streaming-mqtt_2.11-2.2.1.jar, \
lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar, \
lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
TestSpark.py
How can I solve this problem?
Best answer
I downloaded the Spark 2.2.1 binaries and ran the code as follows:
~/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit \
--jars lib/spark-streaming-mqtt_2.11-2.2.1.jar, \
lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar, \
lib/org.eclipse.paho.client.mqttv3-1.2.0.jar \
TestSpark.py
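This solved the problem. Previously I had only changed the version of the MQTT jar files (for example, spark-streaming-mqtt_2.11-2.2.1.jar), but that was evidently not enough: the Spark installation itself also had to match.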
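Since the fix came down to making the Spark installation and the Bahir jars agree on a version, a small pre-flight check can catch the mismatch before submitting. The following is only a sketch under assumptions: the helper names (`parse_jar`, `mismatched_jars`, `jars_argument`) are hypothetical, the jar naming pattern is inferred from the file names in the question, and exact version equality is a simplification of what "compatible" means. The Paho client jar is left out because it does not follow that naming pattern. In a real session the first argument would be `spark.version`; here the version strings are hard-coded for illustration.

```python
import re
from typing import List, Tuple

# Assumed naming scheme, inferred from the question's jars:
#   <artifact>_<scala version>-<artifact version>.jar
JAR_PATTERN = re.compile(
    r"^(?P<artifact>.+)_(?P<scala>\d+\.\d+)-(?P<version>\d+(?:\.\d+)+)\.jar$"
)


def parse_jar(path: str) -> Tuple[str, str, str]:
    """Return (artifact, scala_version, artifact_version) for a jar path."""
    name = path.rsplit("/", 1)[-1]
    match = JAR_PATTERN.match(name)
    if match is None:
        raise ValueError(f"unrecognised jar name: {name}")
    return match.group("artifact"), match.group("scala"), match.group("version")


def mismatched_jars(spark_version: str, jars: List[str]) -> List[str]:
    """List the jars whose version string differs from the Spark version."""
    return [jar for jar in jars if parse_jar(jar)[2] != spark_version]


def jars_argument(jars: List[str]) -> str:
    """Join jar paths into one comma-separated value without whitespace."""
    return ",".join(jar.strip() for jar in jars)


bahir_jars = [
    "lib/spark-streaming-mqtt_2.11-2.2.1.jar",
    "lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar",
]

# Hypothetical older installation: both jars are reported as mismatched.
print(mismatched_jars("2.1.0", bahir_jars))
# Matching installation: nothing is reported.
print(mismatched_jars("2.2.1", bahir_jars))
# Value that could be passed to spark-submit after --jars.
print(jars_argument(bahir_jars))
```

`jars_argument` is included because `--jars` takes a single comma-separated list, so a space after a comma makes the shell split the list into separate arguments.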
Regarding "python - DataFrame returned by getBatch from MQTTTextStreamSource did not have isStreaming=true", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51537178/