
python - BigQuery cron job without a web service

Reposted. Author: 太空宇宙. Updated: 2023-11-04 02:40:20

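Is it possible to run a script that processes user data without running a Google App Engine web service?

It works fine for smaller scripts, but when my script runs for about 40 minutes I get the error: DeadlineExceededError

My temporary workaround is to use the Windows Task Scheduler on a Windows VM and run the Python script from the command line.

Edit: added code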

import time

import googleapiclient.discovery
import webapp2
from oauth2client.service_account import ServiceAccountCredentials

jobs = []         # job resources returned by jobs().insert()
jobs_status = []  # latest state of each submitted job
jobs_error = []   # {"query", "message"} entries for failed jobs
# The project id whose datasets you'd like to list
PROJECT_NUMBER = 'project'
scope = ('https://www.googleapis.com/auth/bigquery',
         'https://www.googleapis.com/auth/cloud-platform',
         'https://www.googleapis.com/auth/drive',
         'https://spreadsheets.google.com/feeds')

credentials = ServiceAccountCredentials.from_json_keyfile_name('client_secrets.json', scope)

# Create the bigquery api client
service = googleapiclient.discovery.build('bigquery', 'v2', credentials=credentials)


def load_logs(source):
    # Stream a single row into the test.test_log table
    body = {"rows": [
        {"json": source}
    ]}

    response = service.tabledata().insertAll(
        projectId=PROJECT_NUMBER,
        datasetId='test',
        tableId='test_log',
        body=body).execute()
    return response


def job_status():
    # Rebuild the state list on every poll so stale entries do not accumulate
    del jobs_status[:]
    for job in jobs:
        _jobId = job['jobReference']['jobId']
        status = service.jobs().get(projectId=PROJECT_NUMBER, jobId=_jobId).execute()
        jobs_status.append(status['status']['state'])
        # errorResult is only present when the job has failed
        if 'errorResult' in status['status']:
            query = str(status['configuration']['query']['query'])
            message = str(status['status']['errorResult']['message'])
            error = {"query": query, "message": message}
            if error not in jobs_error:
                jobs_error.append(error)
    return jobs_status


def check_statues():
    # Block until every submitted job reports DONE
    while True:
        if all(state == 'DONE' for state in job_status()):
            return
        time.sleep(5)  # pause between polls instead of busy-waiting


def insert(query, tableid, disposition):
    # Submit a query job that writes its result to test.<tableid>
    job_body = {
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": True,
                "destinationTable": {
                    "datasetId": "test",
                    "projectId": "project",
                    "tableId": tableid
                },
                "writeDisposition": disposition
            }
        }
    }

    r = service.jobs().insert(
        projectId=PROJECT_NUMBER,
        body=job_body).execute()
    jobs.append(r)
    return r


class MainPage(webapp2.RequestHandler):
    def get(self):
        # Reset both destination tables from the empty template table
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p1', "WRITE_TRUNCATE")
        check_statues()
        query = "SELECT * FROM [gdocs_users.user_empty]"
        insert(query, 'users_data_p2', "WRITE_TRUNCATE")
        # Append user_0001 .. user_0999: the first 600 to p1, the rest to p2
        query = "SELECT * FROM [gdocs_users.user_%s]"
        for i in range(1, 1000):
            if i <= 600:
                insert(query % str(i).zfill(4), 'users_data_p1', "WRITE_APPEND")
            else:
                insert(query % str(i).zfill(4), 'users_data_p2', "WRITE_APPEND")
        # Wait for the append jobs so their errors are collected before logging
        check_statues()
        for error in jobs_error:
            load_logs(error)


app = webapp2.WSGIApplication([
    ('/', MainPage),
], debug=True)

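Best answer

By default, an App Engine service uses automatic scaling, which has a 60-second limit for HTTP requests and a 10-minute limit for task queue requests. If you change the service to use basic or manual scaling, your task queue requests can run for up to 24 hours.

It sounds like you may only need one instance for this job, so perhaps create a second service in addition to the default service. In a subfolder, create a bqservice folder containing the following app.yaml settings, which use basic scaling with a maximum of one instance: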

# bqservice/app.yaml
# Possibly use a separate service for your BQ code than
# the rest of your app:
service: bqservice
runtime: python27
api_version: 1
# Required by the python27 runtime
threadsafe: true
# Keep low memory/cost B1 class?
instance_class: B1
# Limit max instances to 1 to keep costs down. There is an
# 8 instance hour limit to the free tier. This option still
# scales to 0 when not in use.
basic_scaling:
  max_instances: 1

# Handlers:
handlers:
- url: /.*
  script: main.app

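Then create a cron.yaml in the same service to schedule your script to run. With my example configuration above, you would put your BigQuery logic in a main.py file that defines a WSGI application: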

# bqservice/main.py
import webapp2


class CronHandler(webapp2.RequestHandler):

    def get(self):
        # App Engine cron calls the URL with an HTTP GET request.
        # Handle your cron work
        # ....
        self.response.write('ok')


app = webapp2.WSGIApplication([
    #('/', MainPage), # If you needed other handlers
    ('/mycron', CronHandler),
], debug=True)
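
The cron.yaml mentioned above is not shown in the answer. A minimal sketch might look like the following, reusing the /mycron URL and the bqservice service name from the example; the description and the daily schedule are placeholders chosen for illustration, not values from the original. App Engine cron sends an HTTP GET request to the URL, so the handler needs a get method.

```yaml
# cron.yaml (schedule and description are illustrative assumptions)
cron:
- description: run the BigQuery job
  url: /mycron
  schedule: every 24 hours
  # Route the request to the bqservice service instead of default
  target: bqservice
```

The target field is what sends the request to the basic-scaling service rather than the default one.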

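If you don't plan to use the App Engine app for anything else, you can put all of this in the default service. If you do this in addition to the default service, you will need to deploy something to the default service first, even if it is just a simple app.yaml with static files.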
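
Even with basic scaling the request still has a deadline, so the polling loop from the question is safer when it sleeps between checks and gives up after a timeout. The following is only a sketch under stated assumptions: wait_for_jobs, JobsTimeout and the fetch_state callable are hypothetical names introduced here, not part of the BigQuery client or the original code.

```python
import time


class JobsTimeout(Exception):
    """Raised when some jobs are still running at the deadline."""


def wait_for_jobs(job_ids, fetch_state, poll_interval=5.0, timeout=600.0,
                  sleep=time.sleep, clock=time.time):
    """Poll until every job reports DONE or the timeout elapses.

    fetch_state is a hypothetical callable that takes a job id and
    returns its state string ('PENDING', 'RUNNING' or 'DONE').
    sleep and clock are injectable so the loop can be tested offline.
    """
    deadline = clock() + timeout
    pending = set(job_ids)
    while pending:
        # Only re-check jobs that have not finished yet.
        pending = {j for j in pending if fetch_state(j) != 'DONE'}
        if not pending:
            break
        if clock() >= deadline:
            raise JobsTimeout('still running: %s' % sorted(pending))
        sleep(poll_interval)
    return True
```

With the client from the question, fetch_state could be a small function that calls service.jobs().get(projectId=PROJECT_NUMBER, jobId=job_id).execute() and returns the ['status']['state'] value of the result. Tracking only the unfinished jobs keeps the number of API calls down when many jobs are submitted at once.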

Regarding "python - BigQuery cron job without a web service", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46738644/
