
java - How do I set the checkpoint interval for Spark Streaming checkpointing?

Reposted · Author: 搜寻专家 · Updated: 2023-11-01 03:00:06

I want to set the checkpoint interval for my Python Spark Streaming script, following the official documentation:

For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set by using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5 - 10 sliding intervals of a DStream is a good setting to try.

My script:

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def functionToCreateContext():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 6)
    ssc.checkpoint("./checkpoint")
    kvs = KafkaUtils.createDirectStream(ssc, ['test123'], {"metadata.broker.list": "localhost:9092"})

    kvs = kvs.checkpoint(60)  # set the checkpoint interval

    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext)

    ssc.start()
    ssc.awaitTermination()

Output after running the script:

16/05/25 17:49:03 INFO DirectKafkaInputDStream: Slide time = 6000 ms
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Checkpoint interval = null
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Remember duration = 120000 ms
16/05/25 17:49:03 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1be80174
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, true, false, false, 1)
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = 60000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 120000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@69f9a089
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = null
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 6000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@d97386a
16/05/25 17:49:03 INFO PythonTransformedDStream: Slide time = 6000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/05/25 17:49:03 INFO PythonTransformedDStream: Checkpoint interval = null
16/05/25 17:49:03 INFO PythonTransformedDStream: Remember duration = 6000 ms
16/05/25 17:49:03 INFO PythonTransformedDStream: Initialized and validated org.apache.spark.streaming.api.python.PythonTransformedDStream@16c474ad
16/05/25 17:49:03 INFO ForEachDStream: Slide time = 6000 ms
16/05/25 17:49:03 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1)
16/05/25 17:49:03 INFO ForEachDStream: Checkpoint interval = null
16/05/25 17:49:03 INFO ForEachDStream: Remember duration = 6000 ms
..........

The DStream checkpoint interval is still null. Any ideas?

Best Answer

Try moving this line down a few lines, to after the stream has been created: ssc.checkpoint("./checkpoint")

Basically, perform the checkpoint only after your stream is fully set up.
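Applied to the script from the question, the suggested reordering would look roughly like this. This is an untested sketch (it needs a running Kafka broker); the checkpoint path ./checkpoint, topic test123, and broker address are taken from the question as-is:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def functionToCreateContext():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 6)  # 6-second batch interval

    kvs = KafkaUtils.createDirectStream(ssc, ['test123'],
                                        {"metadata.broker.list": "localhost:9092"})
    kvs.checkpoint(60)  # per-DStream checkpoint interval, in seconds

    lines = kvs.map(lambda x: x[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda word: (word, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    # enable checkpointing only after the stream graph is fully built
    ssc.checkpoint("./checkpoint")
    return ssc
```

The idea is that ssc.checkpoint(...) is applied last, once every DStream in the graph exists, so the configured interval is not lost when the context is restored via getOrCreate.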

This question ("java - How do I set the checkpoint interval for Spark Streaming checkpointing?") originally appeared on Stack Overflow: https://stackoverflow.com/questions/37444437/
