gpt4 book ai didi

java - 数据流 : string to pubsub message

转载 作者:行者123 更新时间:2023-12-05 09:11:56 25 4
gpt4 key购买 nike

我正在尝试在 Dataflow 中进行单元测试。

对于该测试,在开始时,我将从一个简单的硬编码字符串开始。

问题是我需要将该字符串转换为发布订阅消息。我得到了以下代码来做到这一点:

    // Create a PCollection from string a transform to pubsub message format
PCollection<PubsubMessage> input = p.apply("input string", Create.of("test" +
""))
.apply("convert to Pub/Sub message", ParDo.of(new DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}));

但是我得到以下错误:

 java.lang.IllegalArgumentException: unable to serialize DoFnWithExecutionInformation{doFn=com.xxx.pipeline.TesterPipeline$1@7b64240d, mainOutputTag=Tag<output>, sideInputMapping={}, schemaInformation=DoFnSchemaInformation{elementConverters=[]}}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:55)
<...>
Caused by: java.io.NotSerializableException: com.xxx.pipeline.TesterPipeline
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
... 50 more

我应该如何从字符串创建 pubsub 消息?

最佳答案

Beam Programming Guide在用户 ParDos 的可序列化要求下,它提到了这一点:

Take care when declaring your function object inline by using an anonymous inner class instance. In a non-static context, your inner class instance will implicitly contain a pointer to the enclosing class and that class’ state. That enclosing class will also be serialized, and thus the same considerations that apply to the function object itself also apply to this outer class.

发生的事情是您的匿名 DoFn 隐式包含一个指向您正在其中构建管道的类的指针,这导致了此序列化失败。您可以通过使您的 DoFn 成为命名子类而不是匿名子类来避免这种情况:

public class MyDoFn extends DoFn<String, PubsubMessage>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(new PubsubMessage(c.element().getBytes(), null));
}
}

关于java - 数据流 : string to pubsub message,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59375772/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com