gpt4 book ai didi

python - GAE Python 上的 Mapreduce - 导致 ReducePipeline 在最终确定时发出回调?

转载 作者:太空宇宙 更新时间:2023-11-03 18:40:13 25 4
gpt4 key购买 nike

我想在 MapReduce 作业完成/完成后执行自定义回调函数。

我找到的关于此问题的唯一有用的引用是 a somewhat outdated Google site和一个相关的,但又似乎过时的Stackoverflow question .

这两个来源都假设我使用 control.start_map 来启动 Mapreduce 作业,并依赖于 start_map 采用关键字参数 mapreduce_parameters 这一事实code> 其中可以指定一个 done_callback 参数来指定完成时应调用的 url。但是,我使用的是一种不同的方法(据我所知,是最近的首选方法),其中自定义管道的 run 方法生成 Mapreduce 管道:

yield mapreduce_pipeline.MapreducePipeline(
"word_count",
"main.word_count_map",
"main.word_count_reduce",
"mapreduce.input_readers.BlobstoreZipInputReader",
"mapreduce.output_writers.BlobstoreOutputWriter",
mapper_params={
"blob_key": blobkey,
},
reducer_params={
"mime_type": "text/plain",
},
shards=16)

MapreducePipeline 的签名不允许使用 mapreduce_parameters 参数。我唯一可以看到源代码中出现的回调引用的地方是 mapper_pipeline.MapperPipeline.run,但它似乎仅在内部使用。

那么,有没有办法获取其中的回调参数?

如果没有,是否有人对在何处以及如何扩展库以提供此类功能有好的想法?

最佳答案

我将 Mapreduce 管道范例设置为如下所示:

class MRRecalculateSupportsPipeline(base_handler.PipelineBase):

def run(self, user_key):
# ...
yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
'myapp.mapreduces.user_recalculate_supports_map',
'myapp.mapreduces.user_recalculate_supports_reduce',
'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
mapper_params={"""..."""})

如果您想捕获此管道的完成情况,您有两种选择。

A) 使用 pipeline.After 在 MR 管道完成后运行完成管道。

        pipe_future = yield mapreduce_pipeline.MapreducePipeline('user_recalculate_supports',
'myapp.mapreduces.user_recalculate_supports_map',
'myapp.mapreduces.user_recalculate_supports_reduce',
'mapreduce.input_readers.DatastoreInputReader', output_writer_spec=None,
mapper_params={"""..."""})
with pipeline.After(pipe_future):
yield CalcCompletePipeline(...) # this could be a mapreduce pipeline, or any pipeline using the same base_handler.PipelineBase parent class.

B) 使用顶级管道的finalized 方法来处理完成。就我个人而言,我会坚持使用选项 A,因为您可以在 /_ah/*/status?root= View 中跟踪路径。

class EmailNewReleasePipeline(base_handler.PipelineBase):
"""Email followers about a new release"""
# TODO: product_key is the name of the parameter, but it's built for albums ...

def run(self, product_key, testing=False):
# Send those emails ...
yield mapreduce_pipeline.MapreducePipeline(...)

def finalized(self):
"""Save product as launched"""
...
product.launched = True
product.put()

以下是 finalization of a pipeline 上的文档.

关于python - GAE Python 上的 Mapreduce - 导致 ReducePipeline 在最终确定时发出回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20688593/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com