- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我创建了一个 Maven 项目来执行管道。如果我运行主类,管道就会完美运行。如果我创建一个 fat jar 并执行它,则会出现两种不同的错误,一种是在 Windows 下执行,另一种是在 Linux 下执行。
在 Windows 下:
Exception in thread "main" java.lang.RuntimeException: Error while staging packages
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:364)
at org.apache.beam.runners.dataflow.util.PackageUtil.stageClasspathElements(PackageUtil.java:261)
at org.apache.beam.runners.dataflow.util.GcsStager.stageFiles(GcsStager.java:66)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:517)
at org.apache.beam.runners.dataflow.DataflowRunner.run(DataflowRunner.java:170)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
at ....
Caused by: java.nio.file.InvalidPathException: Illegal char <:> at index 2: gs://MY_BUCKET/staging
at sun.nio.fs.WindowsPathParser.normalize(Unknown Source)
at sun.nio.fs.WindowsPathParser.parse(Unknown Source)
at sun.nio.fs.WindowsPathParser.parse(Unknown Source)
at sun.nio.fs.WindowsPath.parse(Unknown Source)
at sun.nio.fs.WindowsFileSystem.getPath(Unknown Source)
at java.nio.file.Paths.get(Unknown Source)
at org.apache.beam.sdk.io.LocalFileSystem.matchNewResource(LocalFileSystem.java:196)
at org.apache.beam.sdk.io.LocalFileSystem.matchNewResource(LocalFileSystem.java:78)
at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:563)
at org.apache.beam.runners.dataflow.util.PackageUtil$PackageAttributes.forFileToStage(PackageUtil.java:452)
at org.apache.beam.runners.dataflow.util.PackageUtil$1.call(PackageUtil.java:147)
at org.apache.beam.runners.dataflow.util.PackageUtil$1.call(PackageUtil.java:138)
at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:111)
at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:58)
at org.apache.beam.runners.dataflow.repackaged.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:75)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
在Linux下:
Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:233)
at org.apache.beam.sdk.util.InstanceBuilder.build(InstanceBuilder.java:162)
at org.apache.beam.sdk.PipelineRunner.fromOptions(PipelineRunner.java:52)
at org.apache.beam.sdk.Pipeline.create(Pipeline.java:142)
at ....
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:222)
... 8 more
Caused by: java.lang.IllegalArgumentException: Expected a valid 'gs://' path but was given '/home/USER/gs:/MY_BUCKET/temp/staging/'
at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.getGcsPath(GcsPathValidator.java:101)
at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPath(GcsPathValidator.java:75)
at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.validateOutputFilePrefixSupported(GcsPathValidator.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:237)
... 13 more
Caused by: java.lang.IllegalArgumentException: Invalid GCS URI: /home/USER/gs:/MY_BUCKET/temp/staging/
at org.apache.beam.sdks.java.extensions.google.cloud.platform.core.repackaged.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
at org.apache.beam.sdk.util.gcsfs.GcsPath.fromUri(GcsPath.java:116)
at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.getGcsPath(GcsPathValidator.java:99)
... 16 more
这是我的 pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>xxxxxxxxxxx</groupId>
<artifactId>xxxxxxxxx</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.cloud.dataflow/google-cloud-dataflow-java-sdk-all -->
<dependency>
<groupId>com.google.cloud.dataflow</groupId>
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.3</version>
</dependency>
<dependency>
<groupId>com.google.appengine</groupId>
<artifactId>appengine-api-1.0-sdk</artifactId>
<version>1.9.60</version>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-datastore</artifactId>
<version>1.15.0</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>javax.servlet-api</artifactId>
<version>4.0.0</version>
</dependency>
</dependencies>
<build>
<finalName>myFatJar</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<transformers>
<transformer implementation= "org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.myclass.MyClass</mainClass>
</transformer>
</transformers>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
这些是我的管道选项:
...
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(DataflowPipelineOptions.class);
//options.setGcpTempLocation("gs://MY_BUCKET/temp");
options.setTempLocation("gs://MY_BUCKET/temp");
options.setStagingLocation("gs://MY_BUCKET/staging");
options.setProject("xxxxxxxx");
options.setJobName("asd");
options.setRunner(DataflowRunner.class);
Pipeline.create(options);
...
我尝试使用 GcpTempLocation 更改 tempLocation,但是如果这样做,则会出现此错误:
java.lang.IllegalArgumentException: BigQueryIO.Write needs a GCS temp location to store temp files.
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:122)
at org.apache.beam.sdk.io.gcp.bigquery.BatchLoads.validate(BatchLoads.java:191)
at org.apache.beam.sdk.Pipeline$ValidateVisitor.enterCompositeTransform(Pipeline.java:621)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:651)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:655)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:446)
at org.apache.beam.sdk.Pipeline.validate(Pipeline.java:563)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:302)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:289)
at ...
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
at java.lang.Thread.run(Thread.java:748)
我应该做什么?
最佳答案
此评论解决了我的问题:
Did you try explicitly adding the Apache Beam artifact for DataflowRunner to pom.xml? – Andrew
关于java - 从 fat jar 启动 Dataflow 作业时暂存包时出错,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48689089/
我的问题是,在幕后,对于元素级 Beam DoFn (ParDo),Cloud Dataflow 的并行工作负载如何?例如,在我的 ParDO 中,我向外部服务器发送一个 http 请求以获取一个元素
就 Google Cloud 上 Dataflow 的 HA 而言,最佳架构是什么?我的工作负载在两个区域运行。数据流从一个多区域存储桶中读取并将结果写出到另一个多区域存储桶中。 为了实现高可用性(以
如图 here数据流管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。 这是一些伪代码来说明我想要实现的内容:
在旧的定价页面上,他们提到 Cloud Dataflow 工作人员使用的所有 Google Compute 实例都是根据持续使用价格规则计费的,但新的定价页面不再提及。 我假设由于它在内部使用相同的
批处理 Dataflow 作业处理完所有数据后是否可以执行操作?具体来说,我想将管道刚刚处理的文本文件移动到不同的 GCS 存储桶。我不确定将它放在我的管道中的哪个位置以确保它在数据处理完成后执行一次
我希望能够通过自定义键使用分组,但这是我目前的尝试, 我们为 KV 对象的键使用自定义类,因为我们希望 GroupBy 具有更复杂的条件,而不是使用 String 等进行简单的键匹配。 ```
当尝试在 Dataflow 服务上运行管道时,我在命令行上指定了暂存和临时存储桶(在 GCS 中)。当程序执行时,我在管道运行之前收到一个 RuntimeException,根本原因是我在路径中遗漏了
我试图找到一种优雅地结束我的工作的方法,以免丢失任何数据,从 PubSub 流式传输并写入 BigQuery。 我可以设想的一种可能方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道
问题: 使用 Cloud Dataflow 时,我们会看到 2 个指标(请参阅 this page): 系统延迟 数据新鲜度 这些在 Stackdriver 中也可用以下名称(摘自 here): sy
我一直在阅读 Dataflow SDK 文档,试图找出当数据到达流作业中的水印时会发生什么。 这一页: https://cloud.google.com/dataflow/model/windowin
有没有办法(或任何类型的黑客)从压缩文件中读取输入数据? 我的输入包含数百个文件,这些文件是用 gzip 压缩生成的,解压缩它们有些乏味。 最佳答案 Dataflow 现在支持从压缩文本源中读取(从
我正在尝试在 Dataflow 中执行联合操作。是否有用于在 Dataflow 中合并两个 PCollections 的示例代码? 最佳答案 一个简单的方法是像这样将 Flatten() 与 Remo
在我的管道上运行“更新”后,我注意到有新创建的永久磁盘在 10 多分钟后未附加到任何实例。 最佳答案 这是 Dataflow 服务的一个持续已知问题,会在管道更新过程中导致孤立磁盘。可以安全地删除这些
是否可以为 Dataflow 工作人员提供自定义包? 我想从计算内部输出到 Debian 打包的二进制文件。 编辑:需要明确的是,包配置非常复杂,仅将文件捆绑在 --filesToStage 中是不可
我想使用 Google Cloud Dataflow 创建 session 窗口,如 dataflow model paper 中所述。 .我想将我的未绑定(bind)数据发送到 Pub/Sub,然后
我正在尝试运行从 pubsub 主题读取并写入 bigquery 的管道。时间戳是从主题消息中解析出来的。但是,我收到了一条关于允许的时间戳偏差的错误,并引用了下面复制的文档。 getAllowedT
我有一个大型数据文件 (1 TB) 的数据要导入 BigQuery。每行包含一个键。在导入数据并创建我的 PCollection 以导出到 BigQuery 时,我想确保我不会基于此键值导入重复记录。
我正在通过 Python API 在 Dataflow 上使用 Apache Beam 从 Bigquery 读取数据,对其进行处理,然后将其转储到 Datastore 接收器中。 不幸的是,作业经常
我一直在研究使用 spring-cloud-dataflow 中的 spring-cloud-task 构建的项目。查看示例项目和文档后,似乎表明任务是通过仪表板或 shell 手动启动的。 spri
我有以下场景: 管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。 您能否让我知道实现这一目标的最佳选择是什么? 管
我是一名优秀的程序员,十分优秀!