java - Google App Engine fails to start a Dataflow job


Below is the error message printed when the job fails. I am running the local development server and hitting http://localhost:8080/dataflow/schedule, which calls doGet() to start the Dataflow pipeline. I am using the App Engine default service account (@appspot.gserviceaccount.com) as the credential.

Here is the code that launches the job:

import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;

import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(name = "dataflowscheduler", value = "/dataflow/schedule")
public class DataflowSchedulingServlet extends HttpServlet {
    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException {

        Properties properties = System.getProperties();

        try {
            String[] args = {
                "--project=<project_name>",
                "--runner=DataflowRunner",
                "--stagingLocation=gs://<project_name>/temp/cronjobs",
                "--maxNumWorkers=1",
                "--gcpTempLocation=gs://<project_name>/temp/gcpTempLocation",
                "--tempLocation=gs://<project_name>/temp/tempLocation",
                "--driverClassName=org.postgresql.Driver",
                "--connectionURL=jdbc:postgresql://example.com:port/production",
                "--username=<username>",
                "--password=<password>",
                "--driverJars=gs://<project_name>/temp/postgresql-42.2.5.jar",
                "--bigQueryLoadingTemporaryDirectory=gs://<project_name>/temp/",
                "--connectionProperties='unicode=true&characterEncoding=UTF-8'",
                "--query=SELECT * FROM public.view_relations",
                "--datasetname=flyhomes_production",
                "--schemaname=public",
                "--tablename=view_relations",
                "--outputTable=<project_name>:<dataset>.view_relations",
                "--dataflowJobFile=/Users/annasun/GoogleCloudProject/postgres-to-bigquery/out.json"};

            JdbcToBigQuery.main(args);

            System.out.println("STARTJOB() !!! ");

        } catch (InterruptedException e) {
            response.getWriter().println("Exception: " + Arrays.toString(e.getStackTrace()));
        }

        response.setContentType("text/plain");
        response.getWriter().println("Hello App Engine - Standard using "
                // + SystemProperty.version.get()
                + " Java " + properties.get("java.specification.version"));
    }
}

Here is the main function:

public static void main(String[] args) throws IOException, InterruptedException {
    System.out.println("HelloWorld()!");

    // Parse the user options passed from the command-line
    JdbcConverters.JdbcToBigQueryOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(JdbcConverters.JdbcToBigQueryOptions.class);

    String datasetName = options.getDatasetname().toString();
    String jobName = options.getJobName();
    String tableName = options.getTablename().toString().replace("_", "-");
    jobName = jobName + "-" + tableName;
    options.setJobName(jobName);

    System.out.println("run_updateTable_production(options)");
    run_updateTable_production(options);
    System.out.println("AFTER -- run_updateTable_production(options) ");
}


private static void run_updateTable_production(JdbcConverters.JdbcToBigQueryOptions options)
        throws InterruptedException {

    Timestamp lastUpdatedTime = SchemaCreator.getLastUpdatedTimeFromBigQuery(options);
    System.out.println("LAST UPDATED TIME IS " + lastUpdatedTime);

    if (lastUpdatedTime != null) {
        System.out.println("!! LAST UPDATED TIME IS " + lastUpdatedTime);

        String query_base = options.getQuery().toString();
        String query_update = query_base + " WHERE updated_at > '" + lastUpdatedTime
            + "' ORDER BY updated_at, id ";
        String jobName = options.getJobName();
        // select * from public.listings WHERE updated_at > lastUpdatedTime
        // ORDER BY updated_at, id OFFSET 100 LIMIT 50

        options.setQuery(ValueProvider.StaticValueProvider.of(query_update));
        System.out.println("QUERY IS : " + options.getQuery());
        options.setJobName(jobName + "-UPDATE-"
            + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
        System.out.println(jobName + "-UPDATE-"
            + lastUpdatedTime.toString().replace(":", "-").replace(".", "-"));
        run(options);

    } else {
        run_createTable_Recursive(options);
    }

    System.out.println("FINISHED -- run_updateTable_production(options) ");
}



/**
 * Runs the pipeline with the supplied options.
 *
 * @param options The execution parameters to the pipeline.
 * @return The result of the pipeline execution.
 */
private static PipelineResult run(JdbcConverters.JdbcToBigQueryOptions options) {
    System.out.println("BEFORE Pipeline.create!!!!");
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    System.out.println("AFTER Pipeline.create!!!!");

    /*
     * Steps: 1) Read records via JDBC and convert to TableRow via RowMapper
     *        2) Append TableRow to BigQuery via BigQueryIO
     */
    pipeline
        /*
         * Step 1: Read records via JDBC and convert to TableRow
         * via {@link org.apache.beam.sdk.io.jdbc.JdbcIO.RowMapper}
         */
        .apply(
            "Read from JdbcIO",
            DynamicJdbcIO.<TableRow>read()
                .withDataSourceConfiguration(
                    DynamicJdbcIO.DynamicDataSourceConfiguration.create(
                            options.getDriverClassName(), options.getConnectionURL())
                        .withUsername(options.getUsername())
                        .withPassword(options.getPassword())
                        .withDriverJars(options.getDriverJars())
                        .withConnectionProperties(options.getConnectionProperties()))
                .withQuery(options.getQuery())
                .withCoder(TableRowJsonCoder.of())
                .withRowMapper(JdbcConverters.getResultSetToTableRow()))
        /*
         * Step 2: Append TableRow to an existing BigQuery table
         */
        .apply(
            "Write to BigQuery",
            BigQueryIO.writeTableRows()
                .withoutValidation()
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
                .withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
                .to(options.getOutputTable()));

    System.out.println("AFTER Pipeline.APPLY!!!!");
    // Execute the pipeline and return the result.
    return pipeline.run();
}

However, I get a server error:

HTTP ERROR 500: Problem accessing /dataflow/schedule.

Server Error
Caused by:

java.lang.RuntimeException: Error while staging packages
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:398)
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:271)
at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:80)
at org.apache.beam.runners.dataflow.util.GcsStager.stageDefaultFiles(GcsStager.java:68)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:713)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:179)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:313)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:299)
at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run(JdbcToBigQuery.java:258)
at com.flyhomes.cloud.dataflow.JdbcToBigQuery.run_updateTable_production(JdbcToBigQuery.java:140)
at com.flyhomes.cloud.dataflow.JdbcToBigQuery.main(JdbcToBigQuery.java:104)
at com.flyhomes.cloud.dataflow.DataflowSchedulingServlet.doGet(DataflowSchedulingServlet.java:64)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:865)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1655)
at com.google.appengine.tools.development.ResponseRewriterFilter.doFilter(ResponseRewriterFilter.java:134)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.appengine.tools.development.HeaderVerificationFilter.doFilter(HeaderVerificationFilter.java:34)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.appengine.api.blobstore.dev.ServeBlobFilter.doFilter(ServeBlobFilter.java:63)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.apphosting.utils.servlet.TransactionCleanupFilter.doFilter(TransactionCleanupFilter.java:48)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.appengine.tools.development.jetty9.StaticFileFilter.doFilter(StaticFileFilter.java:123)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectRequest(DevAppServerModulesFilter.java:366)
at com.google.appengine.tools.development.DevAppServerModulesFilter.doDirectModuleRequest(DevAppServerModulesFilter.java:349)
at com.google.appengine.tools.development.DevAppServerModulesFilter.doFilter(DevAppServerModulesFilter.java:116)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)
at com.google.appengine.tools.development.DevAppServerRequestLogFilter.doFilter(DevAppServerRequestLogFilter.java:44)
at org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1634)
at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:146)
at org.eclipse.jetty.security.SecurityHandler.handle(SecurityHandler.java:524)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:257)
at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)
at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)
at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)
at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)
at com.google.appengine.tools.development.jetty9.DevAppEngineWebAppContext.doScope(DevAppEngineWebAppContext.java:94)
at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at com.google.appengine.tools.development.jetty9.JettyContainerService$ApiProxyHandler.handle(JettyContainerService.java:595)
at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
at org.eclipse.jetty.server.Server.handle(Server.java:531)
at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)
at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)
at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)
at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)
at org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.produce(EatWhatYouKill.java:132)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)
at org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at com.google.appengine.tools.development.RequestEndListenerHelper.getListeners(RequestEndListenerHelper.java:52)
at com.google.appengine.tools.development.RequestEndListenerHelper.register(RequestEndListenerHelper.java:39)
at com.google.appengine.tools.development.RequestThreadFactory$1$1.start(RequestThreadFactory.java:65)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:950)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1357)
at org.apache.beam.repackaged.beam_runners_google_cloud_dataflow_java.com.google.common.util.concurrent.MoreExecutors$ListeningDecorator.execute(MoreExecutors.java:530)
at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
at org.apache.beam.sdk.util.MoreFutures.supplyAsync(MoreFutures.java:101)
at org.apache.beam.runners.dataflow.util.PackageUtil.stagePackage(PackageUtil.java:170)
at org.apache.beam.runners.dataflow.util.PackageUtil.lambda$stageClasspathElements$2(PackageUtil.java:359)
at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:952)
at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
at java.util.concurrent.CompletableFuture$Completion.exec(CompletableFuture.java:443)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)

And the error message from the logs:

{
  "error": {
    "errors": [
      {
        "domain": "global",
        "reason": "required",
        "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>.",
        "locationType": "header",
        "location": "Authorization"
      }
    ],
    "code": 401,
    "message": "Anonymous caller does not have storage.buckets.list access to project <project_number>."
  }
}

Best Answer

The error you are seeing is a permissions problem. Your App Engine instance runs as a particular user or service account, and staging the pipeline's packages requires the storage.buckets.list permission on Cloud Storage. The default service account you are using may not have that permission enabled; you can find instructions here. Note also that "Anonymous caller" in the log means the request to Cloud Storage carried no credentials at all, which on the local development server usually means application default credentials were not picked up. I would suggest first running the pipeline outside of App Engine and verifying that it runs successfully. It may also be simpler to create a templated pipeline and launch it that way (see the sketch after the blog link below).
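
As a first sanity check when running locally, you can load credentials explicitly and hand them to the pipeline options, so that the staging step does not run as an anonymous caller. A minimal sketch, assuming a downloaded service-account key file; the key path and helper name are hypothetical placeholders, and the accessors come from Beam's GcpOptions:

import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;

import com.google.auth.oauth2.GoogleCredentials;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;

// Call this on `options` before running the pipeline, e.g. at the top of main().
private static void useExplicitCredentials(JdbcConverters.JdbcToBigQueryOptions options)
        throws IOException {
    GcpOptions gcpOptions = options.as(GcpOptions.class);

    // Hypothetical path: a key exported for a service account that has
    // storage.buckets.list on the project.
    GoogleCredentials credentials = GoogleCredentials
        .fromStream(new FileInputStream("/path/to/service-account-key.json"))
        .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));

    // Beam's Dataflow runner uses this credential when staging packages to GCS.
    gcpOptions.setGcpCredential(credentials);
}

Alternatively, setting the GOOGLE_APPLICATION_CREDENTIALS environment variable to the key path before starting the dev server achieves the same thing without code changes.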

Also, FYI, it may be worth double-checking everything in this blog post:

The blog shows how to kick off a Dataflow template job and execute it as a cron job; you would just need to trigger it from an RPC instead. The instructions should at least be helpful for most of the setup.

https://amygdala.github.io/dataflow/app_engine/2017/10/24/gae_dataflow.html
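
For reference, here is a rough sketch of the template approach the blog describes: stage the pipeline as a template once (for example by running it with --templateLocation=gs://...), then launch it from the servlet through the Dataflow REST client instead of calling JdbcToBigQuery.main() in-process. The template path, job name, and runtime parameters below are hypothetical placeholders, and this assumes the pipeline's options are ValueProviders, as they are in the JdbcToBigQuery template:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.LaunchTemplateParameters;
import com.google.api.services.dataflow.model.LaunchTemplateResponse;
import com.google.api.services.dataflow.model.RuntimeEnvironment;

public class TemplateLauncher {
    public static void launch() throws Exception {
        String projectId = "<project_name>";
        // Hypothetical path: where the template was staged with --templateLocation.
        String templatePath = "gs://<project_name>/templates/jdbc-to-bigquery";

        GoogleCredential credential = GoogleCredential.getApplicationDefault()
            .createScoped(Collections.singleton("https://www.googleapis.com/auth/cloud-platform"));

        Dataflow dataflow = new Dataflow.Builder(
                GoogleNetHttpTransport.newTrustedTransport(),
                JacksonFactory.getDefaultInstance(),
                credential)
            .setApplicationName("dataflow-scheduler")
            .build();

        // Runtime parameters map onto the ValueProvider options of the pipeline.
        Map<String, String> parameters = new HashMap<>();
        parameters.put("query", "SELECT * FROM public.view_relations");
        parameters.put("outputTable", "<project_name>:<dataset>.view_relations");

        LaunchTemplateParameters launchParameters = new LaunchTemplateParameters()
            .setJobName("jdbc-to-bq-" + System.currentTimeMillis())
            .setParameters(parameters)
            .setEnvironment(new RuntimeEnvironment()
                .setTempLocation("gs://<project_name>/temp/tempLocation")
                .setMaxWorkers(1));

        // A single short RPC; no package staging happens in this process.
        LaunchTemplateResponse response = dataflow.projects().templates()
            .launch(projectId, launchParameters)
            .setGcsPath(templatePath)
            .execute();

        System.out.println("Launched Dataflow job: " + response.getJob().getId());
    }
}

Because the launch call is a single short RPC, it fits comfortably inside an App Engine request handler, unlike running the whole DataflowRunner staging step in-process.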

Regarding "java - Google App Engine fails to start a Dataflow job", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/55290086/
