gpt4 book ai didi

pyspark - kafka到pyspark结构化流,将json解析为数据帧

转载 作者:行者123 更新时间:2023-12-01 18:30:15 25 4
gpt4 key购买 nike

我正在尝试使用 Spark 结构化流(spark v2.2.0)来使用来自 kafka 的 json 数据。但是我遇到了以下错误。

pyspark.sql.utils.StreamingQueryException: 'Missing required configuration "partition.assignment.strategy" which has no default value.

有人知道为什么吗?该作业是使用下面的 Spark-submit 提交的。

spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 sparksstream.py

这是整个 python 脚本。

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

spark = SparkSession \
.builder \
.appName("test") \
.getOrCreate()

# Define schema of json
schema = StructType() \
.add("Session-Id", StringType()) \
.add("TransactionTimestamp", IntegerType()) \
.add("User-Name", StringType()) \
.add("ID", StringType()) \
.add("Timestamp", IntegerType())

# load data into spark-structured streaming
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xxxx:9092") \
.option("subscribe", "topicName") \
.load() \
.select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

# Print output
query = df.writeStream \
.outputMode("append") \
.format("console") \
.start()

最佳答案

使用它来提交:

spark-submit \
--conf "spark.driver.extraClassPath=$SPARK_HOME/jars/kafka-clients-1.1.0.jar" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.2.0 \
sparksstream.py

假设您已将 kafka-clients*jar 下载到 $SPARK_HOME/jars 文件夹中

关于pyspark - kafka到pyspark结构化流,将json解析为数据帧,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46660790/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com