gpt4 book ai didi

python-2.7 - 在 Google Cloud Dataflow 中访问模板化运行时参数 - Python

转载 作者:行者123 更新时间:2023-12-05 04:05:25 46 4
gpt4 key购买 nike

我正在尝试为 Google Cloud Dataflow 创建自己的模板,以便可以从 GUI 执行作业,让其他人更容易执行。我按照教程创建了自己的 PipelineOptions 类,并使用 parser.add_value_provider_argument() 方法填充它。然后,当我尝试使用 my_options.argname.get() 将这些参数传递到管道中时,我收到一个错误,告诉我该项目不是从运行时上下文中调用的。我不明白这一点。 args 不是定义管道图本身的一部分,它们只是参数,例如输入文件名、输出表名等。

下面是代码。如果我对输入文件名、输出表名、写入处置和分隔符进行硬编码,它就会起作用。如果我将它们替换为等效的 my_options.argname.get() ,它将失败。在显示的片段中,我对除 outputBQTable 名称之外的所有内容进行了硬编码,我在其中使用了 my_options.outputBQTable.get()。这失败了,并显示以下消息。

apache_beam.error.RuntimeValueProviderError: RuntimeValueProvider(option: outputBQTable, type: str, default_value: 'dataflow_csv_reader_testing.names').get() 未从运行时上下文调用

我很感激任何关于如何让它发挥作用的指导。

import apache_beam
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.value_provider import RuntimeValueProvider
import csv
import argparse

class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls,parser):
parser.add_value_provider_argument('--inputGCS', type=str,
default='gs://mybucket/df-python-csv-test/test-dict.csv',
help='Input gcs csv file, full path and filename')
parser.add_value_provider_argument('--delimiter', type=str,
default=',',
help='Character used as delimiter in csv file, default is ,')
parser.add_value_provider_argument('--outputBQTable', type=str,
default='dataflow_csv_reader_testing.names',
help='Output BQ Dataset.Table to write to')
parser.add_value_provider_argument('--writeDisposition', type=str,
default='WRITE_APPEND',
help='BQ write disposition, WRITE_TRUNCATE or WRITE_APPEND or WRITE_EMPTY')

def main():
optlist=PipelineOptions()
my_options=optlist.view_as(MyOptions)
p = apache_beam.Pipeline(options=optlist)
(p
| 'create' >> apache_beam.Create(['gs://mybucket/df-python-csv-test/test-dict.csv'])
| 'read gcs csv dict' >> apache_beam.FlatMap(lambda file: csv.DictReader(apache_beam.io.gcp.gcsio.GcsIO().open(file,'r'), delimiter='|'))
| 'write bq record' >> apache_beam.io.Write(apache_beam.io.BigQuerySink(my_options.outputBQTable.get(), write_disposition='WRITE_TRUNCATE'))
)
p.run()

if __name__ == '__main__':
main()

最佳答案

指定管道时不能使用my_options.outputBQTable.get()。 BigQuery 接收器已经知道如何使用运行时提供的参数,所以我认为您可以传递 my_options.outputBQTable

根据我从文档中收集到的内容,您应该只在 DoFnprocess 方法中使用 options.runtime_argument.get()传递给 ParDo 步骤。

注意:我使用 Apache Beam SDK 的 2.8.0 进行了测试,因此我使用了 WriteToBigQuery 而不是 BigQuerySink

关于python-2.7 - 在 Google Cloud Dataflow 中访问模板化运行时参数 - Python,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51269838/

46 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com