gpt4 book ai didi

hadoop - Kafka 结构化流检查点

转载 作者:可可西里 更新时间:2023-11-01 14:45:13 26 4
gpt4 key购买 nike

我正在尝试从 Kafka 进行结构化流式传输。我打算在 HDFS 中存储检查点。我读了一篇 Cloudera 博客,建议不要在 HDFS 中为 Spark 流存储检查点。结构流式检查点是否存在相同的问题。 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/ .

在结构化流式传输中,如果我的 spark 程序停机了一段时间,我如何从检查点目录获取最新的偏移量并在该偏移量之后加载数据。我将检查点存储在如下所示的目录中。

 df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()

更新:

这是我的结构化流程序读取 Kafka 消息、解压缩并写入 HDFS。

df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()

decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream

query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()

query.awaitTermination()

最佳答案

在长期存储(HDFS、AWS S3 等)上存储检查点是首选。我想在这里补充一点,属性“failOnDataLoss”不应设置为 false,因为这不是最佳做法。数据丢失是没有人愿意承受的。休息吧,你走在正确的道路上。

关于hadoop - Kafka 结构化流检查点,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46612159/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com