- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。
当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签 (tag2, tag3) 将在 .apply()
上抛出此错误:
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Assign Output.out1 [PCollection]. Correct one of the following root causes: No Coder has been manually specified; you may do so using .setCoder(). Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for V.
为什么这个错误出现在tag2上而不出现在tag1上?请注意,如果我将 tag2 作为主要输出,将 tag1/tag3 作为附加输出并适本地重新排序代码,则 tag2 处理成功,但 tag1/tag3 将抛出错误。
主要管道:
PCollectionTuple pct = outputPair.apply("Assign Output", ParDo.of( new output())
.withOutputTags(output.tag1, TupleTagList.of(output.tag2).and(output.tag3)));
//Tag1 Output
PCollection<KV<String, outResultPair>> tagPair1 = pct.get(output.tag1)
.apply("Process", ParDo.of( new ABCOutput()))
//Tag2 Output
PCollection<KV<String, outResultPair>> tagPair2 = pct.get(output.tag2)
.apply("Process", ParDo.of( new DEFOutput())) //Error Thrown here
支持类:
//ABCOutput Class
@DefaultCoder(AvroCoder.class)
public class ABCOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//XYZOutput Class
@DefaultCoder(AvroCoder.class)
public class XYZOutput extends DoFn<KV<String, inResultPair>, KV<String, outResultPair>> {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
c.output( processInput(e) );
}
}
//Output Splitter
@DefaultCoder(AvroCoder.class)
public class output {
private final static Logger LOG = LoggerFactory.getLogger(OutputHandler.class);
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag2 = new TupleTag();
final static TupleTag<KV<String,inResultPair>> tag3 = new TupleTag();
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, inResultPair> e = c.element();
KV<String, outResultPair> out = process(e);
switch(e.getValue().type){
case 1:
c.output(tag1, out);
break;
case 2:
c.output(tag2, out);
break;
case 3:
c.output(tag3, out);
break;
}
c.output();
}
}
最佳答案
您需要以一种方式构造 TupleTag
,以便它们的类型信息将由 Java 编译器保留,而目前您正在将它们构造为原始类型,因此 Beam 的编码器推断不会不知道输出到这个标签中的元素是什么类型。
改变:
final static TupleTag<KV<String,inResultPair>> tag1 = new TupleTag();
到:
final static TupleTag<KV<String,inResultPair>> tag1 =
new TupleTag<KV<String, inResultPair>>() {};
{}
对于在此处保留类型信息至关重要。
关于java - 处理多个 PCollection 输出时找不到编码器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48430306/
在我进行一些处理和按键分组后,我得到了一个如下所示的数据集。我现在需要对每一行数据进行一些处理以获得下面的输出。我试过 flatmap 它真的很慢,因为“值”列表的长度可以任意长。我想我可以将每一行拆
我有一个无界集合,它是从 PubsubIO 中读取数据的,名为 Trade格式如 { timestamp: 123, type: "", side: "" // sell or buy
我对 Google Cloud Platform 还很陌生,这是我第一次在研究生类(class)的项目中尝试使用 Google Dataflow。我想要做的是编写一个自动加载作业,从我的云存储上的某个
没有关于如何将 pCollections 转换为输入到 .CoGroupByKey() 所需的 pCollections 的文档 语境 基本上我有两个大的 pCollections,我需要能够找到两者
我正在尝试实现一个数据管道,它连接来自 Kafka 主题的多个无限源。我能够连接到主题并获取数据为 PCollection我需要将其转换为 PCollection 。我将逗号分隔的字符串拆分为数组,并
我有一个用例,我需要从 DoFn 输出多个 T。所以 DoFn 函数返回一个 PCollection> 。我想将它转换为 PCollection 以便稍后在管道中我可以像这样过滤: PCollecti
我有两个 P-Collection,如下所示 P1 = ['H','E','L','L','O','W','O','R','L','D'] P2 = ['W','E','L','C','O','M',
Beam 中是否有可能有一个 PCollection 来“保存”同一父类(super class)的不同对象,使得 PCollection result = input.apply(ParDo.of
我有两个 bigquery 表。 表A c_id count_c_id p_id 表B id c_name p_type c_id 根据表 A 中的列,我需要使用 DF 管道从表 B
假设我有一个类型为 KV 的有界 PCollection p 。假设 p 无法放入内存,因此不能成为 DoFn 的侧面输入. 示例p: ("foo", 0) ("bar", 1) ("baz", 2)
我正在尝试使用 Apache Beam 中的 Flatten 函数添加 header 。但是,似乎没有办法根据文档设置顺序:https://beam.apache.org/documentation/
我想将时间戳设置为无限的字符串集合 在我的解决方案中,pcollection 的每一行都是一行 csv 这一行的一个字段中有一个时间戳和其他字段,例如点击次数等。 我想根据它自己的时间戳(事件时间)而
我想要 Pcollection 中的列表值。 PCollection> lst = bqT2.apply(ParDo.of(new UserId())); // line 1 List myL
我有一个用 Dataflow 编写的批处理管道。我想实现以下数据连接。 我有两个 PCollection。首先是代表 session : class Session{ String id
我的管道的这一部分应该接受输入,对其应用适当的元组标记,然后根据它接收到的标记对输入进行进一步处理。 当运行下面的代码时,来自主标签 (tag1) 的 PCollection 工作正常。但是,附加标签
我正在使用 Apache-Beam 和 Python SDK。 目前,我的管道读取多个文件,解析它们并从其数据生成 pandas 数据帧。然后,它将它们分组到一个单个数据帧中。 我现在想要的是检索这个
我正在尝试根据交易数据生成简单的客户摘要。例如,给定目标交易类型,发生了多少笔交易以及总金额是多少? 原始输入示例: custid desc amount 111 coffee
使用 CoGroupsByKey 方法加入两个 Pcollection 需要数小时才能执行超过 8 百万条记录。从另一个 stackoverflow 帖子中注意到 CoGbkResult has mo
我正在使用 Apache Beam 读取数据流并写入 BigQuery,在 tableA 中.我的行存储在 类型的数据集中. 目前我正在按原样读取行并写入表。但是我想根据 timestamp 过滤行
我有一个 PCollection,并且希望在验证条件时应用自定义 PTransform。(该条件不依赖于Pcollection内容) 示例:我有日志,如果 PipelineOptions 中提供了日期
我是一名优秀的程序员,十分优秀!