
python - Dataflow batch job not scaling


My Dataflow job (job ID: 2020-08-18_07_55_15-14428306650890914471) is not scaling past 1 worker, despite Dataflow setting the worker target to 1000.

The job is configured to query the Google Patents BigQuery dataset, tokenize the text using a ParDo custom function and the transformers (huggingface) library, serialize the result, and write everything to one giant Parquet file.

I had assumed (after running the job yesterday, when it mapped a function instead of using a beam.DoFn class) that the problem was some non-parallelizing object eliminating scaling; hence, I refactored the tokenization step into a class.
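As an aside, one pattern I've seen suggested for heavy, hard-to-pickle objects like the tokenizer is to construct them in the DoFn's setup() method rather than in __init__, so each worker loads the model locally instead of receiving it by serialization. A minimal sketch of that variant (the TokDoFnLazy name is just for illustration; this is not what the job below actually ran):

import pickle
import apache_beam as beam
from transformers import AutoTokenizer


class TokDoFnLazy(beam.DoFn):
    """Sketch only: loads the tokenizer on the worker in setup() instead of pickling it."""

    def __init__(self, tok_version, block_size=200):
        self.tok_version = tok_version
        self.block_size = block_size
        self.tok = None

    def setup(self):
        # Runs once per DoFn instance on the worker, after deserialization.
        self.tok = AutoTokenizer.from_pretrained(self.tok_version)

    def process(self, x):
        txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
        enc = self.tok.encode(txt)
        for idx in range(len(enc)):
            yield pickle.dumps(enc[idx:idx + self.block_size])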

Here is the script, which is run from the command line with the following command:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz

The script:

import os
import re
import argparse

import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner

from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer


print('Defining TokDoFn')


class TokDoFn(beam.DoFn):
    def __init__(self, tok_version, block_size=200):
        self.tok = AutoTokenizer.from_pretrained(tok_version)
        self.block_size = block_size

    def process(self, x):
        txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
        enc = self.tok.encode(txt)

        for idx, token in enumerate(enc):
            chunk = enc[idx:idx + self.block_size]
            serialized = pickle.dumps(chunk)
            yield serialized


def run(argv=None, save_main_session=True):
    query_big = '''
    with data as (
      SELECT
        (select text from unnest(abstract_localized) limit 1) abs_text,
        (select text from unnest(description_localized) limit 1) desc_text,
        (select text from unnest(claims_localized) limit 1) claims_text,
        publication_date,
        filing_date,
        grant_date,
        application_kind,
        ipc
      FROM `patents-public-data.patents.publications`
    )

    select *
    FROM data
    WHERE
        abs_text is not null
        AND desc_text is not null
        AND claims_text is not null
        AND ipc is not null
    '''

    query_sample = '''
    SELECT *
    FROM `client_name.patent_data.patent_samples`
    LIMIT 2;
    '''

    print('Start Run()')
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)

    '''
    Configure Options
    '''
    # Setting up the Apache Beam pipeline options.
    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    options = PipelineOptions(pipeline_args)
    options.view_as(SetupOptions).save_main_session = save_main_session

    # Sets the project to the default project in your current Google Cloud environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()

    # Sets the Google Cloud Region in which Cloud Dataflow runs.
    options.view_as(GoogleCloudOptions).region = 'us-central1'

    # IMPORTANT! Adjust the following to choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
    # Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'

    # Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'

    # The directory to store the output files of the job.
    output_gcs_location = f'{dataflow_gcs_location}/output'

    print('Options configured per GCP Notebook Examples')
    print('Configuring BQ Table Schema for Beam')

    # Write Schema (to PQ):
    schema = pa.schema([
        ('block', pa.binary())
    ])

    print('Starting pipeline...')
    with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
        res = (p
               | 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
               | beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
               | beam.Map(lambda x: {'block': x})
               | beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
                                        schema,
                                        record_batch_size=1000)
               )
    print('Pipeline built. Running...')


if __name__ == '__main__':
    import logging
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
    run()

Best Answer

The solution was two-fold:

When I ran my job, I was exceeding the following quotas, all under "Compute Engine API" (view your quotas here: https://console.cloud.google.com/iam-admin/quotas):

  • CPUs (I requested an increase to 50)
  • Persistent Disk Standard (GB) (I requested an increase to 12,500)
  • In_Use_IP_Address (I requested an increase to 50)

Note: if you read the console output while your job is running, any exceeded quota should be printed out as an INFO line.
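If you prefer checking these from the command line rather than the console, something like the following should show the Compute Engine quota usage and limits for the region the job runs in (us-central1 is just the region used above):

# Prints region details, including quotas such as CPUS, DISKS_TOTAL_GB and IN_USE_ADDRESSES
gcloud compute regions describe us-central1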

Per Peter Kim's suggestion above, I passed the --max_num_workers flag as part of my command:

python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22

And then it started scaling!
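For what it's worth, the same cap can also be set from inside the pipeline script via WorkerOptions instead of the command-line flag; a minimal sketch, assuming the options object built in the script above:

from apache_beam.options.pipeline_options import WorkerOptions

# Equivalent to passing --max_num_workers 22 on the command line:
# caps Dataflow autoscaling at 22 workers for this job.
options.view_as(WorkerOptions).max_num_workers = 22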

All in all, it would be great if there were a way to prompt users through the Dataflow console when a quota is about to be reached, with an easy way to request an increase for that quota (and the recommended complementary quotas), along with a suggestion for how much of an increase to request.

Regarding "python - Dataflow batch job not scaling", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/63471735/
