- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在编写一个库来将 Apache Spark 与自定义环境集成。我正在实现自定义流媒体源和流媒体作家。
我正在开发的一些资源是不可恢复的,至少在应用程序崩溃之后是这样。如果应用程序重新启动,它需要重新加载所有数据。
因此,我们希望避免用户必须明确设置“checkpointLocation”选项。
但如果未提供该选项,我们会看到以下错误:
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
18/06/29 16:36:48 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/mydir/spark-warehouse/').
18/06/29 16:36:48 INFO SharedState: Warehouse path is 'file:/C:/mydir/spark-warehouse/'.
18/06/29 16:36:48 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
org.apache.spark.sql.AnalysisException: checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...);
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:213)
at org.apache.spark.sql.streaming.StreamingQueryManager$$anonfun$3.apply(StreamingQueryManager.scala:208)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:207)
at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:299)
at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:296)
...
18/06/29 16:36:50 INFO SparkContext: Invoking stop() from shutdown hook
spark.readStream().format("mysource").load()
.writeStream().format("mywriter").outputMode(OutputMode.Append()).start();
spark.readStream().format("mysource").load()
.writeStream().format("console").outputMode(OutputMode.Append()).start();
class MySourceProvider extends DataSourceRegister with StreamWriteSupport {
def createStreamWriter(queryId: String, schema: StructType, mode: OutputMode, options: DataSourceOptions): StreamWriter = {
new MyStreamWriter(...)
}
def shortName(): String = {
"mywriter"
}
}
class MyStreamWriter(...) extends StreamWriter {
def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
def createWriterFactory(): DataWriterFactory[Row] = {
new MyDataWriterFactory()
}
}
最佳答案
您需要在代码中添加 checkpointLocation
option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
val q = records.
writeStream.
format("console").
option("truncate", false).
option("checkpointLocation", "/tmp/vaquarkhan/checkpoint"). // <-- checkpoint directory
trigger(Trigger.ProcessingTime(10.seconds)).
outputMode(OutputMode.Update).
start
.option("startingOffsets", "latest") // read data from the end of the stream
checkpointLocation must be specified either through option("checkpointLocation", ...) or SparkSession.conf.set("spark.sql.streaming.checkpointLocation", ...)
private def createQuery(
userSpecifiedName: Option[String],
userSpecifiedCheckpointLocation: Option[String],
df: DataFrame,
extraOptions: Map[String, String],
sink: BaseStreamingSink,
outputMode: OutputMode,
useTempCheckpointLocation: Boolean,
recoverFromCheckpointLocation: Boolean,
trigger: Trigger,
triggerClock: Clock): StreamingQueryWrapper = {
var deleteCheckpointOnStop = false
val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
new Path(userSpecified).toUri.toString
}.orElse {
df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
}
}.getOrElse {
if (useTempCheckpointLocation) {
// Delete the temp checkpoint when a query is being stopped without errors.
deleteCheckpointOnStop = true
Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
} else {
throw new AnalysisException(
"checkpointLocation must be specified either " +
"""through option("checkpointLocation", ...) or """ +
s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
}
}
关于java - Spark 流: avoid checkpointLocation check,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50936964/
我正在尝试将网页内容打印到一页纸上。但是,它将内容分成 2 页,所以我在这里做了一些研究,看到有人推荐: #my_print_div{ width:940px; height:770px; page
我目前正在打印一些东西。我有一个动态页面,其中包含可变数量的 block 级元素。有些可能是 1 行,有些可能是 100 多行。 1text 1 line.... 2text 10 lines....
很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visit the help center . 关闭 9
我正在训练一个 randomForest 模型,目的是保存它以进行预测(它将被下载并在外部上下文中使用)。我希望这个模型尽可能最小。 我读到有很多options和 packages减少模型的内存大小。
这个问题在这里已经有了答案: MySQL connection timeout (3 个答案) 关闭 9 年前。 我一直在尝试使用 Tomcat 的 native 连接池功能来避免我的 Java W
我正在使用 Phonegap/Cordova 开发 Android 应用程序。我已经按照这样的百分比安排了我的布局(在 CSS 中): 标题 - 50px; Content_row1 - 30%(剩下
我正在编写一个插件,它将表情符号转换为特定站点文本 block 中的图像。简单的答案是使用正则表达式检测 innerHTML 上的触发文本并插入 img 标签,然后将字符串通过管道返回到 innerH
如何避免在我的 Drupal View 上重复? 我应该添加一个过滤器,指定特定字段(即用户 ID)不应出现两次吗?我找不到这样的选项 看法 http://dl.dropbox.com/u/72686
感谢您查看我的 typescript 问题。 为简单起见,我对 typescript “过度属性检查”行为有疑问。我想确保 TypeScript 不接受具有额外属性的对象。 在我的简单界面示例中,OF
我发现对于某些图表,我从 Prometheus 获得了 doubles 值,其中应该只是一个: 我使用的查询: increase(signups_count[4m]) 抓取间隔设置为 recommen
假设我正在运行N个线程。 每个线程都需要与下一个和上一个同步。 for (i = 0 ; i < NITER; i++){ do_something (); sync_
如今,服务器虚拟化是一件大事,所以我的任务是在虚拟化服务器上安装我们的一些软件,看看会发生什么。长话短说:rsync 传输会立即使虚拟化服务器崩溃。虚拟化主机是一台强大的机器,没有其他负载;我认为
以下正则表达式在应用于大型 html 页面时会创建 StackOverflowError: (.|\s)*? 我的假设是,这是由于逻辑“OR”运算符(|)在匹配器中创建了递归调用,并且由于需要解析的
我在运行时使用表达式树构建委托(delegate): Type type = GetType(); ParameterExpression parameterType = Expression.Par
我正在使用 scikit-learn TfidfVectorizer 找出两个文档中最重要的单词。每个文档大小为 1.9GB(约 9000 万字),并且已采用小写、词干化(使用 nltk.stem.p
我进行了一个中间件调用来获取 String 数组,如下所示: String[] freqwords = MViewer.getWordNames(); 问题是可能没有可用数据,因此任何进一步的操作(如
在 JavaFx 中,我使用以下代码创建一个 StackedBarChart: String[] ACTIVITIES = new String[10]{ ... };// there
我正在尝试制作一个使用类 AnimationTimer 来处理它的游戏。我的代码摘要如下所示: 主类 object Game extends JFXApp{ def showMenu{
我正在用不同的步骤创建一个小的 javascript/jQuery 应用程序。为此,我使用了一个具有不同功能的 js 文件。 在文件的顶部我调用了我的第一个函数。在我的第一个函数中,我在单击按钮时调用
我正在使用表格 View 来显示从服务器加载的文本字段数组,所以我有一个表格 View 字段列表,当我填充这些数据字段并向下滚动以填充其他字段时,当我再次向上滚动时,我发现值发生变化并且存在重复值 -
我是一名优秀的程序员,十分优秀!