I'm trying to stream records into BigQuery via Python using Dataflow. The pipeline reads notifications from PubSub about files changed in a bucket, then reads, transforms, and inserts those files into BigQuery.
However, once the pipeline reaches roughly 100 to 200 elements/sec, I get errors like the one below saying I've exceeded a rate limit, with a link to this page. Sometimes the error mentions the tabledata.list
quota, which is 500/sec.
I don't understand why I'm seeing messages about these quotas at all, since BigQuery's streaming-insert quota is 1,000,000 rows/sec.
> [while running 'generatedPtransform-52321']
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
org.apache.beam.sdk.util.MoreFutures.get(MoreFutures.java:57)
org.apache.beam.runners.dataflow.worker.fn.control.RegisterAndProcessBundleOperation.finish(RegisterAndProcessBundleOperation.java:332)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:85)
org.apache.beam.runners.dataflow.worker.fn.control.BeamFnMapTaskExecutor.execute(BeamFnMapTaskExecutor.java:125)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1350)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1100(StreamingDataflowWorker.java:152)
org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$7.run(StreamingDataflowWorker.java:1073)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Error received from SDK harness for instruction -52327: Traceback (most recent call last):
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
schema)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
additional_create_parameters=self.additional_bq_parameters)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
found_table = self.get_table(project_id, dataset_id, table_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
response = self.client.tables.Get(request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
config, request, global_params=global_params)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 403,
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"errors": [
{
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
],
"status": "PERMISSION_DENIED"
}
}
>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 167, in _execute
response = task()
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 223, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in do_instruction
request.instruction_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/sdk_worker.py", line 386, in process_bundle
bundle_processor.process_bundle(instruction_id))
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 812, in process_bundle
data.transform_id].process_encoded(data.data)
File "/usr/local/lib/python3.6/site-packages/apache_beam/runners/worker/bundle_processor.py", line 205, in process_encoded
self.output(decoded_value)
File "apache_beam/runners/worker/operations.py", line 302, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 304, in apache_beam.runners.worker.operations.Operation.output
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 941, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 497, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "apache_beam/runners/common.py", line 1028, in apache_beam.runners.common._OutputProcessor.process_outputs
File "apache_beam/runners/worker/operations.py", line 178, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
File "apache_beam/runners/worker/operations.py", line 657, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/worker/operations.py", line 658, in apache_beam.runners.worker.operations.DoOperation.process
File "apache_beam/runners/common.py", line 878, in apache_beam.runners.common.DoFnRunner.receive
File "apache_beam/runners/common.py", line 885, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 956, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "/usr/local/lib/python3.6/site-packages/future/utils/__init__.py", line 421, in raise_with_traceback
raise exc.with_traceback(traceback)
File "apache_beam/runners/common.py", line 883, in apache_beam.runners.common.DoFnRunner.process
File "apache_beam/runners/common.py", line 498, in apache_beam.runners.common.SimpleInvoker.invoke_process
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1024, in process
schema)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery.py", line 1009, in _create_table_if_needed
additional_create_parameters=self.additional_bq_parameters)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 712, in get_or_create_table
found_table = self.get_table(project_id, dataset_id, table_id)
File "/usr/local/lib/python3.6/site-packages/apache_beam/utils/retry.py", line 226, in wrapper
return fun(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/bigquery_tools.py", line 480, in get_table
response = self.client.tables.Get(request)
File "/usr/local/lib/python3.6/site-packages/apache_beam/io/gcp/internal/clients/bigquery/bigquery_v2_client.py", line 581, in Get
config, request, global_params=global_params)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 731, in _RunMethod
return self.ProcessHttpResponse(method_config, http_response, request)
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 737, in ProcessHttpResponse
self.__ProcessHttpResponse(method_config, http_response, request))
File "/usr/local/lib/python3.6/site-packages/apitools/base/py/base_api.py", line 604, in __ProcessHttpResponse
http_response, method_config=method_config, request=request)
RuntimeError: apitools.base.py.exceptions.HttpForbiddenError: HttpError accessing <https://www.googleapis.com/bigquery/v2/projects/bought-by-many/datasets/mongo_landing_zone/tables/service_user_users_users?alt=json>: response: <{'vary': 'Origin, X-Origin, Referer', 'content-type': 'application/json; charset=UTF-8', 'date': 'Tue, 25 Feb 2020 16:49:25 GMT', 'server': 'ESF', 'cache-control': 'private', 'x-xss-protection': '0', 'x-frame-options': 'SAMEORIGIN', 'x-content-type-options': 'nosniff', 'transfer-encoding': 'chunked', 'status': '403', 'content-length': '560', '-content-encoding': 'gzip'}>, content <{
"error": {
"code": 403,
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"errors": [
{
"message": "Exceeded rate limits: too many api requests per user per method for this user_method. For more information, see https://cloud.google.com/bigquery/troubleshooting-errors",
"domain": "usageLimits",
"reason": "rateLimitExceeded"
}
],
"status": "PERMISSION_DENIED"
}
}
The code I'm using is below:
files = (
    p
    | "read PubSub"
    >> beam.io.ReadFromPubSub(
        topic=known_args.input_topic, with_attributes=True, id_label=id_label
    )
    | "decode message" >> beam.Map(lambda pubsub_msg: json.loads(pubsub_msg.data))
    | "filter buckets with unknown encodings"
    >> beam.Filter(no_encoding_bucket_filter, encodings)
    | "get file from bucket" >> beam.ParDo(GetFileFromBucket())
)
policies = (
    files
    | "filter for policies"
    >> beam.Filter(lambda msg: 'policies' in msg["bucket"])
    | "encode policies"
    >> beam.Map(apply_encoding, encodings['policies'], 'policies')
    | "filter out policies that failed to encode"
    >> beam.Filter(lambda item: item is not None)
    | "insert policies to BigQuery"
    >> beam.io.WriteToBigQuery(
        project=project_id,
        table="service_policy_policies",
        dataset="mongo_landing_zone",
        insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
    )
)
beam.io.WriteToBigQuery()
does handle streaming data, but judging by the error I suspect it's initialising or fetching the BigQuery table object for every element processed, rather than just inserting a row. Am I using it incorrectly somehow?
Update 2020-03-11
I managed to improve things, but not fix the problem. I switched from using beam.io.WriteToBigQuery
to writing a custom class called WriteToBigQueryCustom
that does the same job. I still hit the errors, but now only at throughput of 500/sec or higher.
Updated code:
class WriteToBigQueryCustom(beam.DoFn):
    """
    Stream insert records into a BigQuery table. Intended to work the same way you'd
    expect beam.io.WriteToBigQuery to work for streaming.

    Even though beam.io.WriteToBigQuery supports streaming, it seemed to be
    initialising the BigQuery connection for every element processed. Was
    getting throttled and causing errors about hitting BQ api limits at throughput of
    100 elements/sec when the streaming inserts limit is 1,000,000/sec.
    """

    def __init__(self, project_id, dataset, table_name):
        self.project_id = project_id
        self.dataset = dataset
        self.table_name = table_name
        self.table_id = f"{project_id}.{dataset}.{table_name}"

    def start_bundle(self):
        self.bq_client = bigquery.Client()
        self.table = self.bq_client.get_table(self.table_id)

    def process(self, dict_to_insert):
        """Insert a dict into the class's BigQuery table."""
        errors = self.bq_client.insert_rows(self.table, [dict_to_insert])
        if errors:
            logging.error(
                f"Hit error uploading row to bigquery table {self.table_id}: "
                f"{errors}. Was trying to insert dict: {dict_to_insert}"
            )
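Why moving setup into start_bundle helps can be sketched without Beam at all: a DoFn's start_bundle runs once per bundle, so expensive setup (like creating a bigquery.Client and fetching the table) is amortised over many elements instead of repeated per element. A minimal plain-Python stand-in for that lifecycle (no Beam; all names here are hypothetical, for illustration only):

```python
class BundledWriter:
    """Plain-Python stand-in for the DoFn lifecycle: expensive setup
    happens once per bundle, cheap work happens once per element."""

    def __init__(self):
        self.setup_calls = 0   # stands in for bigquery.Client() / get_table() calls
        self.rows = []

    def start_bundle(self):
        # In the real DoFn this is where the BigQuery client and table are created.
        self.setup_calls += 1

    def process(self, row):
        # In the real DoFn this is the per-element insert_rows call.
        self.rows.append(row)


writer = BundledWriter()
writer.start_bundle()                       # runner calls this once per bundle
for row in [{"a": 1}, {"a": 2}, {"a": 3}]:
    writer.process(row)                     # runner calls this once per element
print(writer.setup_calls)  # 1 setup for 3 elements
```

If setup instead lived in process (or were re-done per element), the API call count would scale with throughput, which matches the rate-limit errors described above.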
Best answer
I ran into the same trouble running a similar pipeline. There seems to be some kind of bug in the Python/Beam SDK:
https://issues.apache.org/jira/browse/BEAM-6831
Adding create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER helped in my case.
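Applied to the question's original pipeline, that fix is one extra argument to the sink. A sketch against the snippet above (note: with CREATE_NEVER the table must already exist, since Beam will no longer run its get-or-create table check):

```python
| "insert policies to BigQuery"
>> beam.io.WriteToBigQuery(
    project=project_id,
    table="service_policy_policies",
    dataset="mongo_landing_zone",
    insert_retry_strategy="RETRY_ON_TRANSIENT_ERROR",
    # Skip the per-element table lookup that was hitting the tables.get quota:
    create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
)
```

This matches the traceback, which fails inside _create_table_if_needed / get_or_create_table rather than in the insert itself.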
Regards, Michael
On "python - Google Dataflow streaming inserts to BigQuery hitting rate limits", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/60400292/