
python - How to fetch and print one row from Kafka with pyspark? Queries with streaming sources must be executed with writeStream.start()

Reposted · Author: 行者123 · Updated: 2023-12-05 06:47:56

I'm trying to read some data from Kafka to see what is there.

I wrote:

from pyspark.sql import SparkSession

builder = SparkSession.builder \
    .appName("PythonTest01")

spark = builder.getOrCreate()

# Subscribe to 1 topic
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
.option("subscribe", dataFlowTopic) \
.load()

# df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

df.printSchema()

df = df.first()

query = df \
.writeStream \
.outputMode('complete') \
.format('console') \
.start()

query.awaitTermination()

Unfortunately, it fails with:

pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();

What does it want, and how do I satisfy it?
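For context: first() is a batch action, and Spark does not allow batch actions on a streaming DataFrame, which is what this error is saying. If the goal is only to peek at a single record, a plain batch read supports such actions. A minimal sketch, assuming the same config dict and topic name as in the question and a reachable Kafka broker:

```python
# Sketch: use spark.read (batch) instead of spark.readStream,
# so that actions like first() are allowed.
df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
    .option("subscribe", dataFlowTopic) \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load()

# Kafka key/value arrive as binary; cast to strings before printing.
row = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").first()
print(row)
```

This reads the topic's current contents as a bounded dataset and terminates on its own, unlike a streaming query.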


If I remove the first() call, it fails with:

Complete output mode not supported when there are no streaming aggregations on streaming DataFrames/Datasets;

So I write:

#df = df.first()

query = df \
.writeStream \
.outputMode('append') \
.format('console') \
.start()

query.awaitTermination()

It prints the last row rather than the first, and it does not terminate.

Best Answer

and it does not terminate.

It is a stream; it is not meant to terminate.
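If the intent is just to inspect a few records and then exit, one option is to let the streaming query run briefly and stop it explicitly. A sketch, assuming the same streaming DataFrame as in the question:

```python
# Sketch: run the console sink for a bounded time, then stop.
query = df.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

# awaitTermination(timeout) returns after `timeout` seconds
# if the query is still running; stop() then shuts it down.
query.awaitTermination(10)
query.stop()
```

This keeps the query a stream but bounds how long it runs, so the script terminates instead of blocking forever.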

printing not first, but last row

Refer to the startingOffsets option; the default is latest.

https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#reading-data-from-kafka
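Concretely, with the default startingOffsets of latest, the query only sees records published after it starts. To read from the beginning of the topic instead, the reader can be pointed at the earliest offsets. A sketch, reusing the config dict and topic name from the question:

```python
# Sketch: start the stream from the beginning of the topic
# rather than from only-new records.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", config["kafka"]["bootstrap.servers"]) \
    .option("subscribe", dataFlowTopic) \
    .option("startingOffsets", "earliest") \
    .load()
```

Note that startingOffsets only applies when a query starts fresh; once a checkpoint exists, the query resumes from the checkpointed offsets instead.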

A similar question can be found on Stack Overflow: https://stackoverflow.com/questions/66953694/
