- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has more than 10000 elements,reiteration (which may be slow) is required “CoGbkResult 有超过 10000 个元素,需要重复(这可能很慢)。”
使用此方法改进此性能的任何建议。
这是代码片段,
PCollection<TableRow> pc1 = ...;
PCollection<TableRow> pc2 = ...;
WithKeys<String, TableRow> withKeyValue =
WithKeys.of((TableRow row) -> String.format("%s",row.get("KEYNAME")))
.withKeyType(TypeDescriptors.strings());
PCollection<KV<String,TableRow>> keyed_pc1 =
pc1.apply("WithKeys", withKeyValue );
PCollection<KV<String,TableRow>> keyed_pc2 =
pc2.apply("WithKeys", withKeyValue );
// (org.apache.beam.sdk.extensions.joinlibrary.Join class)
PCollection<KV<String,KV<TableRow,TableRow>>> joinedCollection =
Join.innerJoin(keyed_pc1, keyed_pc2);
最佳答案
Apache Beam 规范没有定义连接的执行,除了 SDK 之外,没有更快的自己编写内部连接的方法。因此,这个问题的答案取决于执行连接的是什么,即哪个运行者。我不知道 Flink 或 Spark 运行器,所以这个答案将特定于 Dataflow 运行器。
如果您还没有,请查看关于此 topic 的博文.在博客文章中,它描述了可以手动启用的 Dataflow Shuffle 服务。此服务是比当前默认服务更好的实现,通常执行速度更快,尤其是对于联接。
要启用 Dataflow Shuffle 服务,请传入以下 flags :
--experiments=shuffle_mode=service
--region=<allowed region>
允许随机播放的区域是:“us-central1”、“europe-west1”、“europe-west4”、“asia-northeast1”。
关于java - 加入两个大量的 PCollection 有性能问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56950728/
在我进行一些处理和按键分组后,我得到了一个如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆
我有一个无界集合,它是从 PubsubIO 中读取数据的,名为 Trade格式如 { timestamp: 123, type: "", side: "" // sell or buy
我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个
没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档 语境 基本上我有两个大的 pCollections,我需要能够找到两者
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection我需要将其转换为 PCollection 。我将逗号分隔的字符串拆分为数组,并
我有一个用例,我需要从 DoFn 输出多个 T。所以 DoFn 函数返回一个 PCollection> 。我想将它转换为 PCollection 以便稍后在管道中我可以像这样过滤: PCollecti
我有两个 P-Collection,如下所示 P1 = ['H','E','L','L','O','W','O','R','L','D'] P2 = ['W','E','L','C','O','M',
Beam 中是否有可能有一个 PCollection 来“保存”同一父类(super class)的不同对象,使得 PCollection result = input.apply(ParDo.of
我有两个 bigquery 表。 表A c_id count_c_id p_id 表B id c_name p_type c_id 根据表 A 中的列,我需要使用 DF 管道从表 B
假设我有一个类型为 KV 的有界 PCollection p 。假设 p 无法放入内存,因此不能成为 DoFn 的侧面输入. 示例p: ("foo", 0) ("bar", 1) ("baz", 2)
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/
我想将时间戳设置为无限的字符串集合 在我的解决方案中,pcollection 的每一行都是一行 csv 这一行的一个字段中有一个时间戳和其他字段,例如点击次数等。 我想根据它自己的时间戳(事件时间)而
我想要 Pcollection 中的列表值。 PCollection> lst = bqT2.apply(ParDo.of(new UserId())); // line 1 List myL
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。 我有两个 PCollection。首先是代表 session : class Session{ String id
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。 当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签
我正在使用 Apache-Beam 和 Python SDK。 目前,我的管道读取多个文件,解析它们并从其数据生成 pandas 数据帧。然后,它将它们分组到一个单个数据帧中。 我现在想要的是检索这个
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has mo
我正在使用 Apache Beam 读取数据流并写入 BigQuery,在 tableA 中.我的行存储在 类型的数据集中. 目前我正在按原样读取行并写入表。但是我想根据 timestamp 过滤行
我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform。(该条件不依赖于Pcollection内容) 示例:我有日志,如果 PipelineOptions 中提供了日期
我是一名优秀的程序员,十分优秀!