I am getting the DATA_LOSS exception below from Google Dataflow. I have 10-15 JSON files (each about 2-3 MB). I parse the files with Jackson 2, apply some transforms with ParDo(), and finally do a GroupByKey to remove duplicates. Could you help me figure out whether I am doing something wrong?
It works fine with the DirectPipelineRunner.
2016-05-11T13:06:31.277Z: Detail: (eb15ba3070c2acbc): Checking required Cloud APIs are enabled.
2016-05-11T13:06:31.637Z: Detail: (eb15ba3070c2abc7): Expanding GroupByKey operations into optimizable parts.
2016-05-11T13:06:31.640Z: Detail: (eb15ba3070c2a6b5): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2016-05-11T13:06:31.646Z: Detail: (eb15ba3070c2a77f): Annotating graph with Autotuner information.
2016-05-11T13:06:31.732Z: Detail: (eb15ba3070c2a5c0): Fusing adjacent ParDo, Read, Write, and Flatten operations
2016-05-11T13:06:31.735Z: Detail: (eb15ba3070c2a0ae): Fusing consumer ParDo(ParserEdition) into ReadEditions4GCS
2016-05-11T13:06:31.737Z: Detail: (eb15ba3070c2ab9c): Fusing consumer ParDo(GetRelatedArticles) into ParDo(FlattenArticles)
2016-05-11T13:06:31.739Z: Detail: (eb15ba3070c2a68a): Fusing consumer GroupByKey/GroupByWindow into GroupByKey/Read
2016-05-11T13:06:31.741Z: Detail: (eb15ba3070c2a178): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow
2016-05-11T13:06:31.743Z: Detail: (eb15ba3070c2ac66): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Read
2016-05-11T13:06:31.745Z: Detail: (eb15ba3070c2a754): Fusing consumer Write2Gcs/Write2Gcs into Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup
2016-05-11T13:06:31.747Z: Detail: (eb15ba3070c2a242): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Write into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify
2016-05-11T13:06:31.750Z: Detail: (eb15ba3070c2ad30): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify into Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey
2016-05-11T13:06:31.752Z: Detail: (eb15ba3070c2a81e): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey into Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into()
2016-05-11T13:06:31.754Z: Detail: (eb15ba3070c2a30c): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() into ParDo(Article2CSV)
2016-05-11T13:06:31.757Z: Detail: (eb15ba3070c2adfa): Fusing consumer ParDo(Article2CSV) into AnonymousParDo
2016-05-11T13:06:31.759Z: Detail: (eb15ba3070c2a8e8): Fusing consumer GroupByKey/Write into GroupByKey/Reify
2016-05-11T13:06:31.761Z: Detail: (eb15ba3070c2a3d6): Fusing consumer AnonymousParDo into GroupByKey/GroupByWindow
2016-05-11T13:06:31.763Z: Detail: (eb15ba3070c2aec4): Fusing consumer GroupByKey/Reify into ParDo(Article2KV)
2016-05-11T13:06:31.765Z: Detail: (eb15ba3070c2a9b2): Fusing consumer ParDo(FlattenArticles) into ParDo(ParserEdition)
2016-05-11T13:06:31.768Z: Detail: (eb15ba3070c2a4a0): Fusing consumer ParDo(Article2KV) into ParDo(GetRelatedArticles)
2016-05-11T13:06:31.815Z: Basic: (eb15ba3070c2aa26): Worker configuration: n1-standard-1 in us-central1-f.
2016-05-11T13:06:32.154Z: Detail: (eb15ba3070c2a931): Adding StepResource setup and teardown to workflow graph.
2016-05-11T13:06:32.262Z: Basic: (120e40c18a94ee3a): Starting 3 workers...
2016-05-11T13:06:32.272Z: Basic: S01: (b31e9392dace1359): Executing operation GroupByKey/Create
2016-05-11T13:06:32.504Z: Basic: S02: (27044e90035e1dd6): Executing operation ReadEditions4GCS+ParDo(ParserEdition)+ParDo(FlattenArticles)+ParDo(GetRelatedArticles)+ParDo(Article2KV)+GroupByKey/Reify+GroupByKey/Write
2016-05-11T13:07:11.352Z: Detail: (e26d7dfd74bb5700): Workers have started successfully.
2016-05-11T13:07:23.464Z: Error: (91724060ab73dbcb): java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 108, expected 109 when dataflow-articlemetadatapipeline-g-05110606-31f5-harness-cmwd talking to tcp://localhost:12345
at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.close(ChunkingShuffleEntryWriter.java:66)
at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.close(ShuffleSink.java:272)
at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.finish(WriteOperation.java:100)
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:77)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:254)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:191)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:144)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.doWork(DataflowWorkerHarness.java:180)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:161)
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:148)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
If I run the same code multiple times, I also get a slightly different exception:
2016-05-11T13:00:27.649Z: Detail: (7ad6fdbb36cc3e7a): Checking required Cloud APIs are enabled.
2016-05-11T13:00:27.994Z: Detail: (7ad6fdbb36cc3ed9): Expanding GroupByKey operations into optimizable parts.
2016-05-11T13:00:27.998Z: Detail: (7ad6fdbb36cc350f): Lifting ValueCombiningMappingFns into MergeBucketsMappingFns
2016-05-11T13:00:28.009Z: Detail: (7ad6fdbb36cc37b1): Annotating graph with Autotuner information.
2016-05-11T13:00:28.106Z: Detail: (7ad6fdbb36cc356e): Fusing adjacent ParDo, Read, Write, and Flatten operations
2016-05-11T13:00:28.110Z: Detail: (7ad6fdbb36cc3ba4): Fusing consumer ParDo(ParserEdition) into ReadEditions4GCS
2016-05-11T13:00:28.112Z: Detail: (7ad6fdbb36cc31da): Fusing consumer ParDo(GetRelatedArticles) into ParDo(FlattenArticles)
2016-05-11T13:00:28.114Z: Detail: (7ad6fdbb36cc3810): Fusing consumer GroupByKey/GroupByWindow into GroupByKey/Read
2016-05-11T13:00:28.117Z: Detail: (7ad6fdbb36cc3e46): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow
2016-05-11T13:00:28.120Z: Detail: (7ad6fdbb36cc347c): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/GroupByWindow into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Read
2016-05-11T13:00:28.124Z: Detail: (7ad6fdbb36cc3ab2): Fusing consumer Write2Gcs/Write2Gcs into Write2Gcs/FileBasedSink.ReshardForWrite/Ungroup
2016-05-11T13:00:28.127Z: Detail: (7ad6fdbb36cc30e8): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Write into Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify
2016-05-11T13:00:28.129Z: Detail: (7ad6fdbb36cc371e): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/GroupByKey/Reify into Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey
2016-05-11T13:00:28.132Z: Detail: (7ad6fdbb36cc3d54): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/RandomKey into Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into()
2016-05-11T13:00:28.135Z: Detail: (7ad6fdbb36cc338a): Fusing consumer Write2Gcs/FileBasedSink.ReshardForWrite/Window.Into() into ParDo(Article2CSV)
2016-05-11T13:00:28.137Z: Detail: (7ad6fdbb36cc39c0): Fusing consumer ParDo(Article2CSV) into AnonymousParDo
2016-05-11T13:00:28.139Z: Detail: (7ad6fdbb36cc3ff6): Fusing consumer GroupByKey/Write into GroupByKey/Reify
2016-05-11T13:00:28.141Z: Detail: (7ad6fdbb36cc362c): Fusing consumer AnonymousParDo into GroupByKey/GroupByWindow
2016-05-11T13:00:28.144Z: Detail: (7ad6fdbb36cc3c62): Fusing consumer GroupByKey/Reify into ParDo(Article2KV)
2016-05-11T13:00:28.146Z: Detail: (7ad6fdbb36cc3298): Fusing consumer ParDo(FlattenArticles) into ParDo(ParserEdition)
2016-05-11T13:00:28.148Z: Detail: (7ad6fdbb36cc38ce): Fusing consumer ParDo(Article2KV) into ParDo(GetRelatedArticles)
2016-05-11T13:00:28.196Z: Basic: (7ad6fdbb36cc3b3c): Worker configuration: n1-standard-1 in us-central1-f.
2016-05-11T13:00:28.459Z: Detail: (7ad6fdbb36cc3b9b): Adding StepResource setup and teardown to workflow graph.
2016-05-11T13:00:28.639Z: Basic: (cea9ab4bd124bf89): Starting 3 workers...
2016-05-11T13:00:28.658Z: Basic: S01: (e5a53851aa035056): Executing operation GroupByKey/Create
2016-05-11T13:00:28.896Z: Basic: S02: (5803a8f4cae47397): Executing operation ReadEditions4GCS+ParDo(ParserEdition)+ParDo(FlattenArticles)+ParDo(GetRelatedArticles)+ParDo(Article2KV)+GroupByKey/Reify+GroupByKey/Write
2016-05-11T13:01:12.228Z: Detail: (5d4a90d7ea1437dd): Workers have started successfully.
2016-05-11T13:01:22.911Z: Error: (f5a249985c78da4a): com.google.cloud.dataflow.sdk.util.UserCodeException: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:193)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.lambda$processElement$0(ArticleMetaDataPipeline.java:71)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: java.lang.RuntimeException: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:176)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
Caused by: java.io.IOException: INVALID_ARGUMENT: unable to parse secondary key
at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.put(ChunkingShuffleEntryWriter.java:56)
at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:263)
at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:169)
at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.process(WriteOperation.java:90)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.lambda$processElement$0(ArticleMetaDataPipeline.java:71)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1689)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
2016-05-11T13:01:25.776Z: Error: (e9a78cb2969ddea0): java.lang.RuntimeException: java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 97, expected 98 when dataflow-articlemetadatapipeline-g-05110600-a71d-harness-p0a2 talking to tcp://localhost:12345
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Throwables.propagate(Throwables.java:160)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:176)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
Caused by: java.io.IOException: DATA_LOSS: Inconsistent number of records, parsed 97, expected 98 when dataflow-articlemetadatapipeline-g-05110600-a71d-harness-p0a2 talking to tcp://localhost:12345
at com.google.cloud.dataflow.sdk.runners.worker.ApplianceShuffleWriter.write(Native Method)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.writeChunk(ChunkingShuffleEntryWriter.java:72)
at com.google.cloud.dataflow.sdk.runners.worker.ChunkingShuffleEntryWriter.put(ChunkingShuffleEntryWriter.java:56)
at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:263)
at com.google.cloud.dataflow.sdk.runners.worker.ShuffleSink$ShuffleSinkWriter.add(ShuffleSink.java:169)
at com.google.cloud.dataflow.sdk.util.common.worker.WriteOperation.process(WriteOperation.java:90)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at com.google.cloud.dataflow.sdk.util.ReifyTimestampAndWindowsDoFn.processElement(ReifyTimestampAndWindowsDoFn.java:38)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$Article2KV.processElement(ArticleMetaDataPipeline.java:147)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:213)
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:53)
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52)
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase$1.output(ParDoFnBase.java:174)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnContext.outputWindowedValue(DoFnRunner.java:333)
at com.google.cloud.dataflow.sdk.util.DoFnRunner$DoFnProcessContext.output(DoFnRunner.java:487)
at uk.news.pipeline.api.ArticleMetaDataPipeline$GetRelatedArticles.processElement(ArticleMetaDataPipeline.java:69)
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189)
....
Code:
// Parses one JSON edition string into an Edition object using Jackson.
static class ParserEdition extends DoFn<String, Edition> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        final String editionStr = c.element();
        ObjectMapper mapper = new ObjectMapper();
        ObjectReader reader = mapper.reader(Edition.class);
        final Object editionObj = reader.readValue(editionStr);
        c.output((Edition) editionObj);
    }
}
// Emits every Article contained in an Edition.
static class FlattenArticles extends DoFn<Edition, Article> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        final List<Article> articleList = c.element().getArticleList();
        for (Article a : articleList) {
            c.output(a);
        }
    }
}
// Outputs each "article" element plus its nested related articles (up to depth 5).
static class GetRelatedArticles extends DoFn<Article, Article> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        final Article tArticle = c.element();
        if (tArticle.getCategory().equals("article")) {
            Article cloneArticle = (Article) SerializationUtils.clone(tArticle);
            cloneArticle.setImage(getRelatedImage(tArticle));
            c.output(cloneArticle);
            final List<Article> relateArticle = getRelateArticle(tArticle, 5);
            // Outputs from a parallel stream -- see the accepted answer below.
            relateArticle.parallelStream().forEach(a -> c.output(a));
        }
    }

    // Recursively collects nested "article" entries, up to depth i.
    public List<Article> getRelateArticle(Article art, int i) {
        List<Article> list = new ArrayList<>();
        if (i <= 0 || art.getArticleList() == null) {
            return null;
        } else {
            for (Article a : art.getArticleList()) {
                if (a.getCategory().equals("article")) {
                    Article cloneArticle = (Article) SerializationUtils.clone(a);
                    cloneArticle.setImage(getRelatedImage(a));
                    list.add(cloneArticle);
                    final List<Article> relateArticle = getRelateArticle(a, i - 1);
                    if (relateArticle != null) {
                        list.addAll(relateArticle);
                    }
                }
            }
        }
        return list;
    }

    // Finds the image whose identifier matches the article's lead asset id.
    public Image getRelatedImage(Article art) {
        Image image = new Image();
        try {
            final Article article = art.getArticleList().parallelStream().filter(
                    a -> (a.getCategory().equals("image") && a.getIdentifier().equals(art.getLeadAssetId())))
                    .findFirst().get();
            if (article != null) {
                image.setId(article.getIdentifier());
                image.setImageUrl(URLEncoder.encode(article.getCrops().get(0).getImageId(), Charset.defaultCharset().name()));
            }
        } catch (Exception e) { }
        return image;
    }
}
// Serializes an Article into a delimited CSV line.
static class Article2CSV extends DoFn<Article, String> {
    private String delimiter;

    Article2CSV(String delimiter) {
        this.delimiter = delimiter;
    }

    @Override
    public void processElement(ProcessContext c) throws Exception {
        final Article a = c.element();
        String str = a.getIdentifier() + delimiter + a.getTitle() + delimiter + getTeaserText(a) +
                delimiter + a.getPublished() + delimiter + a.getLeadAssetId() +
                delimiter + a.getImage().getImageUrl();
        c.output(str);
    }

    private String getTeaserText(Article a) {
        String teaser = "";
        if (!a.getContent().isEmpty()) {
            for (Content c : a.getContent()) {
                if (teaser.length() <= 100) {
                    teaser = teaser + c.getData().getText();
                }
            }
        }
        return teaser;
    }
}
// Keys each Article by its identifier for the subsequent GroupByKey.
static class Article2KV extends DoFn<Article, KV<String, Article>> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
        final Article art = c.element();
        if (art != null && !StringUtils.isBlank(art.getIdentifier()))
            c.output(KV.of(art.getIdentifier(), art));
    }
}
........
PipelineOptionsFactory.register(ArticleMetaDataOptions.class);
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(ArticleMetaDataOptions.class);
final ArticleMetaDataOptions opts = (ArticleMetaDataOptions) options;
if (!opts.isTestMode())
    options.setRunner(BlockingDataflowPipelineRunner.class);
options.setDefaultWorkerLogLevel(DataflowWorkerLoggingOptions.Level.DEBUG);
Pipeline p = Pipeline.create(options);
final PCollection<String> edition4GCS = p.apply(TextIO.Read.named("ReadEditions4GCS")
        .from("gs://editions-newsuk/*"));
// get articles from all the editions
final PCollection<Article> articlePCollection = edition4GCS.apply(ParDo.of(new ParserEdition())).apply(ParDo.of(new FlattenArticles()));
// get related articles
final PCollection<Article> articles = articlePCollection.apply(ParDo.of(new GetRelatedArticles()));
// convert into KV
final PCollection<KV<String, Article>> articlesKV = articles.apply(ParDo.of(new Article2KV()));
// GroupByKey *** if the code below this line is commented out, the pipeline always works...
final PCollection<KV<String, Iterable<Article>>> groupByCollection = articlesKV.apply(GroupByKey.<String, Article>create());
// filter the duplicate/partial articles
PCollection<Article> filterArticles = groupByCollection.apply(ParDo.of(new DoFn<KV<String, Iterable<Article>>, Article>() {
    public void processElement(ProcessContext c) {
        String articleId = c.element().getKey();
        Iterable<Article> arts = c.element().getValue();
        boolean found = false;
        Article article = null;
        if (arts != null) {
            // Prefer the first copy that has an image URL; otherwise fall back to any copy.
            for (Article at : arts) {
                article = at;
                if (at != null && !StringUtils.isBlank(at.getImage().getImageUrl())) {
                    found = true;
                    c.output(at);
                    break;
                }
            }
            if (!found) {
                c.output(article);
            }
        }
    }
}));
// transform into file and persist to GCS
filterArticles.apply(ParDo.of(new Article2CSV(opts.getDelimiter()))).apply(TextIO.Write.named("Write2Gcs").withoutSharding().to(opts.getOutputLocation()));
Best answer
Calls to DoFn.Context#output must be synchronized and must complete before returning from the associated startBundle, processElement, or finishBundle method.
In the code you shared, you appear to be using someList.parallelStream().forEach(e -> c.output(e)) to output elements. That use of parallelStream violates the requirement.
Using a regular (non-parallel) forEach should prevent these problems.
Regarding google-cloud-dataflow - Google Dataflow DATA_LOSS exception, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37163200/