- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
尽管 Dataflow 将目标工作人员设置为 1000,但我的 Dataflow 作业(工作 ID:2020-08-18_07_55_15-14428306650890914471)并未扩展超过 1 个工作人员。
作业配置为查询 Google Patents BigQuery 数据集,使用 ParDo 自定义函数和 transformers (huggingface) 库标记文本,序列化结果,并将所有内容写入一个巨大的 Parquet 文件。
我曾假设(在昨天运行作业后,它映射了一个函数而不是使用 beam.DoFn 类)问题是一些非并行对象消除了缩放;因此,将标记化过程重构为一个类。
这是脚本,使用以下命令从命令行运行:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz
脚本:
import os
import re
import argparse
import google.auth
import apache_beam as beam
from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.runners import DataflowRunner
from apache_beam.io.gcp.internal.clients import bigquery
import pyarrow as pa
import pickle
from transformers import AutoTokenizer
print('Defining TokDoFn')
class TokDoFn(beam.DoFn):
def __init__(self, tok_version, block_size=200):
self.tok = AutoTokenizer.from_pretrained(tok_version)
self.block_size = block_size
def process(self, x):
txt = x['abs_text'] + ' ' + x['desc_text'] + ' ' + x['claims_text']
enc = self.tok.encode(txt)
for idx, token in enumerate(enc):
chunk = enc[idx:idx + self.block_size]
serialized = pickle.dumps(chunk)
yield serialized
def run(argv=None, save_main_session=True):
query_big = '''
with data as (
SELECT
(select text from unnest(abstract_localized) limit 1) abs_text,
(select text from unnest(description_localized) limit 1) desc_text,
(select text from unnest(claims_localized) limit 1) claims_text,
publication_date,
filing_date,
grant_date,
application_kind,
ipc
FROM `patents-public-data.patents.publications`
)
select *
FROM data
WHERE
abs_text is not null
AND desc_text is not null
AND claims_text is not null
AND ipc is not null
'''
query_sample = '''
SELECT *
FROM `client_name.patent_data.patent_samples`
LIMIT 2;
'''
print('Start Run()')
parser = argparse.ArgumentParser()
known_args, pipeline_args = parser.parse_known_args(argv)
'''
Configure Options
'''
# Setting up the Apache Beam pipeline options.
# We use the save_main_session option because one or more DoFn's in this
# workflow rely on global context (e.g., a module imported at module level).
options = PipelineOptions(pipeline_args)
options.view_as(SetupOptions).save_main_session = save_main_session
# Sets the project to the default project in your current Google Cloud environment.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()
# Sets the Google Cloud Region in which Cloud Dataflow runs.
options.view_as(GoogleCloudOptions).region = 'us-central1'
# IMPORTANT! Adjust the following to choose a Cloud Storage location.
dataflow_gcs_location = 'gs://client_name/dataset_cleaned_pq_classTok'
# Dataflow Staging Location. This location is used to stage the Dataflow Pipeline and SDK binary.
options.view_as(GoogleCloudOptions).staging_location = f'{dataflow_gcs_location}/staging'
# Dataflow Temp Location. This location is used to store temporary files or intermediate results before finally outputting to the sink.
options.view_as(GoogleCloudOptions).temp_location = f'{dataflow_gcs_location}/temp'
# The directory to store the output files of the job.
output_gcs_location = f'{dataflow_gcs_location}/output'
print('Options configured per GCP Notebook Examples')
print('Configuring BQ Table Schema for Beam')
#Write Schema (to PQ):
schema = pa.schema([
('block', pa.binary())
])
print('Starting pipeline...')
with beam.Pipeline(runner=DataflowRunner(), options=options) as p:
res = (p
| 'QueryTable' >> beam.io.Read(beam.io.BigQuerySource(query=query_big, use_standard_sql=True))
| beam.ParDo(TokDoFn(tok_version='gpt2', block_size=200))
| beam.Map(lambda x: {'block': x})
| beam.io.WriteToParquet(os.path.join(output_gcs_location, f'pq_out'),
schema,
record_batch_size=1000)
)
print('Pipeline built. Running...')
if __name__ == '__main__':
import logging
logging.getLogger().setLevel(logging.INFO)
logging.getLogger("transformers.tokenization_utils_base").setLevel(logging.ERROR)
run()
最佳答案
解决方案有两个:
当我运行我的工作时,超出了以下配额,全部在“Compute Engine API”下(在此处查看您的配额:https://console.cloud.google.com/iam-admin/quotas):
注意:如果您在作业运行时读取控制台输出,任何超过的配额都应作为信息行打印出来。
按照 Peter Kim 的上述建议,我将标志 --max_num_workers 作为命令的一部分传递:
python bq_to_parquet_pipeline_w_class.py --extra_package transformers-3.0.2.tar.gz --max_num_workers 22
然后我开始缩放!
总而言之,如果有一种方法可以在达到配额时通过 Dataflow 控制台提示用户,并提供一种简单的方法来请求增加配额(和推荐的补充配额),以及关于应要求增加的金额的建议。
关于python - 数据流批处理作业不缩放,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/63471735/
我正在尝试制作一个基于文本的批处理游戏。但是我刚开始写我以前从未遇到过的问题时遇到了一个问题。 :menu :: the game menu - opens when the game starts
我正在构建一个社交媒体应用程序,用户需要发布一些内容,然后将发布的内容传播给他/她的 4 个圈子内的所有成员。这意味着查询进入循环。它就像一个家谱。逻辑工作得很好。但现在,当每个圈子中的成员数量不断增
1. DECLARE TYPE ref_cursor_type IS ref CURSOR; v_mycursor ref_cursor_type;
我想在这里做的是循环直到按下“x”。我知道 CHOICE 带有 /T 选项。 但是 CHOICE 对我要播放的动画的超时时间太长。这是一个例子: @echo off cls set frame=2 :
我已经寻找解决方案,但我仍然遇到问题。我有两个文件: File1.txt 1111 2222 3333 File2.txt 1111 2222 3333 4444 我想要一个只有差异的输
我正在做一个批处理脚本,必须检查计算机上是否安装了一些程序。为此,我执行 programName --version我将输出存储在一个变量中。问题是当我尝试与正则表达式进行比较时(只知道该程序是否存在
我知道如何从同一个批处理文件中的函数返回值,但我发现从不同的批处理文件返回值时存在一些问题。下面是一个例子: 文件 1.cmd SETLOCAL ENABLEEXTENSIONS SETLOCAL E
我相信这个问题的答案应该很简单。我从一个地方获取目录列表并将它们存储到文本文档中。然后我读取文本文档名称并将它们存储到一个数组中。在此过程结束时,我希望删除数组中的所有条目。 我想这样做的原因是因为我
我家有两个摄像头,几乎每天都在创建图像。他们将它们保存到我的FTP服务器(Fritz.Box\Nas驱动器)。 文件夹结构如下: +-2016-08-24 +-+Subfolder +----+Ano
在Windows Batch中执行此操作。我有一个名称列表,并要求用户输入其名称。 我想检查该名称是否已经存在于列表中,如果存在,则直接进入goto,否则它将名称添加到列表中。 @echo off s
我正在编写一个批处理文件,我想运行一个 for 循环,将它的第一个值设置为一个变量。我只需要命令的第一个值,但我找不到另一种方法来做到这一点。我设置它的方式是使用一个 for 循环,然后是一个 do
我需要创建一个批处理文件,使用tracert命令跟踪一些IP,并将跟踪写入txt文件。我希望它很快,所以我想为每个跟踪启动一个新命令,以使所有跟踪请求立即启动。 这是我的 ping.bat: @ech
我想在批处理文件中使用PowerShell命令发送电子邮件。为此,我实现了一个名为 sendMail 的函数。我这样称呼它: setlocal enabledelayedexpansion call:
想要使用 java 执行 selenium 脚本/批处理脚本。根据输入参数调用脚本/批处理脚本。 了解如何使用 java 代码运行脚本/批处理。 请帮帮我。 最佳答案 要运行 java 项目中文件中包
我正在练习 JDBC 批处理并遇到错误: 错误1:不支持的功能错误2:执行不能为空或为null Property files include: itemsdao.updateBookName = Up
我从 json 文件中得到了以下字符串: 39468856, 现在,我想用这些数字进行计算..因此,我必须删除末尾的 , 。 此时我使用以下代码: for /f "skip=24 tokens=2"
我有一堆 SQL 查询作为文件存储在磁盘上。 它们都是纯 SELECT 查询,换句话说,它们只做读操作。 我正在连接到 Oracle 11g 数据库,我想测量所有这些查询的大致执行时间。有没有办法以编
我正在使用 java 来存储属性文件的目录路径。 然后在 bat 文件中我使用属性作为变量。 问题出在 Java 中,文件路径存储为 SOME_VAR=D\:\\Madhan\\Program Fil
我想用“%”替换字符串中的“mod”:set string=%string:mod=x%我应该输入什么作为“x”? 最佳答案 您可以通过启用延迟扩展来做到这一点,以便您可以使用 !作为分隔符。然后,将
在我目前正在处理的批处理文件中,我遇到了一个小问题。我有一个名为 Dimensions(number from 1-5, defined in a for /l loop).txt 的文件,其中包含
我是一名优秀的程序员,十分优秀!