gpt4 book ai didi

java - 数据流SDK 2.x : how to consume from PubSubIO using java serialization

转载 作者:行者123 更新时间:2023-12-02 02:14:17 24 4
gpt4 key购买 nike

我是 Dataflow 新手,我要将以下代码片段从 Java SDK 1.9.0 迁移到 2.3.0:

//SDK 1.9.0
PCollection<MyType> pubsub = p.apply(
PubsubIO.Read.named("Read from Pubsub")
.topic(myTopic)
.withCoder(SerializableCoder.of(MyType.class))
.timestampLabel("myDate"));

我会将其转换为

//SDK 2.3.0
PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.<MyType>read () // <-- COMPILE ERROR here, private method
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.setCoder(SerializableCoder.of(MyType.class));

但从 java SDK 2.3.0 开始,PubsubIO.read() 方法是私有(private)的。

因此,我需要使用带有 MyType 序列化实例的消息,但 PubsubIO 公开的方法似乎仅适用于短信、avro、protobuf 等。

如何从消息包含序列化 Java 对象的 PubsubIO 主题中读取内容?

更新:

我可以这样调整它(尚未尝试)...

PCollection<MyType> pubsub = p.apply("Read from Pubsub",
PubsubIO.readMessagesWithAttributes ()
.fromTopic(myTopic)
.withTimestampAttribute ("myDate"))
.apply (MapElements.via(new SimpleFunction<PubsubMessage, MyType> () {
@Override
public MyType apply (final PubsubMessage message) {
final byte[] payload = message.getPayload ();
try {
try (final ObjectInputStream stream = new ObjectInputStream (new ByteArrayInputStream (payload))) {
return (MyType) stream.readObject ();
}
} catch (IOException e) {
throw new RuntimeException (e);
} catch (ClassNotFoundException e) {
throw new RuntimeException (e);
}
}
}))

最佳答案

您更新的代码看起来应该可以工作。请注意,如果您不使用属性映射,还有 PubsubIO.readPubsubMessagesWithoutAttributes()

之前的功能已在 PR#2634 中删除,它将其替换为最常见编码类型(proto、avro、字符串)的专用方法。

我怀疑由于依赖 Java 序列化的固有危险,未保留通过 SerializedCoder 进行的任意对象解码。请参阅SerializableCoder javadoc 或相关问题Java serialization - advantages and disadvantages, use or avoid? 。但是,如果您觉得 API 有所欠缺,Beam SDK 是开源的,社区欢迎 contributions .

关于java - 数据流SDK 2.x : how to consume from PubSubIO using java serialization,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49510560/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com