- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用谷歌云 Dataproc Spark 集群来运行 Spark 流作业,它从多个 PubSub 订阅中读取数据并写入 BigQuery。 PubSub 有 500 万个元素,滑动窗口为 2 分钟,批处理/窗口为 30 秒,我每批处理只得到大约 200,000 个元素。我希望第一批全部拿到500万。每个元素的大小约为 140 字节,采用 Avro 消息格式。
我在 Dataflow 中实现了每秒 100 万个元素的速度,但我想在 Dataproc 中实现同样的速度。我尝试使用 Dataproc 的自动缩放选项,还尝试使用在 Dataflow 上工作的相同 Beam 管道代码。如果我增加订阅数量,那么它可能会给我更多的吞吐量。是否有可能从单个订阅中获得 1M 元素/秒的吞吐量?
以下是我的 Scala 代码:
// Reading from multiple PubSub.
for (a <- 0 to Integer.parseInt(subs)) {
logger.info("SKCHECK : Creating stream : " + subscription + a)
val everysub = PubsubUtils.createStream(
ssc, projectId, None, subscription + a,
SparkGCPCredentials.builder.jsonServiceAccount(jsonPath).build(),
StorageLevel.MEMORY_ONLY_SER).map(message => {
// Method to send avro bytes message and get row
val row : Row = avroMsgToRow(message.getData())
row
})
}
我的 build.sbt
看起来像:
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion,
// "org.apache.spark" %% "spark-mllib" % sparkVersion,
"org.apache.spark" %% "spark-streaming" % sparkVersion,
// "org.apache.spark" %% "spark-hive" % sparkVersion,
"com.google.cloud" % "google-cloud-bigquery" % bigQueryVersion,
"com.google.apis" % "google-api-services-bigquery" % googleApiBigQueryVersion,
"com.google.cloud" % "google-cloud-nio" % gcsNioVersion,
"com.sksamuel.avro4s" %% "avro4s-core" % avro4sVersion
)
// https://mvnrepository.com/artifact/com.google.cloud.bigdataoss/bigquery-connector
libraryDependencies += "com.google.cloud.bigdataoss" % "bigquery-connector" % "0.10.0-hadoop2"
// https://mvnrepository.com/artifact/com.spotify/spark-bigquery
libraryDependencies += "com.spotify" %% "spark-bigquery" % "0.2.2"
libraryDependencies += "com.google.apis" % "google-api-services-pubsub" % "v1-rev425-1.25.0"
// https://mvnrepository.com/artifact/org.apache.bahir/spark-streaming-pubsub
libraryDependencies += "org.apache.bahir" %% "spark-streaming-pubsub" % "2.3.0"
// https://mvnrepository.com/artifact/org.scala-lang/scala-library
libraryDependencies += "org.scala-lang" % "scala-library" % "2.10.0-M3"
// https://mvnrepository.com/artifact/org.apache.spark/spark-avro
libraryDependencies += "org.apache.spark" %% "spark-avro" % "2.4.0"
如果您需要更多信息,请告诉我。
我希望通过单个 PubSub 订阅获得每秒 100 万个元素的数据摄取速度。
最佳答案
我认为您需要首先确定您的 Spark Streaming 作业的瓶颈。是 CPU 限制、内存限制、IO 限制还是因为 Spark 的某些参数导致它没有充分利用资源?我建议您首先检查资源利用率,然后尝试不同的 machine types .
关于apache-spark - 从 google pubsub 到 spark streaming 的数据摄取很慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57145527/
我有这个 json 模式 { "name":"Pete" "age":24, "subjects":[ { "name":"maths" "grade":"
测量海浪周期的变量的“单位”属性以“秒”为单位。这不是日期时间字段,但 xarray 会自动将此变量作为 timedelta64 摄取。由于单位不是“自...以来的秒数”,我会假设 xarray 应该
我尝试使用 geomesa-accumulo 摄取 geotiff 数据,但出现以下错误: WARNING: Failed to load the GDAL native libs. This is
我有一个很大的 JSON 字符串,包含 10 条记录,每条记录都有自己的属性。我需要使用 Javascript 将它们提取到我的 MongoDB 中。我对 Javascript 基本上没什么用,谷歌也
在谈到 MongoDB 时,我完全是个新手,但我以前确实有使用 Hbase 和 Accumulo 等 nosql 存储的经验。当我使用这些其他 nosql 平台时,我最终编写了自己的数据摄取框架(通常
我正在尝试为我正在开发的应用构建我自己的客户端 RTMP 库。到目前为止,一切都非常成功,因为我能够连接到 RTMP 服务器协商握手,然后发送所有必要的数据包(FCPublish Publish ET
我将 pandas 与 pandera 一起用于模式验证,但我遇到了一个问题,因为数据中有一个空整数列。 from prefect import task, Flow #type:i
我将 pandas 与 pandera 一起用于模式验证,但我遇到了一个问题,因为数据中有一个空整数列。 from prefect import task, Flow #type:i
我无法在网络服务中正确读取输入 JSON 文件。我正在尝试将一些输入参数从简单的字符串更改为字符串数组 我的输入 JSON 看起来像这样: { "inputParams" : { "speck
Snowflake 建议在摄取之前拆分大文件: To optimize the number of parallel operations for a load, we recommend aimin
我可以在linux中成功执行以下命令: ffmpeg -i "rtmp://42.62.95.48/live?vhost=hls/livestream timeout=2" -vcodec copy
您好,我需要从数据库中读取多个表并连接这些表。一旦表加入,我想将它们推送到 Elasticsearch。 这些表是从外部进程连接的,因为数据可以来自多个源。这不是问题,事实上我有 3 个单独的进程以平
如何根据 Kafka 消息中的消息类型使用水槽写入自定义 hdfs 目录? 说 kafka 消息:{"type": "A", "data": "blah"} 在类型字段中有 "A"应该写入 /data
我正在寻找一种方法,使 Google DataFlow 作业在(特定)异常发生时停止从 Pub/Sub 摄取。 来自 Pub/Sub 的事件是通过 PubsubIO.Read.Bound 读取的 JS
我运行了一个 docker-compose up,我在我的 golang 容器上收到一条错误消息,提示“Error establishing Mongo session”,然后容器退出。我不确定问题是
我是一名优秀的程序员,十分优秀!