- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少?
原始输入示例:
custid desc amount
111 coffee 3.50
111 grocery 23.00
333 coffee 4.00
222 gas station 32.00
222 gas station 55.50
333 coffee 3.00
所需输出的示例:
custid nbr_coffee amt_coffee nbr_gas_station amt_gas_station
111 1 3.50 0 0.00
222 0 0 2 87.50
333 2 7.00 0 0
我的目标运行器将是 Dataflow(但目前使用 DirectRunner 进行测试)。
这是我所拥有的代码片段:
def categorize_coffee(transaction):
if transaction['trx_desc'] == 'coffee':
transaction['coffee'] = True
else:
transaction['coffee'] = False
return transaction
def categorize_gas_station(transaction):
if transaction['trx_desc'] == 'gas station':
transaction['gas_station'] = True
else:
transaction['gas_station'] = False
return transaction
def summarize_coffee(grouping):
key, values = grouping
values = list(values)
nbr = 0
amt = 0
for d in values:
if d['coffee'] == True:
nbr+=1
amt+=d['amount']
ret_val = {}
ret_val['cust'] = d['cust']
ret_val['nbr_coffee'] = nbr
ret_val['amt_coffee'] = amt
return ret_val
def summarize_gas_station(grouping):
key, values = grouping
values = list(values)
nbr = 0
amt = 0
for d in values:
if d['gas_station'] == True:
nbr += 1
amt += d['amount']
ret_val = {}
ret_val['cust'] = d['cust']
ret_val['nbr_gas_station'] = nbr
ret_val['amt_gas_station'] = amt
return ret_val
def create_dict(row):
vars = row.split(',')
return {'cust': vars[0], 'trx_desc': str(vars[1]), 'amount': float(vars[2])}
with beam.Pipeline(options=pipeline_options) as p:
categorized_trx = (
p | 'get data' >> beam.io.ReadFromText('./test.csv')
| beam.Map(create_dict)
| beam.Map(categorize_coffee)
| beam.Map(categorize_gas_station)
| beam.Map(lambda trx: (trx['cust'], trx))
| beam.GroupByKey()
)
coffee_trx = (categorized_trx | beam.Map(summarize_coffee))
gas_station_trx = (categorized_trx | beam.Map(summarize_gas_station))
result = (coffee_trx, gas_station_trx) | beam.Flatten()
现在的实际结果是:
{'amt_coffee': 7.0, 'cust': u'333', 'nbr_coffee': 2}
{'amt_coffee': 0, 'cust': u'222', 'nbr_coffee': 0}
{'amt_coffee': 3.5, 'cust': u'111', 'nbr_coffee': 1}
{'nbr_gas_station': 0, 'cust': u'333', 'amt_gas_station': 0}
{'nbr_gas_station': 2, 'cust': u'222', 'amt_gas_station': 87.5}
{'nbr_gas_station': 0, 'cust': u'111', 'amt_gas_station': 0}
没有像我预期的那样扁平化或连接。我是 Beam 新手 - 不确定我是否理解如何正确解决这个问题,因此希望获得一些见解。
最佳答案
这应该有效:
...
def summarize_coffee(grouping):
...
return (d['cust'], ret_val)
def summarize_gas_station(grouping):
...
return (d['cust'], ret_val)
...
def processJoin(row):
(customer, data) = row
coffee_trx=data['coffee_trx']
gas_station_trx=data['gas_station_trx']
return (customer, coffee_trx, gas_station_trx)
result = ({coffee_trx: coffee_trx, gas_station_trx: gas_station_trx}
| 'Group' >> beam.CoGroupByKey()
| 'Reshape' >> beam.Map(processJoin)
| 'Unwind' >> beam.FlatMap(lambda x: x)
)
关于python - GroupByKey 之后减少 PCollection,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55486163/
在我进行一些处理和按键分组后,我得到了一个如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆
我有一个无界集合,它是从 PubsubIO 中读取数据的,名为 Trade格式如 { timestamp: 123, type: "", side: "" // sell or buy
我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个
没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档 语境 基本上我有两个大的 pCollections,我需要能够找到两者
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection我需要将其转换为 PCollection 。我将逗号分隔的字符串拆分为数组,并
我有一个用例,我需要从 DoFn 输出多个 T。所以 DoFn 函数返回一个 PCollection> 。我想将它转换为 PCollection 以便稍后在管道中我可以像这样过滤: PCollecti
我有两个 P-Collection,如下所示 P1 = ['H','E','L','L','O','W','O','R','L','D'] P2 = ['W','E','L','C','O','M',
Beam 中是否有可能有一个 PCollection 来“保存”同一父类(super class)的不同对象,使得 PCollection result = input.apply(ParDo.of
我有两个 bigquery 表。 表A c_id count_c_id p_id 表B id c_name p_type c_id 根据表 A 中的列,我需要使用 DF 管道从表 B
假设我有一个类型为 KV 的有界 PCollection p 。假设 p 无法放入内存,因此不能成为 DoFn 的侧面输入. 示例p: ("foo", 0) ("bar", 1) ("baz", 2)
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/
我想将时间戳设置为无限的字符串集合 在我的解决方案中,pcollection 的每一行都是一行 csv 这一行的一个字段中有一个时间戳和其他字段,例如点击次数等。 我想根据它自己的时间戳(事件时间)而
我想要 Pcollection 中的列表值。 PCollection> lst = bqT2.apply(ParDo.of(new UserId())); // line 1 List myL
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。 我有两个 PCollection。首先是代表 session : class Session{ String id
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。 当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签
我正在使用 Apache-Beam 和 Python SDK。 目前,我的管道读取多个文件,解析它们并从其数据生成 pandas 数据帧。然后,它将它们分组到一个单个数据帧中。 我现在想要的是检索这个
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has mo
我正在使用 Apache Beam 读取数据流并写入 BigQuery,在 tableA 中.我的行存储在 类型的数据集中. 目前我正在按原样读取行并写入表。但是我想根据 timestamp 过滤行
我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform。(该条件不依赖于Pcollection内容) 示例:我有日志,如果 PipelineOptions 中提供了日期
我是一名优秀的程序员,十分优秀!