python - Apache Beam job fails when performing session windowing on a large dataset

I'm working on a Python Apache Beam job that uses session windowing on a bounded dataset. It works fine for small datasets, but the job dies when I increase the size of the input data.

The job ID is 2019-06-10_07_28_32-2942508228086251217.

import datetime

import apache_beam as beam
import numpy as np
from apache_beam.io.textio import WriteToText
from apache_beam.transforms import window

elements = (p | 'IngestData' >> beam.io.Read(big_query_source))

elements | 'AddEventTimestamp' >> beam.ParDo(AddTimestampDoFn()) \
    | 'SessionWindow' >> beam.WindowInto(window.Sessions(10 * 60)) \
    | 'CreateTuple' >> beam.Map(lambda row: (row['id'], {'attribute1': row['attribute1'], 'date': row['date']})) \
    | 'GroupById1' >> beam.GroupByKey() \
    | 'AggregateSessions' >> beam.ParDo(AggregateTransactions()) \
    | 'MergeWindows' >> beam.WindowInto(window.GlobalWindows()) \
    | 'GroupById2' >> beam.GroupByKey() \
    | 'MapSessionsToLists' >> beam.Map(lambda x: (x[0], [y for y in x[1]])) \
    | 'BiggestSession' >> beam.ParDo(MaximumSession()) \
    | "PrepForWrite" >> beam.Map(lambda x: x[1].update({"id": x[0]}) or x[1]) \
    | 'WriteResult' >> WriteToText(known_args.output)

The DoFn classes are:

class AddTimestampDoFn(beam.DoFn):
    def process(self, element):
        # Parse the event time from the row and attach it as the element's timestamp.
        date = datetime.datetime.strptime(element['date'][:-4], '%Y-%m-%d %H:%M:%S.%f')
        unix_timestamp = float(date.strftime('%s'))
        yield beam.window.TimestampedValue(element, unix_timestamp)
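
As an aside, '%s' is not a documented strftime directive in Python: it only works where the platform's C library supports it (the Linux workers Dataflow uses do), and it interprets the naive datetime in the worker's local timezone. A more portable sketch of the same conversion, assuming the parsed timestamps are UTC:

import datetime

def to_unix_timestamp(dt):
    # Treat the naive datetime as UTC and return seconds since the epoch,
    # preserving fractional seconds.
    return (dt - datetime.datetime(1970, 1, 1)).total_seconds()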


class AggregateTransactions(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # element is (id, rows) as produced by the upstream GroupByKey.
        session_count = len(element[1])
        attributes = list(map(lambda row: row['attribute1'], element[1]))
        std = np.std(attributes)

        return [(element[0], {'session_count': session_count,
                              'session_std': std,
                              'window_start': window.start
                                                    .to_utc_datetime()
                                                    .strftime('%d-%b-%Y %H:%M:%S')})]


class MaximumSession(beam.DoFn):
    def process(self, element):
        # Pick the session with the highest session_count for this id.
        sorted_counts = sorted(element[1], key=lambda x: x['session_count'], reverse=True)

        return [(element[0], {'session_count': sorted_counts[0]['session_count'],
                              'session_std': sorted_counts[0]['session_std'],
                              'window_start_time': sorted_counts[0]['window_start']})]
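
Since MaximumSession only needs the single largest session per id, one alternative sketch (an assumption of mine, not part of the original pipeline) is to collapse the GroupById2, MapSessionsToLists and BiggestSession steps into a combiner; CombinePerKey lets the runner pre-combine partial results on each worker, so no single worker has to hold every session for a hot key in memory. Here, aggregated is a hypothetical name for the output of the MergeWindows step:

# Hypothetical replacement for GroupById2 + MapSessionsToLists + BiggestSession.
biggest = aggregated | 'BiggestSession' >> beam.CombinePerKey(
    lambda sessions: max(sessions, key=lambda s: s['session_count']))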

The job fails with the following error: The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers:

The specific worker logs on Stackdriver don't hint at anything. I just get a combination of these entries:

processing lull for over 431.44 seconds in state process-msecs in step s5

Refusing to split <dataflow_worker.shuffle.GroupedShuffleRangeTracker object at 0x7f82e970cbd0> at '\n\xaaG\t\x00\x01': proposed split position is out of range

Retry with exponential backoff: waiting for 4.69305060273 seconds before retrying lease_work because we caught exception: SSLError: ('The read operation timed out',)

The rest of the entries are just informational.

The latest memory usage reported by that particular worker was 43413 MB. Since I'm using n1-highmem-32 machines, I don't think memory should be an issue.

On the client side, in the Cloud Shell session from which I triggered the job, I just got a lot of

INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 1/2)
INFO:oauth2client.transport:Refreshing due to a 401 (attempt 2/2)

before the job crashed.

Any ideas?

Thanks

Best Answer

By default, Dataflow retries a failed work item 4 times when the pipeline runs in BATCH mode, and indefinitely when it runs in STREAMING mode.

Create dashboards in Stackdriver for the Compute Engine machines the pipeline uses, and analyze how much memory and CPU they consume and how many IO operations take place. After carefully analyzing those factors, raise the pipeline's configuration accordingly, as sketched below.
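
For example, worker resources can be raised through pipeline options. A minimal sketch, assuming the Dataflow runner; the project id, region, and all numbers are hypothetical placeholders to be tuned against what the dashboards show:

from apache_beam.options.pipeline_options import PipelineOptions

# All values below are hypothetical; tune them based on the observed
# memory, CPU and IO of the workers.
options = PipelineOptions(
    runner='DataflowRunner',
    project='my-project',            # hypothetical project id
    region='us-central1',            # hypothetical region
    num_workers=16,                  # initial worker count
    max_num_workers=64,              # autoscaling ceiling
    machine_type='n1-highmem-32',    # memory-heavy workers for large groupings
    disk_size_gb=200,                # larger worker disks
)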

Also make sure every transform works correctly for the data you feed it, and apply exception handling inside the transforms.
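
A minimal sketch of that kind of exception handling, using a dead-letter side output so one bad element is diverted instead of failing the bundle 4 times; the 'failed' tag, the SafeAggregateTransactions name and the grouped input are illustrative assumptions, not part of the original pipeline:

class SafeAggregateTransactions(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        try:
            # ... the original AggregateTransactions logic would go here ...
            yield element
        except Exception as e:
            # Route the offending element to a side output for later inspection.
            yield beam.pvalue.TaggedOutput('failed', (element, str(e)))

results = (grouped
           | 'AggregateSessions' >> beam.ParDo(SafeAggregateTransactions())
                                        .with_outputs('failed', main='sessions'))
sessions, failed = results.sessions, results.failed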

Regarding "python - Apache Beam job fails when performing session windowing on a large dataset", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/56531872/
