- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为什么会提示。
我想这样做是为了将连接某些 csv 行所需的多个步骤抽象为单个 PTransform(PCollectionTuple 中的每个 PCollection 包含要连接的 csv 行),我遇到的问题不在于连接本身,而是如何将 PTransform 应用于 PCollectionTuple。
这是我的代码:
static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> {
@ProcessElement
public void processElement(ProcessContext context) {
PCollectionTuple element = context.element();
// TODO: Implement the output
}
}
我这样称呼 PTransform:
TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);
// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));
当我将鼠标悬停在未编译的行上方时,IntelliJ IDEA 会输出以下内容:
Required type:
PTransform
<? super PCollectionTuple,
OutputT>
Provided:
SingleOutput
<PCollectionTuple,
String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>
如何将 PTransform 应用于 PCollectionTuple?
最佳答案
DoFn<PCollectionTuple, String[]>
意味着您想要为每个记录应用“DoFn”,因此您不应该使用 PCollectionTuple 作为输入类型。相反,您应该使用“csvLines1”和“csvLines2”的类型。
如果您的目的是合并两个 PCollection,您可以选中 Flatten 变换:https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java#L41
关于java - 如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60491804/
这两个注释有什么区别? DoFn.Setup用于准备处理元素束的实例的方法的注释。 使用单词“bundle”,接受零个参数。 DoFn.StartBundle用于准备处理一批元素的实例的方法的注释。
在 Beam 2.3.0 中,DoFn 进程 API 需要通过 yield 返回一个生成器,而在 Beam 2.4.0 中,从进程 API 返回一个对象,如字典工作正常。 编程模型从 2.3.0 到
我正在尝试通过在 DoFn 步骤中查询数据存储来增强管道中的数据。来自类 CustomClass 的对象的字段用于对数据存储区表执行查询,返回的值用于增强对象。 代码如下所示: public clas
我想通过 DoFn 为在 Dataflow 上运行的 Apache Beam 管道发出 POST 请求。 为此,我创建了一个客户端,它实例化了一个在 PoolingHttpClientConnecti
我正在开始一个从 AWS Kinesis 读取数据的 Beam 项目,因此我有一个简单的 DoFn 来接受 KinesisRecord 并记录内容。我想编写一个单元测试来运行这个 DoFn 并证明它有
如果我的类扩展了 DoFn,如何访问侧输入的元素? 例如: 假设我有一个 ParDo 变换,如: PCollection data = myData.apply("Get data", Par
在阅读有关使用 Java 处理 Apache Beam 中的流媒体元素时,我遇到了 DoFn然后穿过SimpleFunction . 这两个看起来都与我相似,我发现很难理解其中的区别。 有人可以用外行
我正在尝试从 DoFn 方法获得两个输出,以下是 Apache Beam programming guide 的示例 基本上在示例中你传递一个 TupleTag 然后指定输出到哪里,这对我有用问题是我
我正在编写一段使用 org.apache.beam.sdk.state.MapState 的数据流转换实现缓存功能。然而在介绍MapState ,单元测试开始功能障碍。异常显示:java.lang.U
当我运行我的 Dataflow 管道时,我收到以下异常,提示我的 DoFn 无法序列化。我该如何解决? 这是堆栈跟踪: Caused by: java.lang.IllegalArgumentExce
我正在使用 Apache Beam 和 Kotlin 构建一个简单的 ETL 管道,我正在尝试创建一种 Either类型: @DefaultCoder(SerializableCoder::class
在Beam Python DoFn中进行昂贵的一次性初始化的推荐方法是什么? Java SDK具有DoFn.Setup,但在Beam Python中似乎没有等效项。 当前是DoFn初始化程序中atta
上下文 我正在使用一个流管道,它在 pubsub 中有一个 protobuf 数据源。我希望将此 protobuf 解析为 python 字典,因为数据接收器要求输入是字典的集合。我通过初始化proc
我知道有像 CRUNCH_BYTES_PER_REDUCE_TASK 或 mapred.reduce.tasks 这样的属性来设置 reducer 的数量。 任何人都可以建议为需要更多时间执行的特定
我有一个管道从 GCS 读取文件通过Pub\Sub , class ExtractFileNameFn(beam.DoFn): def process(self, element):
我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为什么会提示。 我想这样做是为了将连接某些 csv 行所需的多个步骤抽象为单个 PTransform(P
有什么方法可以在 2 个 Dofn 之间创建依赖关系,以便它等待第一个 Dofn 方法完成,然后运行第二个 Dofn 方法。 只是想知道我们如何实现这个用例。 最佳答案 可能有一种更简洁的方法可以做到
我正在尝试使用 Google Cloud Dataflow 将 Google PubSub 消息写入 Google Cloud Storage。我知道 TextIO/AvroIO 不支持流管道。但是,
我是一名优秀的程序员,十分优秀!