gpt4 book ai didi

java - 谷歌云数据流: Submitted job is executing but using old code

转载 作者:行者123 更新时间:2023-12-01 09:03:25 25 4
gpt4 key购买 nike

我正在编写一个数据流管道,它应该做三件事:

  • 从 GCP 存储读取 .csv 文件
  • 将数据解析为 BigQuery 兼容的 TableRow
  • 将数据写入 BigQuery 表

到目前为止,这一切都非常有效。它仍然如此,但是当我更改源变量和目标变量时,没有任何变化。实际运行的作业是旧作业,而不是最近更改(和提交)的代码。不知何故,当我使用 BlockingDataflowPipelineRunner 从 Eclipse 运行代码时,代码本身并未上传,而是使用了旧版本。

通常代码没有任何问题,但要尽可能完整:

public class BatchPipeline {
String source = "gs://sourcebucket/*.csv";
String destination = "projectID:datasetID.testing1";

//Creation of the pipeline with default arguments
Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

PCollection<String> line = p.apply(TextIO.Read.named("ReadFromCloudStorage")
.from(source));

@SuppressWarnings("serial")
PCollection<TableRow> tablerows = line.apply(ParDo.named("ParsingCSVLines").of(new DoFn<String, TableRow>(){
@Override
public void processElement(ProcessContext c){
//processing code goes here
}
}));

//Defining the BigQuery table scheme
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("datetime").setType("TIMESTAMP").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("consumption").setType("FLOAT").setMode("REQUIRED"));
fields.add(new TableFieldSchema().setName("meterID").setType("STRING").setMode("REQUIRED"));
TableSchema schema = new TableSchema().setFields(fields);
String table = destination;

tablerows.apply(BigQueryIO.Write
.named("BigQueryWrite")
.to(table)
.withSchema(schema)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withoutValidation());

//Runs the pipeline
p.run();
}

出现这个问题是因为我刚刚更换了笔记本电脑并且必须重新配置所有内容。我正在开发一个干净的 Ubuntu 16.04 LTS 操作系统,并安装了 GCP 开发的所有依赖项(通常)。通常,一切都配置得很好,因为我能够开始一项工作(如果我的配置错误,这应该是不可能的,对吧?)。顺便说一句,我正在使用 Eclipse Neon。

那么问题可能出在哪里呢?在我看来,上传代码时出现问题,但我已确保我的云 git 存储库是最新的,并且暂存桶已清理...

**** 更新 ****

我从未发现到底出了什么问题,但是当我检查部署的 jar 中文件的创建日期时,我确实发现它们从未真正更新过。然而,jar 文件本身有一个最近的时间戳,这让我完全忽略了这个问题(菜鸟错误)。

我最终通过简单地在 Eclipse 中创建一个新的 Dataflow 项目并将我的 .java 文件从损坏的项目复制到新项目中,使一切再次正常运行。从那时起,一切都变得魅力十足。

最佳答案

提交 Dataflow 作业后,您可以通过检查作业描述中的文件(可通过 DataflowPipelineWorkerPoolOptions#getFilesToStage 获取)来检查哪些工件是作业规范的一部分。 。下面的代码片段提供了如何获取此信息的一些示例。

PipelineOptions myOptions = ...
myOptions.setRunner(DataflowPipelineRunner.class);
Pipeline p = Pipeline.create(myOptions);

// Build up your pipeline and run it.
p.apply(...)
p.run();

// At this point in time, the files which were staged by the
// DataflowPipelineRunner will have been populated into the
// DataflowPipelineWorkerPoolOptions#getFilesToStage
List<String> stagedFiles = myOptions.as(DataflowPipelineWorkerPoolOptions.class).getFilesToStage();
for (String stagedFile : stagedFiles) {
System.out.println(stagedFile);
}

上面的代码应该打印出类似的内容:

/my/path/to/file/dataflow.jar
/another/path/to/file/myapplication.jar
/a/path/to/file/alibrary.jar

您上传的作业的资源部分可能在某种程度上已经过时,其中包含您的旧代码。查看暂存列表的所有目录和 jar 部分,找到 BatchPipeline 的所有实例并验证其年龄。可以使用 jar 提取 jar 文件工具或任何 zip 文件阅读器。或者使用javap或任何其他 class file inspector验证 BatchPipeline 类文件是否与您所做的预期更改一致。

关于java - 谷歌云数据流: Submitted job is executing but using old code,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41486455/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com