gpt4 book ai didi

java - 如何在 Apache Beam Java 中将 TestStreams 与多输出类一起使用

转载 作者:行者123 更新时间:2023-11-28 20:49:07 26 4
gpt4 key购买 nike

我正在尝试编写一个 Beam Streaming 管道,它只是从 PubSub 队列中读取数据,解析数据并写入两个 BigQuery 表之一。因此,代码利用侧输出从 DoFn 中写入两个表之一。我遇到以下错误消息:java.lang.IllegalArgumentException:无法序列化 DoFnAndMainOutput{doFn=com.pipeline.PubSubToBigQuery$ParsePubSubMessage@50eca7c6, mainOutputTag=Tag}。我将在下面附上完整的错误消息、DoFn 和测试类:

DoFn:

public static class PubsubMessageToTableRow
extends PTransform<PCollection<PubsubMessage>, PCollectionTuple> {

private TupleTag<TableRow> soundEventReadings;
private TupleTag<TableRow> doorEventReadings;

public PubsubMessageToTableRow(TupleTag<TableRow> soundEventReadings, TupleTag<TableRow> doorEventReadings){
this.soundEventReadings = soundEventReadings;
this.doorEventReadings = doorEventReadings;
}

@Override
public PCollectionTuple expand(PCollection<PubsubMessage> input) {

return input.apply("Parse PubSub Message",
ParDo.of(new ParsePubSubMessage(soundEventReadings, doorEventReadings))
.withOutputTags(soundEventReadings, TupleTagList.of(doorEventReadings)));

}
}

/**
* Parse the PubSub Json message and create either a Door or Sound Event. Then create a TableRow object from the
* event objects.
*/
public static class ParsePubSubMessage extends DoFn<PubsubMessage, TableRow> {

private final TupleTag<TableRow> soundEventReadings;
private final TupleTag<TableRow> doorEventReadings;

public ParsePubSubMessage(TupleTag<TableRow> soundEventReadings,
TupleTag<TableRow> doorEventReadings) {
this.soundEventReadings = soundEventReadings;
this.doorEventReadings = doorEventReadings;
}


@ProcessElement
public void processElement(ProcessContext c, MultiOutputReceiver out) throws IOException{
PubsubMessage message = c.element();
String jsonString = new String(message.getPayload(), StandardCharsets.UTF_8);
ObjectMapper mapper = new ObjectMapper();

// A DoorEvent can either be a door open/close event signified with #door# in the SensorName or can
// be a motion event from the motion sensor next to the door signified with #motion# in the SensorName
if (jsonString.contains("#door#") || jsonString.contains("#motion#")){
DoorEvent doorEvent = mapper.readValue(jsonString, DoorEvent.class);
out.get(doorEventReadings).output(doorEvent.toTableRow());
} else if (jsonString.contains("noiseFloor")){
SoundEvent soundEvent = mapper.readValue(jsonString, SoundEvent.class);
out.get(soundEventReadings).output(soundEvent.toTableRow());
}
}
}

测试


@Rule
public final transient TestPipeline testPipeline = TestPipeline.create();

@Test
public void testPubsubMessageToTableRow() throws IOException{
String jsonDoorEvent = "{\"EventID\":\"12\",\"HomeID\":" +
"\"22222\",\"SmartThingsSensorName\":" +
"\"sa#door#1#front_door\",\"State\":\"closed\",\"Label\":false," +
"\"HasBeenLabelled\":false,\"EventTime\":\"2019-01-09T12:22:22Z\",\"CreateDate\":" +
"\"2019-01-09T15:17:00Z\",\"ModifyDate\":\"2019-01-09T15:17:00Z\"}";

TableRow door = doorEvent.toTableRow();

TableRow outputDoorRow = new TableRow().set("EventID", "12")
.set("HomeID", "22222")
.set("SmartThingsSensorName", "sa#door#1#front_door")
.set("State", "closed")
.set("Label", false)
.set("HasBeenLabelled", false)
.set("EventTime", "2019-01-09T12:22:22Z")
.set("CreateDate", "2019-01-09T15:17:00Z")
.set("ModifyDate", "2019-01-09T15:17:00Z");

Map<String, String> attributes = new HashMap<>();
attributes.put("eventTime", "2019-01-09T12:22:22Z");
PubsubMessage messageDoor = new PubsubMessage(jsonDoorEvent.getBytes(), attributes);

final TupleTag<TableRow> doorEventReadings = new TupleTag<TableRow>(){};

TestStream<PubsubMessage> createEvent =
TestStream.create(PubsubMessageWithAttributesCoder.of())
.addElements(messageDoor).advanceWatermarkToInfinity();

PCollectionTuple tuple = testPipeline
.apply("Create Stream", createEvent)
.apply("Parse pipeline",
new PubSubToBigQuery.PubsubMessageToTableRow(soundEventReadings, doorEventReadings));

PCollection<TableRow> doorEventOutput = tuple.get(doorEventReadings);
PAssert.that(doorEventOutput).containsInAnyOrder(outputDoorRow);

testPipeline.run().waitUntilFinish();
}

堆栈跟踪:



java.lang.IllegalArgumentException: unable to serialize DoFnAndMainOutput{doFn=com.pipeline.PubSubToBigQuery$ParsePubSubMessage@50eca7c6, mainOutputTag=Tag<output>}

at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:57)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateDoFn(ParDoTranslation.java:462)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$1.translateDoFn(ParDoTranslation.java:160)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.payloadForParDoLike(ParDoTranslation.java:695)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.translateParDo(ParDoTranslation.java:156)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation$ParDoPayloadTranslator.translate(ParDoTranslation.java:111)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformTranslation.toProto(PTransformTranslation.java:206)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.getParDoPayload(ParDoTranslation.java:547)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ParDoTranslation.isSplittable(ParDoTranslation.java:557)
at org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.PTransformMatchers$4.matches(PTransformMatchers.java:194)
at org.apache.beam.sdk.Pipeline$2.visitPrimitiveTransform(Pipeline.java:278)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:668)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:660)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$600(TransformHierarchy.java:311)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:245)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:458)
at org.apache.beam.sdk.Pipeline.replace(Pipeline.java:256)
at org.apache.beam.sdk.Pipeline.replaceAll(Pipeline.java:209)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:170)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:348)
at org.apache.beam.sdk.testing.TestPipeline.run(TestPipeline.java:329)
at com.ecobee.hm_occupancy_data_pipeline.PubSubToBigQueryTest.testPubsubMessageToTableRow(PubSubToBigQueryTest.java:118)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.apache.beam.sdk.testing.TestPipeline$1.evaluate(TestPipeline.java:317)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.io.NotSerializableException: com.pipeline.PubSubToBigQueryTest
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
... 48 more

最佳答案

问题在日志中:java.io.NotSerializableException: com.pipeline.PubSubToBigQueryTest。使您的测试实现 Serializable,这应该可以解决它。或者尝试将所有 DoFns 和其他内联功能移动到单独的可序列化类中。

关于java - 如何在 Apache Beam Java 中将 TestStreams 与多输出类一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54184013/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com