gpt4 book ai didi

python - 使用数据流在 Google Cloud Platform 中加入两个 json

转载 作者:行者123 更新时间:2023-12-04 22:09:58 24 4
gpt4 key购买 nike

我想从两个不同的 JSON 文件中仅找出女性员工,并仅选择我们感兴趣的字段并将输出写入另一个 JSON。

此外,我正在尝试使用 Dataflow 在 Google 的云平台中实现它。有人可以提供任何可以实现以获得结果的示例Java代码。

员工 JSON

{"emp_id":"OrgEmp#1","emp_name":"Adam","emp_dept":"OrgDept#1","emp_country":"USA","emp_gender":"female","emp_birth_year":"1980","emp_salary":"$100000"}
{"emp_id":"OrgEmp#1","emp_name":"Scott","emp_dept":"OrgDept#3","emp_country":"USA","emp_gender":"male","emp_birth_year":"1985","emp_salary":"$105000"}

部门JSON
{"dept_id":"OrgDept#1","dept_name":"Account","dept_start_year":"1950"}
{"dept_id":"OrgDept#2","dept_name":"IT","dept_start_year":"1990"}
{"dept_id":"OrgDept#3","dept_name":"HR","dept_start_year":"1950"}

预期的输出 JSON 文件应该类似于
{"emp_id":"OrgEmp#1","emp_name":"Adam","dept_name":"Account","emp_salary":"$100000"}

最佳答案

您可以使用 CoGroupByKey 执行此操作(将使用 shuffle 的地方),或使用侧输入,如果您的部门集合明显较小。

我将为您提供 Python 代码,但您可以在 Java 中使用相同的管道。

通过侧面输入,您将:

  • 将您的部门 PCollection 转换为可映射的字典
    dept_id 到部门 JSON 字典。
  • 然后你拿
    员工 PCollection 作为主要输入,您可以在其中使用 dept_id
    获取部门 PCollection 中每个部门的 JSON。

  • 像这样:
    departments = (p | LoadDepts()
    | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

    deps_si = beam.pvalue.AsDict(departments)

    employees = (p | LoadEmps())

    def join_emp_dept(employee, dept_dict):
    return employee.update(dept_dict[employee['dept_id']])

    joined_dicts = employees | beam.Map(join_dicts, dept_dict=deps_si)

    CoGroupByKey ,您可以使用 dept_id 作为键对两个集合进行分组。这将导致键值对的 PCollection,其中键是 dept_id,值是部门和该部门的员工的两个可迭代对象。
    departments = (p | LoadDepts()
    | 'key_dept' >> beam.Map(lambda dept: (dept['dept_id'], dept)))

    employees = (p | LoadEmps()
    | 'key_emp' >> beam.Map(lambda emp: (emp['dept_id'], emp)))

    def join_lists((k, v)):
    itertools.product(v['employees'], v['departments'])

    joined_dicts = (
    {'employees': employees, 'departments': departments}
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergedicts' >> beam.Map(lambda (emp_dict, dept_dict): emp_dict.update(dept_dict))
    | 'filterfields'>> beam.Map(filter_fields)
    )

    关于python - 使用数据流在 Google Cloud Platform 中加入两个 json,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45287530/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com