scala - How to use kafka.group.id and checkpoints in Spark 3.0 Structured Streaming to continue reading from Kafka where it left off after a restart?


Based on the introduction for Spark 3.0 at https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html, it should be possible to set "kafka.group.id" to track the offsets. For our use case, I want to avoid potential data loss if the streaming Spark job fails and is restarted. Based on my previous questions, I have the feeling that kafka.group.id in Spark 3.0 will help:
How to specify the group id of kafka consumer for spark structured streaming?
How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?
However, I tried the following settings in Spark 3.0.

package com.example

/**
* @author ${user.name}
*/
import scala.math.random

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement


//import org.apache.spark.sql.hive.HiveContext

import scala.io.Source

import java.nio.charset.StandardCharsets

import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding


object App {

  def main(args: Array[String]): Unit = {

    val spark: SparkSession = SparkSession.builder()
      .appName("MY-APP")
      .getOrCreate()

    import spark.sqlContext.implicits._

    spark.catalog.clearCache()
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")

    spark.sparkContext.setLogLevel("ERROR")
    spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")

    System.gc()

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "mybroker.io:6667")
      .option("subscribe", "mytopic")
      .option("kafka.security.protocol", "SASL_SSL")
      .option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
      .option("kafka.ssl.truststore.password", "changeit")
      .option("kafka.ssl.truststore.type", "JKS")
      .option("kafka.sasl.kerberos.service.name", "kafka")
      .option("kafka.sasl.mechanism", "GSSAPI")
      .option("kafka.group.id", "MYID")
      .load()

    df.printSchema()

    val schema = new StructType()
      .add("id", StringType)
      .add("x", StringType)
      .add("eventtime", StringType)

    val idservice = df.selectExpr("CAST(value AS STRING)")
      .select(from_json(col("value"), schema).as("data"))
      .select("data.*")

    val monitoring_df = idservice
      .selectExpr("cast(id as string) id",
        "cast(x as string) x",
        "cast(eventtime as string) eventtime")

    val monitoring_stream = monitoring_df.writeStream
      .trigger(Trigger.ProcessingTime("120 seconds"))
      .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        if (!batchDF.isEmpty) {
          batchDF.persist()
          printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)
          batchDF.show()

          batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
          spark.catalog.refreshTable("mytable")

          batchDF.unpersist()
          spark.catalog.clearCache()
        }
      }
      .start()
      .awaitTermination()
  }

}
The Spark job is tested in standalone mode with the spark-submit command below, but the same problem exists when it is deployed in cluster mode in AWS EMR.
spark-submit --master local[1] \
  --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-local/creds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab \
  --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf spark.dynamicAllocation.enabled=false \
  --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" \
  --conf spark.yarn.maxAppAttempts=1000 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 \
  --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
Then I started the streaming job to read the streaming data from the Kafka topic. After a while, I killed the Spark job and waited for one hour before starting it again. If I understand correctly, the new streaming data should start from the offset at which I killed the Spark job. However, it still starts from the latest offset, which caused data loss during the time the job was stopped.
Do I need to configure more options to avoid data loss? Or do I have some misunderstanding of Spark 3.0? Thanks!
Update: problem solved
The key issue here is that the checkpoint must be added to the query specifically. Setting a checkpoint directory on the SparkContext alone is not enough. After adding the checkpoint to the query, it works. In the checkpoint folder, an offset subfolder is created containing offset files 0, 1, 2, 3, ...; each file shows the offset information for the different partitions:
{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}
One suggestion is to put the checkpoint on external storage such as S3. That way the offsets can be recovered even if you need to rebuild the EMR cluster itself, as in the sketch below.
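For illustration, here is a minimal sketch of the corrected sink, reusing monitoring_df and the imports from the job above; the S3 bucket name is a hypothetical placeholder. The essential change is that checkpointLocation is set as an option on the writeStream query itself, not via SparkContext.setCheckpointDir:

// Sketch only: checkpoint location set on the query (DataStreamWriter), pointed at
// external storage so offsets survive a cluster rebuild. The S3 path is a placeholder.
val monitoring_stream = monitoring_df.writeStream
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // same per-batch logic as in the original job
    batchDF.write.mode(SaveMode.Overwrite)
      .option("path", "/home/ec2-user/environment/spark/spark-local/tmp")
      .saveAsTable("mytable")
  }
  .start()
  .awaitTermination()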

Best Answer

According to the Spark Structured Streaming + Kafka Integration Guide, Spark itself keeps track of the offsets and no offsets are committed back to Kafka. That means if your Spark Streaming job fails and you restart it, all the necessary information on the offsets is stored in Spark's checkpoint files.
Even if you set a ConsumerGroup name with kafka.group.id, your application will still not commit the messages back to Kafka. The information on the next offset to read is only available in the checkpoint files of your Spark application.
If you stop and restart your application without a re-deployment and make sure you do not delete the old checkpoint files, your application will continue reading from where it left off.
In the Spark Structured Streaming documentation on Recovering from Failures with Checkpointing it says:

"In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) [...]"


This can be achieved by setting the following option in your writeStream query (setting the checkpoint directory in the SparkContext configuration is not sufficient):
.option("checkpointLocation", "path/to/HDFS/dir")
The documentation also notes that "this checkpoint location has to be a path in an HDFS compatible file system, and can be set as an option in the DataStreamWriter when starting a query."
In addition, the fault-tolerance guarantees of Spark Structured Streaming also depend on your output sink, as described in the Output Sinks section.
As you are currently using the ForeachBatch sink, you might not have restart capabilities in your application.
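As an aside, if end-to-end exactly-once output matters more than the per-batch saveAsTable refresh, one possible alternative (only a sketch, with hypothetical placeholder paths) is one of the built-in fault-tolerant sinks, e.g. the Parquet file sink combined with a checkpoint location:

// Sketch only: the built-in file sink is listed as exactly-once in the Output Sinks
// table of the Structured Streaming guide, provided a checkpoint location is set.
// Both S3 paths below are hypothetical placeholders.
val fileSinkQuery = monitoring_df.writeStream
  .format("parquet")
  .option("path", "s3://my-bucket/output/mytopic")
  .option("checkpointLocation", "s3://my-bucket/checkpoints/mytopic")
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

fileSinkQuery.awaitTermination()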

This question, "scala - How to use kafka.group.id and checkpoints in Spark 3.0 Structured Streaming to continue reading from Kafka where it left off after a restart?", was originally asked on Stack Overflow: https://stackoverflow.com/questions/64003405/
