
apache-spark - How to write to a database using foreach or foreachBatch in PySpark?

Reposted. Author: 行者123. Updated: 2023-12-04 11:42:58

I want to run Spark Structured Streaming (Spark 2.4.x) from a Kafka source to MariaDB using Python (PySpark).

I want to use a streaming Spark DataFrame, not a static or Pandas DataFrame.

It seems I have to use foreach or foreachBatch, because according to https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks there is no database sink available for streaming DataFrames.

Here is my attempt:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructField, StructType, StringType, DoubleType, TimestampType
from pyspark.sql import DataFrameWriter
# configuration of target db
db_target_url = "jdbc:mysql://localhost/database"
db_target_properties = {"user":"writer", "password":"1234"}
# schema
schema_simple = StructType([StructField("Signal", StringType()),StructField("Value", DoubleType())])

# create spark session
spark = SparkSession.builder.appName("streamer").getOrCreate()

# create DataFrame representing the stream
df = spark.readStream \
    .format("kafka").option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .load() \
    .selectExpr("Timestamp", "cast (value as string) as json") \
    .select("Timestamp", F.from_json("json", schema_simple).alias('json_wrapper')) \
    .selectExpr("Timestamp", "json_wrapper.Signal", "json_wrapper.Value")
df.printSchema()
# Do some dummy processing
df2 = df.filter("Value < 11111111111")
print("df2: ", df2.isStreaming)

def process_row(row):
    # Process row
    row.write.jdbc(url=db_target_url, table="mytopic", mode="append", properties=db_target_properties)

query = df2.writeStream.foreach(process_row).start()

I get an error:

AttributeError: write



Why?

Best Answer

tl;dr Replace foreach with foreachBatch.

Quoting the official documentation:

The foreach and foreachBatch operations allow you to apply arbitrary operations and writing logic on the output of a streaming query. They have slightly different use cases - while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.



In other words, your writeStream.foreach(process_row) acts on a single row (of data), which has no write.jdbc available, hence the error.

Think of a row as a piece of data that you can save anywhere you want, using any API you want.
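For instance, if you stay with foreach, one option is the open/process/close writer contract from the Structured Streaming guide combined with a plain Python database client. The sketch below assumes the pymysql package and an existing mytopic table with Timestamp, Signal and Value columns; neither of these is part of the original question.

import pymysql

class RowWriter:
    def open(self, partition_id, epoch_id):
        # One connection per partition and epoch (assumed pymysql client)
        self.conn = pymysql.connect(host="localhost", user="writer",
                                    password="1234", database="database")
        return True

    def process(self, row):
        # row is a pyspark.sql.Row, so we insert its fields manually
        with self.conn.cursor() as cur:
            cur.execute(
                "INSERT INTO mytopic (`Timestamp`, `Signal`, `Value`) VALUES (%s, %s, %s)",
                (row.Timestamp, row.Signal, row.Value))
        self.conn.commit()

    def close(self, error):
        self.conn.close()

query = df2.writeStream.foreach(RowWriter()).start()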

If you really do want support from Spark (and do use write.jdbc), you should actually use foreachBatch.

while foreach allows custom write logic on every row, foreachBatch allows arbitrary operations and custom logic on the output of each micro-batch.
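Applied to the code in the question, a minimal sketch of the foreachBatch variant could look like the following; it reuses db_target_url, db_target_properties and df2 from above and assumes a MariaDB/MySQL JDBC driver is on the Spark classpath.

def write_to_mariadb(batch_df, batch_id):
    # batch_df is a regular (static) DataFrame holding one micro-batch,
    # so the full DataFrameWriter API, including write.jdbc, is available
    batch_df.write.jdbc(url=db_target_url,
                        table="mytopic",
                        mode="append",
                        properties=db_target_properties)

query = df2.writeStream \
    .foreachBatch(write_to_mariadb) \
    .start()

query.awaitTermination()

Since each micro-batch is written with a single JDBC call, this is usually much cheaper than opening a connection per row.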

Regarding "apache-spark - How to write to a database using foreach or foreachBatch in PySpark?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/58766638/
