gpt4 book ai didi

pyspark - 每个微批处理 Spark Streaming 中处理的总记录数

转载 作者:行者123 更新时间:2023-12-04 07:27:00 28 4
gpt4 key购买 nike

有没有一种方法可以找到每个微批处理有多少记录被处理到下游增量表中。我有流式作业,使用 trigger.once()append 模式 每小时运行一次。出于审计目的,我想知道每个微批处理了多少条记录。我尝试了以下代码来打印已处理的记录数(显示在第二行)。

ss_count=0 

def write_to_managed_table(micro_batch_df, batchId):
#print(f"inside foreachBatch for batch_id:{batchId}, rows in passed dataframe: {micro_batch_df.count()}")

ss_count = micro_batch_df.count()

saveloc = "TABLE_PATH"
df_final.writeStream.trigger(once=True).foreachBatch(write_to_managed_table).option('checkpointLocation', f"{saveloc}/_checkpoint").start(saveloc)

print(ss_count)

流式处理作业将正常运行,但 micro_batch_df.count() 不会打印任何计数。

如有任何指点,我们将不胜感激。

最佳答案

这是您正在寻找的工作示例(structured_steaming_example.py):

from pyspark.sql import SparkSession

spark = SparkSession \
.builder \
.appName("StructuredStreamTesting") \
.getOrCreate()

# Create DataFrame representing the stream of input
df = spark.read.parquet("data/")
lines = spark.readStream.schema(df.schema).parquet("data/")


def batch_write(output_df, batch_id):
print("inside foreachBatch for batch_id:{0}, rows in passed dataframe: {1}".format(batch_id, output_df.count()))


save_loc = "/tmp/example"
query = (lines.writeStream.trigger(once=True)
.foreachBatch(batch_write)
.option('checkpointLocation', save_loc + "/_checkpoint")
.start(save_loc)
)
query.awaitTermination()

示例 parquet 文件已附上。请将其放入数据文件夹并使用 spark-submit 执行代码

spark-submit --master local structured_steaming_example.py

请将任何示例 parquet 文件放在数据文件夹下进行测试。

enter image description here

关于pyspark - 每个微批处理 Spark Streaming 中处理的总记录数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68164181/

28 4 0