gpt4 book ai didi

python - 如何在spark并行处理中返回字典?

转载 作者:行者123 更新时间:2023-11-30 23:04:46 25 4
gpt4 key购买 nike

我有一个要处理的对象数组:Objects,并且我有一个函数,它接受一个字典和一个对象,并返回相同的字典,修改后:

new_dict = modify_object_dict(object_dict, object)

modify_object_dict 执行以下操作:

  • 向字典添加一个键,它是处理的对象的名称

  • 创建一个字典作为该键的值(字典中的字典),其中添加和删除了元素。

  • 例如,对象可能是一个文件:object_dict['file_name']=sub_dictionary,并且子词典可能包含 sub_dictionary['file_attribute']=attribute

modify_object_dict 填充这些子词典,如上所示,结果是一个保存子词典的词典。

请注意,子词典不会相互交互。即一个对象的字典不与另一对象的字典交互。

我希望使用 Spark 并行处理这些对象:

object_dict = {}   # dictionary is initially empty
RDD = (sc.parallelize(Objects)
.map(lambda object: modify_object_dict(object_dict, object))

这是执行此操作的正确方法吗?如果不是,返回每次调用映射函数时都会修改的字典的正确方法是什么?

最佳答案

what is the correct way to return a dictionary that is modified every time the mapping function is called?

简短的回答是没有。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark 仅支持两种类型的共享变量:累加器和广播,分别具有只写和只读访问权限。

长答案取决于modify_object_dict内部究竟发生了什么。如果您使用的操作是关联和可交换的,并且可以在键的基础上执行(每个对象可以映射到特定键上的操作),您可以使用aggregateByKey的某种变体。还可以使用mapPartitions对数据进行分区和本地处理。

如果 modify_object_dict 不满足上述条件,那么 Spark 很可能不是一个好的选择。可以将状态推送到外部系统,但这通常没有意义,除非 Spark 用于繁重的工作,并且您推送到外部的只是最终结果。

此外,您不应该使用 map 来产生副作用。这种情况下正确的方法通常是foreach。这里还有一个更微妙的问题。无法保证 map (或 foreach)对于每个元素仅执行一次。这意味着您执行的每个操作都必须是幂等的。

编辑:

根据您的描述,您似乎可以尝试以下方法:

  • 首先让我们创建RDD一个虚拟类:

    class Foobar(object):
    def __init__(self, name, x=None, y=None, z=None):
    self.name = name
    self.x = x
    self.y = y
    self.z = z

    和对象的 RDD:

    objects = sc.parallelize([
    {"name": "foo", "x": 1}, {"name": "foo", "y": 3},
    {"name": "bar", "z": 4}
    ]).map(lambda x: Foobar(**x))
  • 接下来让我们将其转换为 PairwiseRDD,其中名称作为键,对象作为值。如果对象很大,您可以仅提取感兴趣的字段并将其用作值。我假设每个对象都有 name 属性。

    pairs = objects.map(lambda obj: (obj.name, obj))
  • groupByKey 和转换值:

    rdd = pairs.groupByKey().mapValues(lambda iter: ...)

    aggregateByKey(推荐):

    def seq_op(obj_dict, obj):
    # equivalent to modify_object_dict
    # Lets assume it is as simple as this
    obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z"))
    return obj_dict

    def comb_op(obj_dict_1, obj_dict_2):
    # lets it is a simple union
    obj_dict_1.update(obj_dict_2)
    return obj_dict_1

    dicts = pairs.aggregateByKey({}, seq_op, comb_op)
  • 此时您有一个由 (name, dict) 对组成的 RDD。它可以用于进一步处理,或者如果您确实需要收集作为 map 的本地结构:

    dicts.collectAsMap()
    ## {'bar': {'x': None, 'y': None, 'z': 4},
    ## 'foo': {'x': None, 'y': 3, 'z': None}}

关于python - 如何在spark并行处理中返回字典?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33550805/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com