Google Cloud Dataflow: Why does the pipeline run twice with DirectRunner?


Given a dataset like the following:

{"slot":"reward","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42544}
{"slot":"reward_dlg","result":1,"rank":1,"isLandscape":false,"p_type":"main","level":1276,"type":"ba","seqNum":42545}

I am trying to filter the JSON records whose type is ba and insert them into BigQuery using the Python SDK:
import argparse
import json

import apache_beam as beam
from apache_beam import pvalue
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

# Field names in the schema must match the keys of the row dicts produced below.
ba_schema = 'slot:STRING,result:INTEGER,p_type:STRING,level:INTEGER'

class ParseJsonDoFn(beam.DoFn):
    B_TYPE = 'tag_B'

    def process(self, element):
        text_line = element.strip()
        data = json.loads(text_line)

        if data['type'] == 'ba':
            ba = {'slot': data['slot'], 'result': data['result'],
                  'p_type': data['p_type'], 'level': data['level']}
            yield pvalue.TaggedOutput(self.B_TYPE, ba)

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='data/path/data',
                        help='Input file to process.')
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipeline_args.extend([
        '--runner=DirectRunner',
        '--project=project-id',
        '--job_name=data-job',
    ])
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    with beam.Pipeline(options=pipeline_options) as p:
        lines = p | ReadFromText(known_args.input)

        multiple_lines = (
            lines
            | 'ParseJSON' >> (beam.ParDo(ParseJsonDoFn()).with_outputs(
                ParseJsonDoFn.B_TYPE)))

        ba_line = multiple_lines.tag_B

        (ba_line
         | "output_ba" >> beam.io.Write(
             beam.io.BigQuerySink(
                 table='ba',
                 dataset='temp',
                 project='project-id',
                 schema=ba_schema,
                 # write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                 create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
             )
         ))

        p.run().wait_until_finish()

The output is:
/usr/local/lib/python2.7/site-packages/apache_beam/runners/direct/direct_runner.py:342: DeprecationWarning: options is deprecated since First stable release.. References to <pipeline>.options will not be supported
pipeline.replace_all(_get_transform_overrides(pipeline.options))
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.
INFO:root:Running pipeline with DirectRunner.
INFO:oauth2client.transport:Attempting refresh to obtain initial access_token
INFO:oauth2client.client:Refreshing access_token
INFO:root:Writing 2 rows to project-id:temp.ba table.

Note that the line INFO:root:Writing 2 rows to project-id:temp.ba table. appears twice. Querying this table in BigQuery:
select * from `temp.ba`;

shows that the table contains 4 records, i.e. every row has been duplicated.

Why does the pipeline run the same job twice?

Best Answer

The with statement for the pipeline runs the pipeline. Specifically:

with beam.Pipeline(...) as p:
    [...code...]

is equivalent to:

p = beam.Pipeline(...)
[...code...]
p.run().wait_until_finish()

See the implementation for the details. Since the with block already runs the pipeline when it exits, the explicit p.run().wait_until_finish() in your code triggers a second run, which is why each row ends up written twice. Remove that call, or drop the with statement and call run() yourself.
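As a rough sketch of the two correct patterns (using the names from the question; the pipeline construction steps are elided):

# Option 1: let the with statement run the pipeline.
with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText(known_args.input)
    # ... build the rest of the pipeline as in the question ...
    # No p.run() here: the pipeline runs once when the with block exits.

# Option 2: manage the pipeline lifecycle explicitly.
p = beam.Pipeline(options=pipeline_options)
lines = p | ReadFromText(known_args.input)
# ... build the rest of the pipeline as in the question ...
p.run().wait_until_finish()  # runs the pipeline exactly once

Either pattern writes each row to project-id:temp.ba once; mixing the two, as in the question, runs the job twice.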

Regarding "Google Cloud Dataflow: Why does the pipeline run twice with DirectRunner?", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/52270674/
