- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
基于 Spark 3.0 中的介绍,https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html .应该可以设置“kafka.group.id”来跟踪偏移量。对于我们的用例,如果流式 Spark 作业失败并重新启动,我想避免潜在的数据丢失。根据我之前的问题,我觉得 Spark 3.0 中的 kafka.group.id 会有所帮助。
How to specify the group id of kafka consumer for spark structured streaming?
How to ensure no data loss for kafka data ingestion through Spark Structured Streaming?
但是,我在 spark 3.0 中尝试了如下设置。
package com.example
/**
* @author ${user.name}
*/
import scala.math.random
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, BooleanType, LongType}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SaveMode
import org.apache.spark.SparkFiles
import java.util.Properties
import org.postgresql.Driver
import org.apache.spark.sql.streaming.Trigger
import java.time.Instant
import org.apache.hadoop.fs.{FileSystem, Path}
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
//import org.apache.spark.sql.hive.HiveContext
import scala.io.Source
import java.nio.charset.StandardCharsets
import com.amazonaws.services.kms.{AWSKMS, AWSKMSClientBuilder}
import com.amazonaws.services.kms.model.DecryptRequest
import java.nio.ByteBuffer
import com.google.common.io.BaseEncoding
object App {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession.builder()
.appName("MY-APP")
.getOrCreate()
import spark.sqlContext.implicits._
spark.catalog.clearCache()
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")
spark.sparkContext.setLogLevel("ERROR")
spark.sparkContext.setCheckpointDir("/home/ec2-user/environment/spark/spark-local/checkpoint")
System.gc()
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "mybroker.io:6667")
.option("subscribe", "mytopic")
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.ssl.truststore.location", "/home/ec2-user/environment/spark/spark-local/creds/cacerts")
.option("kafka.ssl.truststore.password", "changeit")
.option("kafka.ssl.truststore.type", "JKS")
.option("kafka.sasl.kerberos.service.name", "kafka")
.option("kafka.sasl.mechanism", "GSSAPI")
.option("kafka.group.id","MYID")
.load()
df.printSchema()
val schema = new StructType()
.add("id", StringType)
.add("x", StringType)
.add("eventtime", StringType)
val idservice = df.selectExpr("CAST(value AS STRING)")
.select(from_json(col("value"), schema).as("data"))
.select("data.*")
val monitoring_df = idservice
.selectExpr("cast(id as string) id",
"cast(x as string) x",
"cast(eventtime as string) eventtime")
val monitoring_stream = monitoring_df.writeStream
.trigger(Trigger.ProcessingTime("120 seconds"))
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
if(!batchDF.isEmpty)
{
batchDF.persist()
printf("At %d, the %dth microbatch has %d records and %d partitions \n", Instant.now.getEpochSecond, batchId, batchDF.count(), batchDF.rdd.partitions.size)
batchDF.show()
batchDF.write.mode(SaveMode.Overwrite).option("path", "/home/ec2-user/environment/spark/spark-local/tmp").saveAsTable("mytable")
spark.catalog.refreshTable("mytable")
batchDF.unpersist()
spark.catalog.clearCache()
}
}
.start()
.awaitTermination()
}
}
使用以下 spark-submit 命令在独立模式下测试 spark 作业,但在 AWS EMR 中以集群模式部署时存在同样的问题。
spark-submit --master local[1] --files /home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf,/home/ec2-user/environment/spark/spark-localreds/cacerts,/home/ec2-user/environment/spark/spark-local/creds/krb5.conf,/home/ec2-user/environment/spark/spark-local/creds/my.keytab --driver-java-options "-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.dynamicAllocation.enabled=false --conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf "spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/home/ec2-user/environment/spark/spark-local/creds/client_jaas.conf -Djava.security.krb5.conf=/home/ec2-user/environment/spark/spark-local/creds/krb5.conf" --conf spark.yarn.maxAppAttempts=1000 --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 --class com.example.App ./target/sparktest-1.0-SNAPSHOT-jar-with-dependencies.jar
然后,我开始流式作业以从 Kafka 主题读取流式数据。一段时间后,我终止了 Spark 作业。然后,我等待 1 小时再次开始工作。如果我理解正确,新的流数据应该从我终止 spark 作业时的偏移量开始。但是,它仍然以最新的偏移量开始,这在我停止作业期间导致数据丢失。
{"8":109904920,"2":109905750,"5":109905789,"4":109905621,"7":109905330,"1":109905746,"9":109905750,"3":109905936,"6":109905531,"0":109905583}}
一种建议是将检查点放置到某个外部存储,例如 s3。即使在您需要重建 EMR 集群本身的情况下,它也可以帮助恢复偏移量。
最佳答案
根据Spark Structured Integration Guide ,Spark 本身会跟踪偏移量,并且没有提交回 Kafka 的偏移量。这意味着如果您的 Spark Streaming 作业失败并且您重新启动它,所有关于偏移量的必要信息都存储在 Spark 的检查点文件中。
即使您使用 kafka.group.id
设置了 ConsumerGroup 名称,您的应用程序仍不会将消息提交回 Kafka。有关要读取的下一个偏移量的信息仅在 Spark 应用程序的检查点文件中可用。
如果您在没有重新部署的情况下停止并重新启动您的应用程序,并确保您没有删除旧的检查点文件,您的应用程序将从停止的地方继续读取。
在 Recovering from Failures with Checkpointing 上的 Spark Structured Streaming 文档中它写道:
"In case of a failure or intentional shutdown, you can recover the previous progress and state of a previous query, and continue where it left off. This is done using checkpointing and write-ahead logs. You can configure a query with a checkpoint location, and the query will save all the progress information (i.e. range of offsets processed in each trigger) [...]"
writeStream
中设置以下选项来实现。查询(在 SparkContext 配置中设置检查点目录是不够的):
.option("checkpointLocation", "path/to/HDFS/dir")
在文档中还指出“此检查点位置必须是
HDFS 兼容文件系统 中的路径,并且可以在启动查询时设置为 DataStreamWriter 中的选项。”
ForeachBatch
接收器,您的应用程序中可能没有重新启动功能。
关于scala - 如何在 spark 3.0 结构化流媒体中使用 kafka.group.id 和检查点以继续从 Kafka 中读取它在重启后停止的位置?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64003405/
我想编写一个 linq 表达式,该表达式将返回不包含特定值的 ID。例如,我想返回所有不具有 Value = 30 的不同 ID。 ID, Value 1, 10 1, 20 1, 30 2,
我正在尝试使用 Regexp 匹配 Nmap 命令的输出。可以有两种不同的格式。 第一种格式(当 nmap 可以找到主机名时) Nmap scan report for 2u4n32t-n4 (192
我正在 Visual Studio 2012 上使用 C# 开发一个软件。我使用 MySQL Connector 6.9.1 进行 MySQL 连接。我的软件在我的操作系统(Win8 x64)上运行顺
在 Django 中(使用 django.contrib.auth 时)我可以添加一个 Group到另一个 Group ?即一个Group成为另一个成员(member) Group ? 如果是这样,我
我试图通过使用动态组参数对数据进行分组来循环。 我们可以在循环的 WHERE 条件上使用动态查询,但我不知道是否可以在组条件中使用动态字符串。 以下是用户决定按哪个字段分组,然后根据决定放置其他逻辑的
我有这样的字符串 s = 'MR1|L2-S1x' 模式总是相同的:一个或两个字符,在 [|.+:x-] 中可选地后跟一个数字和一个分隔符。此模式可以重复 6 次。 所以匹配模式很明确。 p = r'
我有一个带有时间戳字段“bar”的表“foo”。如何仅获取查询的最旧时间戳,例如: SELECT foo.bar from foo?我尝试执行以下操作: SELECT MIN(foo.bar) fro
在我的 Django 项目中,我有一个 user_manage 应用程序。 我在 user_manage 应用的 model.py 中创建了一个名为 UserManage 的模型: from djan
所以我有这样的输入: 还有一个模板指令,例如: 看来我只获得了 foo 和 bar 的组。 (为什么?我预计我可能会得到第三组 current-group-key() = '')。
我正在尝试扩展 django.contrib.auth 并遇到将用户添加到组中的情况,这可以通过两种方式完成。我只是想知道为什么会这样,以及其中一种相对于另一种的优势是什么。 最佳答案 他们做完全相同
我使用的是旧的 PHP 脚本,并且此查询有错误。由于我没有使用 mysql 的经验,因此无法修复它。 "SELECT COUNT(p.postid) AS pid, p.*, t.* FROM ".T
我有几行 Objective-C 代码,例如: ABAddressBookRef addressBook; CFErrorRef error = NULL; addressBook = ABAddre
我正在使用 MariaDB IMDB 电影数据集,我试图解决以下问题。电影表包含 id、名称、排名和年份列 A decade is a sequence of 10 consecutive years
让我从数据开始,以便更好地描述我的需求。我有一个名为 SUPERMARKET 的表,其中包含以下字段: Field 1: StoreID Field 2: ProductCategory Field
你好我有这个查询: SELECT DISTINCT a.id, a.runcd, (SELECT SUM(b.CALVAL) FROM GRS b WHERE b.PCode=11000 AND a.
我想在 xquery 中使用 Group By。有人可以告诉我如何在 Marklogic 中使用 Group By 吗? 最佳答案 或者,您可以使用 xdmp:xslt-invoke 调用 XSLT或
因此,当通过 from sequelize 请求组时,如下所示: return models.WorkingCalendar .findAll({
我希望我解释正确。 我有 2 个表,有 第一个表(table1) +------------+------+-------+-------+ | Date | Item | Block |
我的表 MYTABLE 有 2 列:A 和 B 我有以下代码片段: SELECT MYTABLE.A FROM MYTABLE HAVING SUM(MYTABLE.B) > 100
我有一个简单的行分组查询,需要 0.0045 秒。 300.000 行 从表 GROUP BY cid 中选择 cid 当我添加 MAX() 进行查询时,需要 0.65 秒才能返回。 从表 GROUP
我是一名优秀的程序员,十分优秀!