- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
所以我有一些数据在 Kafka 主题中进行流式传输,我正在获取这些流式数据并将其放入 DataFrame
.我想在 DataFrame 中显示数据:
import os
from kafka import KafkaProducer
from pyspark.sql import SparkSession, DataFrame
import time
from datetime import datetime, timedelta
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0,org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 pyspark-shell'
topic_name = "my-topic"
kafka_broker = "localhost:9092"
producer = KafkaProducer(bootstrap_servers = kafka_broker)
spark = SparkSession.builder.getOrCreate()
terminate = datetime.now() + timedelta(seconds=30)
while datetime.now() < terminate:
producer.send(topic = topic_name, value = str(datetime.now()).encode('utf-8'))
time.sleep(1)
readDF = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker) \
.option("subscribe", topic_name) \
.load()
readDF = readDF.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")
readDF.writeStream.format("console").start()
readDF.show()
producer.close()
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/spark/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.showString.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
...
Traceback (most recent call last):
File "test2.py", line 30, in <module>
readDF.show()
File "/home/spark/spark/python/pyspark/sql/dataframe.py", line 336, in show
print(self._jdf.showString(n, 20))
File "/home/spark/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/home/spark/spark/python/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
writeStream.start()
就在
show()
之前.我试图摆脱
selectExpr()
但这并没有什么不同。有谁知道如何显示流来源的 DataFrame?我正在使用 Python 3.6.1、Kafka 0.10.2.1 和 Spark 2.2.0
最佳答案
Streaming DataFrame 不支持 show()
方法。当您调用 start()
方法,它将启动一个后台线程将输入数据流式传输到接收器,并且由于您使用的是 ConsoleSink,它会将数据输出到控制台。您无需调用show()
.
删除 readDF.show()
然后添加一个 sleep ,然后您应该可以在控制台中看到数据,例如
query = readDF.writeStream.format("console").start()
import time
time.sleep(10) # sleep 10 seconds
query.stop()
startingOffsets
至
earliest
,否则,Kafka 源代码将只是从最新的偏移量开始,并且在您的情况下什么也不获取。
readDF = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", kafka_broker) \
.option("startingOffsets", "earliest") \
.option("subscribe", topic_name) \
.load()
关于apache-spark - 如何显示流数据帧(显示失败并出现 AnalysisException)?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45092445/
我正在使用spark-sql 2.4.1和java 8。 val country_df = Seq( ("us",2001), ("fr",2002), ("jp",2002
我在 Windows 7 计算机上运行 Quickstart VM Cloudera,其中 8Go RAM 和 4Go 专用于 VM。 我使用 Sqoop(Cloudera VM 教程练习 1)将表从
我一直在使用 Spark Dataset API 对 JSON 执行操作以根据需要提取某些字段。但是,当我提供的让 spark 知道要提取哪个字段的规范出错时,spark 会吐出一个 org.apac
所以我有一些数据在 Kafka 主题中进行流式传输,我正在获取这些流式数据并将其放入 DataFrame .我想在 DataFrame 中显示数据: import os from kafka impo
我收到以下错误: 18/03/14 15:31:11 ERROR ApplicationMaster: User class threw exception: org.apache.spark.sql
今天早上我们将 Spark 版本从 2.2.0 更新到 2.3.0,我遇到了相当奇怪的问题。 我有一个 UDF(),计算 2 点之间的距离 private static UDF4 calcDistan
exitTotalDF .filter($"accid" === "dc215673-ef22-4d59-0998-455b82000015") .groupBy("exiturl") .
我正在使用标准的 hdfs 运行 amazon emr 的 spark 作业,而不是 S3 来存储我的文件。我在 hdfs://user/hive/warehouse/中有一个配置单元表,但在运行我的
val rdd = sc.parallelize(Seq(("vskp", Array(2.0, 1.0, 2.1, 5.4)),("hyd",Array(1.5, 0.5, 0.9, 3.7)),(
案例 1: 当我尝试获取“b.no”时出现错误,下面共享代码以及错误消息。我如何从第二个数据帧中获取值(即别名为 b)。此处是否允许从 b 中选择值。如果我删除 b.no 它工作正常。 df1.csv
在 Spark shell 上执行以下查询时,我面临分区错误: Expected only partition pruning predicates: ((((isnotnull(tenant_sui
我有一个这样的 JSON 数据: { "parent":[ { "prop1":1.0, "prop2":"C", "ch
我正在尝试将整个 df 转换为单个向量列,使用 df_vec = vectorAssembler.transform(df.drop('col200')) 我被抛出这个错误: File "/usr/h
我有一个带有 String[] 的数据集,我正在努力从中提取列。这是代码 import static org.apache.spark.sql.functions.col; //Read parque
first 的这种用法有什么问题?我想获取数据框中每个 id 的第一行,但它返回一个错误: Exception in thread "main" org.apache.spark.sql.Analys
我正在使用朴素贝叶斯算法对文章进行分类,并希望访问部分结果的“概率”列: val Array(trainingDF, testDF) = rawDataDF.randomSplit(Array(0.6
我正在使用neo4j-spark connector将neo4j数据提取到spark数据帧中。我能够成功获取它,因为我能够显示数据框。然后我用 createOrReplaceTempView() 注册
我正在尝试在 Impala 中执行查询并收到以下错误(AnalysisException:INT 和 STRING 类型的操作数不可比较:B.COMMENT_TYPE_CD = '100')有人可以帮
SparkSession .builder .master("local[*]") .config("spark.sql.warehouse.dir", "C:/tmp/spark")
我有一个返回 Dataset 的 Java 方法。我想将其转换为 Dataset ,其中该对象名为 StatusChangeDB。我创建了一个 POJO StatusChangeDB.java 并使用
我是一名优秀的程序员,十分优秀!