
google-cloud-platform - Dataflow stops streaming to BigQuery without errors


We started using Dataflow to read from Pub/Sub and stream the data into BigQuery. The pipeline is supposed to run 24/7, because Pub/Sub is constantly being updated with analytics data from multiple websites around the world.

The code looks like this:

from __future__ import absolute_import

import argparse
import json
import logging

import apache_beam as beam
from apache_beam.io import ReadFromPubSub, WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

logger = logging.getLogger()

# Maps each event's meta_type to the index of its output partition / target table.
TABLE_IDS = {
    'table_1': 0,
    'table_2': 1,
    'table_3': 2,
    'table_4': 3,
    'table_5': 4,
    'table_6': 5,
    'table_7': 6,
    'table_8': 7,
    'table_9': 8,
    'table_10': 9,
    'table_11': 10,
    'table_12': 11,
    'table_13': 12
}


def separate_by_table(element, num):
    # Partition function: route each parsed event to the output that matches its meta_type.
    return TABLE_IDS[element.get('meta_type')]


class ExtractingDoFn(beam.DoFn):
    def process(self, element):
        # Each Pub/Sub message payload is a JSON-encoded event.
        yield json.loads(element)


def run(argv=None):
    """Main entry point; defines and runs the streaming pipeline."""
    logger.info('STARTED!')
    parser = argparse.ArgumentParser()
    parser.add_argument('--topic',
                        dest='topic',
                        default='projects/PROJECT_NAME/topics/TOPICNAME',
                        help='Cloud Pub/Sub topic in the form "projects/<project>/topics/<topic>"')
    parser.add_argument('--table',
                        dest='table',
                        default='PROJECTNAME:DATASET_NAME.event_%s',
                        help='BigQuery table template in the form "PROJECT:DATASET.TABLE"')
    known_args, pipeline_args = parser.parse_known_args(argv)

    # We use the save_main_session option because one or more DoFn's in this
    # workflow rely on global context (e.g., a module imported at module level).
    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    p = beam.Pipeline(options=pipeline_options)

    lines = p | ReadFromPubSub(known_args.topic)
    datas = lines | beam.ParDo(ExtractingDoFn())
    by_table = datas | beam.Partition(separate_by_table, 13)

    # Create a stream for each table
    for table, id in TABLE_IDS.items():
        by_table[id] | 'write to %s' % table >> WriteToBigQuery(known_args.table % table)

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logger.setLevel(logging.INFO)
    run()

It works fine, but after some period of time (2-3 days) it stops streaming for some reason. When I check the job status, it shows no errors in the logs section (you know, the ones marked with a red "!" in the job details in Dataflow). If I cancel the job and run it again, it starts working as usual.

If I check Stackdriver for additional logs, here are all the errors that occurred: Errors list. And here are some of the warnings that appear periodically while the job is running: Warnings list. Details of one of them:

{
  insertId: "397122810208336921:865794:0:479132535"
  jsonPayload: {
    exception: "java.lang.IllegalStateException: Cannot be called on unstarted operation.
      at com.google.cloud.dataflow.worker.fn.data.RemoteGrpcPortWriteOperation.getElementsSent(RemoteGrpcPortWriteOperation.java:111)
      at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.updateProgress(BeamFnMapTaskExecutor.java:293)
      at com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor$SingularProcessBundleProgressTracker.periodicProgressUpdate(BeamFnMapTaskExecutor.java:280)
      at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
      at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
      at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
      "
    job: "2018-11-30_10_35_19-13557985235326353911"
    logger: "com.google.cloud.dataflow.worker.fn.control.BeamFnMapTaskExecutor"
    message: "Progress updating failed 4 times. Following exception safely handled."
    stage: "S0"
    thread: "62"
    work: "c-8756541438010208464"
    worker: "beamapp-vitar-1130183512--11301035-mdna-harness-lft7"
  }
  labels: {
    compute.googleapis.com/resource_id: "397122810208336921"
    compute.googleapis.com/resource_name: "beamapp-vitar-1130183512--11301035-mdna-harness-lft7"
    compute.googleapis.com/resource_type: "instance"
    dataflow.googleapis.com/job_id: "2018-11-30_10_35_19-13557985235326353911"
    dataflow.googleapis.com/job_name: "beamapp-vitar-1130183512-742054"
    dataflow.googleapis.com/region: "europe-west1"
  }
  logName: "projects/PROJECTNAME/logs/dataflow.googleapis.com%2Fharness"
  receiveTimestamp: "2018-12-03T20:33:00.444208704Z"
  resource: {
    labels: {
      job_id: "2018-11-30_10_35_19-13557985235326353911"
      job_name: "beamapp-vitar-1130183512-742054"
      project_id: PROJECTNAME
      region: "europe-west1"
      step_id: ""
    }
    type: "dataflow_step"
  }
  severity: "WARNING"
  timestamp: "2018-12-03T20:32:59.442Z"
}

Here is the moment when the problem seems to have started: Problem arised. And here are additional info messages that might help: Info messages.

According to these messages, we are not running out of memory, processing power, etc. The job is launched with these parameters:

python -m start --streaming True --runner DataflowRunner --project PROJECTNAME --temp_location gs://BUCKETNAME/tmp/ --region europe-west1 --disk_size_gb 30 --machine_type n1-standard-1 --use_public_ips false --num_workers 1 --max_num_workers 1 --autoscaling_algorithm NONE
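
For reference, the cancel-and-resubmit workaround mentioned above boils down to the standard gcloud Dataflow commands. This is only a sketch: the job ID comes from the list command, and for streaming jobs draining is the gentler alternative to cancelling.

# Find the stuck streaming job (assumes gcloud is authenticated for PROJECTNAME).
gcloud dataflow jobs list --project PROJECTNAME --region europe-west1 --status active

# Stop it; `gcloud dataflow jobs drain` would finish in-flight work instead of stopping immediately.
gcloud dataflow jobs cancel JOB_ID --project PROJECTNAME --region europe-west1

# Resubmit the pipeline with the same parameters as before.
python -m start --streaming True --runner DataflowRunner --project PROJECTNAME \
    --temp_location gs://BUCKETNAME/tmp/ --region europe-west1 --disk_size_gb 30 \
    --machine_type n1-standard-1 --use_public_ips false \
    --num_workers 1 --max_num_workers 1 --autoscaling_algorithm NONE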

What could be the problem here?

Best Answer

This is not really an answer, more of a help in identifying the cause: so far, all of the streaming Dataflow jobs I have launched with the Python SDK have stopped in this way after a few days, whether or not they use BigQuery as a sink. So the cause rather seems to be the general fact that streaming jobs with the python SDK are still in beta.

My personal workaround: use the Dataflow templates to stream from Pub/Sub to BigQuery (thus avoiding the Python SDK), then schedule queries in BigQuery to periodically process the data. Unfortunately, that might not be appropriate for your use case.
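
A rough sketch of that approach, assuming the Google-provided Pub/Sub-to-BigQuery template (check the current template path and parameter names in the Dataflow documentation); the job name and output table below are placeholders:

# Launch Google's Pub/Sub-to-BigQuery streaming template instead of the Python pipeline.
gcloud dataflow jobs run pubsub-to-bq-events \
    --project PROJECTNAME \
    --region europe-west1 \
    --gcs-location gs://dataflow-templates/latest/PubSub_to_BigQuery \
    --staging-location gs://BUCKETNAME/tmp/ \
    --parameters inputTopic=projects/PROJECT_NAME/topics/TOPICNAME,outputTableSpec=PROJECTNAME:DATASET_NAME.raw_events

The template writes all messages into a single table, so the per-meta_type fan-out done by the original pipeline would then be handled by the scheduled BigQuery queries.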

Regarding "google-cloud-platform - Dataflow stops streaming to BigQuery without errors", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53610876/
