- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在构建一个示例数据流管道,主要基于以下代码 https://cloud.google.com/dataflow/java-sdk/combine
但是当我运行代码时,遇到以下异常:
Exception in thread "main" java.lang.IllegalArgumentException: unable to serialize com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$TestCombineDoFn@139982de at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51) at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81) at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java:784) at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025) at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963) at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441) at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:951) at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946) at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611) at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200) at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196) at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109) at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204) at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584) at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328) at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70) at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145) at com.google.cloud.dataflow.examples.CalcMeanExample.main(CalcMeanExample.java:50) Caused by: java.io.NotSerializableException: org.apache.avro.io.DecoderFactory at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47) ... 20 more
我的代码如下:
package com.google.cloud.dataflow.examples;
import java.io.Serializable;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.AvroCoder;
import com.google.cloud.dataflow.sdk.coders.DefaultCoder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.DefaultValueFactory;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Combine;
import com.google.cloud.dataflow.sdk.transforms.Combine.CombineFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import com.google.cloud.dataflow.sdk.values.PCollection;
public class CalcMeanExample
{
public static void main(String[] args)
{
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
PCollection<String> numbers = p.apply(TextIO.Read.named("ReadLines").withCoder(StringUtf8Coder.of()).from(options.getInput()));
numbers.apply( ParDo.of( new DoFn<String,String>(){
@Override
public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
System.out.println( c.element() );
}
}));
PCollection<String> average = numbers.apply( Combine.globally( new AverageFn()));
average.apply(TextIO.Write.named("WriteAverage")
.to(options.getOutput())
.withNumShards(options.getNumShards()));
p.run();
System.out.println( "done" );
}
public static class AverageFn extends CombineFn<String, AverageFn.Accum, String> {
@DefaultCoder(AvroCoder.class)
public static class Accum implements Serializable {
int sum = 0;
int count = 0;
}
public Accum createAccumulator() { return new Accum(); }
public void addInput(Accum accum, String input) {
accum.sum += Integer.parseInt(input );
accum.count++;
}
public Accum mergeAccumulators(Iterable<Accum> accums) {
Accum merged = createAccumulator();
for (Accum accum : accums) {
merged.sum += accum.sum;
merged.count += accum.count;
}
return merged;
}
public String extractOutput(Accum accum) {
return Double.toString( ((double) accum.sum) / accum.count );
}
}
/**
* Options supported by {@link WordCount}.
* <p>
* Inherits standard configuration options.
*/
public static interface Options extends PipelineOptions {
@Description("Path of the file to read from")
@Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
String getInput();
void setInput(String value);
@Description("Path of the file to write to")
@Default.InstanceFactory(OutputFactory.class)
String getOutput();
void setOutput(String value);
/**
* Returns gs://${STAGING_LOCATION}/"sorts.txt" as the default destination.
*/
public static class OutputFactory implements DefaultValueFactory<String> {
@Override
public String create(PipelineOptions options) {
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (dataflowOptions.getStagingLocation() != null) {
return GcsPath.fromUri(dataflowOptions.getStagingLocation())
.resolve("sorts.txt").toString();
} else {
throw new IllegalArgumentException("Must specify --output or --stagingLocation");
}
}
}
/**
* By default (numShards == 0), the system will choose the shard count.
* Most programs will not need this option.
*/
@Description("Number of output shards (0 if the system should choose automatically)")
@Default.Integer(1)
int getNumShards();
void setNumShards(int value);
}
}
有什么想法会导致这种情况吗?
最佳答案
我们已意识到此问题,并正在努力修复,预计很快就会推出。
目前,您应该能够使用 SerializedCoder 而不是 AvroCoder 作为累加器。
关于java - Google Cloud Dataflow 管道中的 NotSerializedException : org. apache.avro.io.DecoderFactory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28371702/
我的问题是,在幕后,对于元素级 Beam DoFn (ParDo),Cloud Dataflow 的并行工作负载如何?例如,在我的 ParDO 中,我向外部服务器发送一个 http 请求以获取一个元素
就 Google Cloud 上 Dataflow 的 HA 而言,最佳架构是什么?我的工作负载在两个区域运行。数据流从一个多区域存储桶中读取并将结果写出到另一个多区域存储桶中。 为了实现高可用性(以
如图 here数据流管道由固定的 DAG 表示。我想知道是否有可能实现一个管道,在该管道中处理继续进行,直到基于到目前为止计算的数据满足动态评估的条件。 这是一些伪代码来说明我想要实现的内容:
在旧的定价页面上,他们提到 Cloud Dataflow 工作人员使用的所有 Google Compute 实例都是根据持续使用价格规则计费的,但新的定价页面不再提及。 我假设由于它在内部使用相同的
批处理 Dataflow 作业处理完所有数据后是否可以执行操作?具体来说,我想将管道刚刚处理的文本文件移动到不同的 GCS 存储桶。我不确定将它放在我的管道中的哪个位置以确保它在数据处理完成后执行一次
我希望能够通过自定义键使用分组,但这是我目前的尝试, 我们为 KV 对象的键使用自定义类,因为我们希望 GroupBy 具有更复杂的条件,而不是使用 String 等进行简单的键匹配。 ```
当尝试在 Dataflow 服务上运行管道时,我在命令行上指定了暂存和临时存储桶(在 GCS 中)。当程序执行时,我在管道运行之前收到一个 RuntimeException,根本原因是我在路径中遗漏了
我试图找到一种优雅地结束我的工作的方法,以免丢失任何数据,从 PubSub 流式传输并写入 BigQuery。 我可以设想的一种可能方法是让作业停止提取新数据,然后运行直到它处理完所有内容,但我不知道
问题: 使用 Cloud Dataflow 时,我们会看到 2 个指标(请参阅 this page): 系统延迟 数据新鲜度 这些在 Stackdriver 中也可用以下名称(摘自 here): sy
我一直在阅读 Dataflow SDK 文档,试图找出当数据到达流作业中的水印时会发生什么。 这一页: https://cloud.google.com/dataflow/model/windowin
有没有办法(或任何类型的黑客)从压缩文件中读取输入数据? 我的输入包含数百个文件,这些文件是用 gzip 压缩生成的,解压缩它们有些乏味。 最佳答案 Dataflow 现在支持从压缩文本源中读取(从
我正在尝试在 Dataflow 中执行联合操作。是否有用于在 Dataflow 中合并两个 PCollections 的示例代码? 最佳答案 一个简单的方法是像这样将 Flatten() 与 Remo
在我的管道上运行“更新”后,我注意到有新创建的永久磁盘在 10 多分钟后未附加到任何实例。 最佳答案 这是 Dataflow 服务的一个持续已知问题,会在管道更新过程中导致孤立磁盘。可以安全地删除这些
是否可以为 Dataflow 工作人员提供自定义包? 我想从计算内部输出到 Debian 打包的二进制文件。 编辑:需要明确的是,包配置非常复杂,仅将文件捆绑在 --filesToStage 中是不可
我想使用 Google Cloud Dataflow 创建 session 窗口,如 dataflow model paper 中所述。 .我想将我的未绑定(bind)数据发送到 Pub/Sub,然后
我正在尝试运行从 pubsub 主题读取并写入 bigquery 的管道。时间戳是从主题消息中解析出来的。但是,我收到了一条关于允许的时间戳偏差的错误,并引用了下面复制的文档。 getAllowedT
我有一个大型数据文件 (1 TB) 的数据要导入 BigQuery。每行包含一个键。在导入数据并创建我的 PCollection 以导出到 BigQuery 时,我想确保我不会基于此键值导入重复记录。
我正在通过 Python API 在 Dataflow 上使用 Apache Beam 从 Bigquery 读取数据,对其进行处理,然后将其转储到 Datastore 接收器中。 不幸的是,作业经常
我一直在研究使用 spring-cloud-dataflow 中的 spring-cloud-task 构建的项目。查看示例项目和文档后,似乎表明任务是通过仪表板或 shell 手动启动的。 spri
我有以下场景: 管道 A 在 BigQuery 中查找表 A,进行一些计算并返回列名列表。 此列名称列表用作管道 B 输出的 BigQuery 架构。 您能否让我知道实现这一目标的最佳选择是什么? 管
我是一名优秀的程序员,十分优秀!