- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/sdks/pydoc/2.4.0/apache_beam.transforms.core.html?highlight=flatten#apache_beam.transforms.core.Flatten 。
有时标题位于数据的末尾,而其他则位于顶部。有没有办法设置顺序?想知道我是否遗漏了一些东西。
with beam.Pipeline(options=options) as p:
header = [
('name', 'number'),
]
phones_list = [
('amy', '111-222-3333'),
('james', '222-333-4444'),
('amy', '333-444-5555'),
('carl', '444-555-6666'),
]
header = p | 'Header' >> beam.Create(header)
phones = p | 'CreatePhones' >> beam.Create(phones_list)
merged = ((phones,header)
| 'MergedPColl' >> beam.Flatten())
output = merged
output | 'Write' >> beam.io.WriteToText('./_output')
输出1:
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
('name', 'number')
输出2:
('name', 'number')
('amy', '111-222-3333')
('james', '222-333-4444')
('amy', '333-444-5555')
('carl', '444-555-6666')
最佳答案
Flatten
是一个适用于 PCollection 的转换器。为了使合并工作并行进行,我认为他们不能保证顺序被保留;这与生成的 PCollection 的无序性质是一致的。
但是,如果您的唯一目的是在顶部添加 header ,则可以使用 textio.WriteToText()
的 header
参数。 .
> header (str): String to write at beginning of file as a header. If not :data:
None
and append_trailing_newlines is set, `\n`` will be added.
phones | 'Write' >> beam.io.WriteToText(
# Feel free to make your own header format.
'./_output', header="('name', 'number')")
更一般地,为了保留原始输入的序列,我会使用序列号来扩充输入数据。在beam的并行变换(携带每个元素的序列号)之后,您始终可以通过对该序列号进行排序作为后处理步骤(在非并行模式下)来“恢复”原始顺序。
关于python - 按元组顺序展平 PCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50063091/
在我进行一些处理和按键分组后,我得到了一个如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆
我有一个无界集合,它是从 PubsubIO 中读取数据的,名为 Trade格式如 { timestamp: 123, type: "", side: "" // sell or buy
我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个
没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档 语境 基本上我有两个大的 pCollections,我需要能够找到两者
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection我需要将其转换为 PCollection 。我将逗号分隔的字符串拆分为数组,并
我有一个用例,我需要从 DoFn 输出多个 T。所以 DoFn 函数返回一个 PCollection> 。我想将它转换为 PCollection 以便稍后在管道中我可以像这样过滤: PCollecti
我有两个 P-Collection,如下所示 P1 = ['H','E','L','L','O','W','O','R','L','D'] P2 = ['W','E','L','C','O','M',
Beam 中是否有可能有一个 PCollection 来“保存”同一父类(super class)的不同对象,使得 PCollection result = input.apply(ParDo.of
我有两个 bigquery 表。 表A c_id count_c_id p_id 表B id c_name p_type c_id 根据表 A 中的列,我需要使用 DF 管道从表 B
假设我有一个类型为 KV 的有界 PCollection p 。假设 p 无法放入内存,因此不能成为 DoFn 的侧面输入. 示例p: ("foo", 0) ("bar", 1) ("baz", 2)
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/
我想将时间戳设置为无限的字符串集合 在我的解决方案中,pcollection 的每一行都是一行 csv 这一行的一个字段中有一个时间戳和其他字段,例如点击次数等。 我想根据它自己的时间戳(事件时间)而
我想要 Pcollection 中的列表值。 PCollection> lst = bqT2.apply(ParDo.of(new UserId())); // line 1 List myL
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。 我有两个 PCollection。首先是代表 session : class Session{ String id
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。 当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签
我正在使用 Apache-Beam 和 Python SDK。 目前,我的管道读取多个文件,解析它们并从其数据生成 pandas 数据帧。然后,它将它们分组到一个单个数据帧中。 我现在想要的是检索这个
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has mo
我正在使用 Apache Beam 读取数据流并写入 BigQuery,在 tableA 中.我的行存储在 类型的数据集中. 目前我正在按原样读取行并写入表。但是我想根据 timestamp 过滤行
我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform。(该条件不依赖于Pcollection内容) 示例:我有日志,如果 PipelineOptions 中提供了日期
我是一名优秀的程序员,十分优秀!