
apache-spark - Restarting a streaming query without stopping the application


I am trying to restart a streaming query in Spark using the code below in place of query.awaitTermination(). The code runs inside an infinite loop, watches for a trigger to restart the query, and then executes the steps shown. Essentially, I am trying to refresh a cached DataFrame.

query.processAllAvailable()
query.stop()
// oldDF is a cached DataFrame created from a GlobalTempView and is about 150 GB in size.
oldDF.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) // read the file from S3 or any other source
val recreateddf = inputDf.persist()
// Start the query // should I start the query again here by invoking readStream?
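For reference, here is a minimal sketch of the loop described above, assuming a readFile helper, a buildQuery function that re-invokes readStream/writeStream, and a shouldRefresh trigger check; these names are illustrative and not part of the original code.

// Hypothetical sketch of the restart loop described above.
var cachedDf: DataFrame = readFile(spec, sparkSession).persist()
var query: StreamingQuery = buildQuery(cachedDf) // invokes readStream ... writeStream ... start()

while (true) {
  if (shouldRefresh()) {                // external trigger, e.g. a flag file or a timer
    query.processAllAvailable()         // drain data that is already available (testing-oriented API)
    query.stop()                        // stop only this query, not the application
    cachedDf.unpersist()
    cachedDf = readFile(spec, sparkSession).persist() // re-read and re-cache the data
    query = buildQuery(cachedDf)        // start a new query against the refreshed DataFrame
  }
  Thread.sleep(10000)
}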

But when I looked at the Spark documentation, it says:

void processAllAvailable(): Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended to a Source prior to invocation (i.e. getOffset must immediately reflect the addition).


stop(): Stops the execution of this query if it is running. This method blocks until the threads performing execution have stopped.

So what is a better way to restart the query without stopping my Spark Streaming application?

Best answer

This worked for me.

Below is the scenario I followed in Spark 2.4.5 for a left outer join and a left join. The process below pushes Spark to read the latest dimension data changes.

Process for joining a stream with a batch dimension (which is always being updated):

Step 1: Before starting the Spark streaming job, make sure the dimension (batch) data folder contains only one file, and that the file has at least one record (for some reason, placing an empty file does not work).

Step 2: Start your streaming job and add streaming records to the Kafka topic.

Step 3: Overwrite the dimension data with new values (the file should keep the same name; do not change it, and the dimension folder should still contain only one file). Note: do not use Spark to write to this folder; use Java or Scala filesystem I/O to overwrite the file, or use bash to delete the file and replace it with a new data file of the same name (see the sketch after step 4).

Step 4: In the next batch, Spark is able to read the updated dimension data while joining with the Kafka stream...
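As a rough illustration of step 3, the single dimension file can be replaced in place with plain Java/Scala file I/O rather than Spark. The dimension folder path comes from the example code below; the source path and the file name dim.csv are assumptions for illustration only.

// Hypothetical sketch: overwrite the single dimension file in place (same name), without using Spark.
import java.nio.file.{Files, Paths, StandardCopyOption}

val newData = Paths.get("/tmp/dimension_update.csv") // freshly prepared dimension data (assumed location)
val target  = Paths.get("src/main/resources/broadcasttest/dimension/dim.csv") // existing file, name unchanged (assumed name)

Files.copy(newData, target, StandardCopyOption.REPLACE_EXISTING)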

Example code:

package com.databroccoli.streaming.streamjoinupdate

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}

object BroadCastStreamJoin3 {

  def main(args: Array[String]): Unit = {

    @transient lazy val logger: Logger = Logger.getLogger(getClass.getName)

    Logger.getLogger("akka").setLevel(Level.WARN)
    Logger.getLogger("org").setLevel(Level.ERROR)
    Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
    Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
    Logger.getLogger("io.netty").setLevel(Level.ERROR)

    val spark = SparkSession
      .builder()
      .master("local")
      .getOrCreate()

    val schemaUntyped1 = StructType(
      Array(
        StructField("id", StringType),
        StructField("customrid", StringType),
        StructField("customername", StringType),
        StructField("countrycode", StringType),
        StructField("timestamp_column_fin_1", TimestampType)
      ))

    val schemaUntyped2 = StructType(
      Array(
        StructField("id", StringType),
        StructField("countrycode", StringType),
        StructField("countryname", StringType),
        StructField("timestamp_column_fin_2", TimestampType)
      ))

    // Fact data is read as a stream.
    val factDf1 = spark.readStream
      .schema(schemaUntyped1)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/fact")

    // Dimension data is read as a plain batch DataFrame from the folder that gets overwritten.
    val dimDf3 = spark.read
      .schema(schemaUntyped2)
      .option("header", "true")
      .csv("src/main/resources/broadcasttest/dimension")
      .withColumnRenamed("id", "id_2")
      .withColumnRenamed("countrycode", "countrycode_2")

    import spark.implicits._

    factDf1
      .join(
        dimDf3,
        $"countrycode_2" <=> $"countrycode",
        "inner"
      )
      .writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  }
}

Regarding apache-spark - restarting a streaming query without stopping the application, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/45617892/
