- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试从 Google Cloud Storage (GCS) 读取大约 90 个 gzipped JSON 日志文件,每个大约 2GB(未压缩 10GB),解析它们,然后通过 Google 将它们写入日期分区表到 BigQuery(BQ)云数据流 (GCDF)。
每个文件包含 7 天的数据,整个日期范围约为 2 年(730 天,并且还在增加)。我当前的管道如下所示:
p.apply("Read logfile", TextIO.Read.from(bucket))
.apply("Repartition", Repartition.of())
.apply("Parse JSON", ParDo.of(new JacksonDeserializer()))
.apply("Extract and attach timestamp", ParDo.of(new ExtractTimestamps()))
.apply("Format output to TableRow", ParDo.of(new TableRowConverter()))
.apply("Window into partitions", Window.into(new TablePartWindowFun()))
.apply("Write to BigQuery", BigQueryIO.Write
.to(new DayPartitionFunc("someproject:somedataset", tableName))
.withSchema(TableRowConverter.getSchema())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
Repartition 是我在尝试制作管道时内置的东西 reshuffle after decompressing ,我尝试过使用和不使用它来运行管道。按照建议通过 Jackon ObjectMapper 和相应的类解析 JSON here . TablePartWindowFun 取自 here , 它用于为 PCollection 中的每个条目分配一个分区。
管道适用于较小的文件而不是太多文件,但对于我的真实数据集会中断。我选择了足够大的机器类型并尝试设置最大工作人员数量,以及使用自动缩放至最多 100 台 n1-highmem-16 机器。我已经尝试过流式和批处理模式以及每个工作人员 250 到 1200 GB 的 disSizeGb 值。
目前我能想到的可能的解决方案是:
选项 2 在我看来像是“围绕”框架进行编程,还有其他解决方案吗?
附录:
使用 Repartition after Reading 以批处理模式处理 gzip JSON 文件,最多 100 个工作人员(类型为 n1-highmem-4),管道运行大约一个小时,有 12 个工作人员,并完成读取以及 Repartition 的第一阶段。然后它扩展到 100 个 worker 并处理重新分区的 PCollection。完成后图形如下所示:
有趣的是,到了这个阶段,先是处理到150万个element/s,然后进度降到0。图中GroupByKey步骤OutputCollection的size从3亿左右先升后降到 0(总共有大约 18 亿个元素)。像是在丢弃什么。此外,ExpandIterable
和 ParDo(Streaming Write)
run-time 最后为 0。图片显示它在“向后”运行之前略微显示。在 worker 的日志中,我看到一些 exception thrown while executing request
来自 com.google.api.client.http.HttpTransport
的消息记录器,但我无法在 Stackdriver 中找到更多信息。
读取后没有重新分区 管道失败使用 n1-highmem-2
在完全相同的步骤(GroupByKey
之后的所有内容)出现内存不足错误的实例 - 使用更大的实例类型会导致异常,如
java.util.concurrent.ExecutionException: java.io.IOException:
CANCELLED: Received RST_STREAM with error code 8 dataflow-...-harness-5l3s
talking to frontendpipeline-..-harness-pc98:12346
最佳答案
感谢 Google Cloud Dataflow 团队的 Dan 和他提供的示例 here ,我能够解决这个问题。我所做的唯一更改:
在 175 =(25 周)大块的日子里循环,一个接一个地运行管道,以免系统不堪重负。在循环中,确保重新处理上一次迭代的最后一个文件,并以与基础数据相同的速度(175 天)向前移动 startDate
。当使用 WriteDisposition.WRITE_TRUNCATE
时, block 末尾的不完整日期将以这种方式被正确的完整数据覆盖。
使用上面提到的 Repartition/Reshuffle 转换,在读取 gzip 文件后,加快进程并允许更平滑的自动缩放
使用 DateTime 而不是 Instant 类型,因为我的数据不是 UTC
随着 Apache Beam 2.0 的发布,解决方案变得更加容易。现在支持对 BigQuery 输出表进行分片 out of the box .
关于google-cloud-dataflow - 通过 Dataflow 从 Google Cloud Storage 读取大型 gzip JSON 文件到 BigQuery,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42246396/
我的问题是,在幕后,对于元素级 Beam DoFn (ParDo),Cloud Dataflow 的并行工作负载如何?例如,在我的 ParDO 中,我向外部服务器发送一个 http 请求以获取一个元素
就 Google Cloud 上 Dataflow 的 HA 而言,最佳架构是什么?我的工作负载在两个区域运行。数据流从一个多区域存储桶中读取并将结果写出到另一个多区域存储桶中。 为了实现高可用性(以
如图 here数据流管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。 这是一些伪代码来说明我想要实现的内容:
在旧的定价页面上,他们提到 Cloud Dataflow 工作人员使用的所有 Google Compute 实例都是根据持续使用价格规则计费的,但新的定价页面不再提及。 我假设由于它在内部使用相同的
批处理 Dataflow 作业处理完所有数据后是否可以执行操作?具体来说,我想将管道刚刚处理的文本文件移动到不同的 GCS 存储桶。我不确定将它放在我的管道中的哪个位置以确保它在数据处理完成后执行一次
我希望能够通过自定义键使用分组,但这是我目前的尝试, 我们为 KV 对象的键使用自定义类,因为我们希望 GroupBy 具有更复杂的条件,而不是使用 String 等进行简单的键匹配。 ```
当尝试在 Dataflow 服务上运行管道时,我在命令行上指定了暂存和临时存储桶(在 GCS 中)。当程序执行时,我在管道运行之前收到一个 RuntimeException,根本原因是我在路径中遗漏了
我试图找到一种优雅地结束我的工作的方法,以免丢失任何数据,从 PubSub 流式传输并写入 BigQuery。 我可以设想的一种可能方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道
问题: 使用 Cloud Dataflow 时,我们会看到 2 个指标(请参阅 this page): 系统延迟 数据新鲜度 这些在 Stackdriver 中也可用以下名称(摘自 here): sy
我一直在阅读 Dataflow SDK 文档,试图找出当数据到达流作业中的水印时会发生什么。 这一页: https://cloud.google.com/dataflow/model/windowin
有没有办法(或任何类型的黑客)从压缩文件中读取输入数据? 我的输入包含数百个文件,这些文件是用 gzip 压缩生成的,解压缩它们有些乏味。 最佳答案 Dataflow 现在支持从压缩文本源中读取(从
我正在尝试在 Dataflow 中执行联合操作。是否有用于在 Dataflow 中合并两个 PCollections 的示例代码? 最佳答案 一个简单的方法是像这样将 Flatten() 与 Remo
在我的管道上运行“更新”后,我注意到有新创建的永久磁盘在 10 多分钟后未附加到任何实例。 最佳答案 这是 Dataflow 服务的一个持续已知问题,会在管道更新过程中导致孤立磁盘。可以安全地删除这些
是否可以为 Dataflow 工作人员提供自定义包? 我想从计算内部输出到 Debian 打包的二进制文件。 编辑:需要明确的是,包配置非常复杂,仅将文件捆绑在 --filesToStage 中是不可
我想使用 Google Cloud Dataflow 创建 session 窗口,如 dataflow model paper 中所述。 .我想将我的未绑定(bind)数据发送到 Pub/Sub,然后
我正在尝试运行从 pubsub 主题读取并写入 bigquery 的管道。时间戳是从主题消息中解析出来的。但是,我收到了一条关于允许的时间戳偏差的错误,并引用了下面复制的文档。 getAllowedT
我有一个大型数据文件 (1 TB) 的数据要导入 BigQuery。每行包含一个键。在导入数据并创建我的 PCollection 以导出到 BigQuery 时,我想确保我不会基于此键值导入重复记录。
我正在通过 Python API 在 Dataflow 上使用 Apache Beam 从 Bigquery 读取数据,对其进行处理,然后将其转储到 Datastore 接收器中。 不幸的是,作业经常
我一直在研究使用 spring-cloud-dataflow 中的 spring-cloud-task 构建的项目。查看示例项目和文档后,似乎表明任务是通过仪表板或 shell 手动启动的。 spri
我有以下场景: 管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。 您能否让我知道实现这一目标的最佳选择是什么? 管
我是一名优秀的程序员,十分优秀!