
java - Joining BigQuery and Pub/Sub with Apache Beam

Reposted — Author: 行者123, updated 2023-12-05 07:30:34

I am trying to do the following with the DataflowRunner:

  1. Read from a partitioned BigQuery table (a lot of data, but only fetching the last two days)
  2. Read JSON from a Pub/Sub subscription
  3. Join the two collections on a common key
  4. Insert the joined collection into another BigQuery table
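Conceptually, steps 3 and 4 boil down to a per-key cross join of the two row sets within each window. A minimal plain-Java sketch of that join semantics (no Beam dependency; rows are modeled as maps and the field names are hypothetical), mirroring what `CoGbkResult` yields per key:

```java
import java.util.*;

public class KeyJoinSketch {
    // For each key present on the left side, emit the cross product of
    // left rows and right rows sharing that key (CoGroupByKey semantics).
    public static List<Map<String, Object>> join(
            Map<Long, List<Map<String, Object>>> left,
            Map<Long, List<Map<String, Object>>> right) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map.Entry<Long, List<Map<String, Object>>> e : left.entrySet()) {
            List<Map<String, Object>> matches =
                right.getOrDefault(e.getKey(), Collections.emptyList());
            for (Map<String, Object> l : e.getValue()) {
                for (Map<String, Object> r : matches) {
                    Map<String, Object> joined = new HashMap<>(l);
                    joined.putAll(r); // right side wins on field clashes
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Long, List<Map<String, Object>>> bq = new HashMap<>();
        bq.put(1L, List.of(Map.of("pack", "A")));
        Map<Long, List<Map<String, Object>>> ps = new HashMap<>();
        ps.put(1L, List.of(Map.of("alim", "X"), Map.of("alim", "Y")));
        System.out.println(join(bq, ps)); // two joined rows for key 1
    }
}
```

The catch, as the rest of the question shows, is that in Beam the two rows are only joinable when they land in the same window of the same windowing strategy.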

I am quite new to Apache Beam, so I am not 100% sure that what I want to do is possible.

My problem comes when I try to join the two sets of rows: after applying the CoGroupByKey transform, the data never seems to arrive in the same pane, even though the windowing strategy is the same for both collections (30-second fixed windows, triggering at the end of the window, and discarding fired panes).

Some relevant blocks of my code:

    /* Getting the data and windowing */
PCollection<PubsubMessage> pubsub = p.apply("ReadPubSub sub",PubsubIO.readMessages().fromSubscription(SUB_ALIM_REC));

String query = /* the query */;
PCollection<TableRow> bqData = p.apply("Reading BQ",BigQueryIO.readTableRows().fromQuery(query).usingStandardSql())
.apply(Window.<TableRow>into(FixedWindows.of(Duration.standardSeconds(30)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes());

PCollection<TableRow> tableRow = pubsub.apply(Window.<PubsubMessage>into(FixedWindows.of(Duration.standardSeconds(120)))
.triggering(AfterWatermark.pastEndOfWindow())
.withAllowedLateness(Duration.standardSeconds(0)).accumulatingFiredPanes())
.apply("JSON to TableRow",ParDo.of(new ToTableRow()));



/* Join code */
PCollection<TableRow> finalResultCollection =
    kvpCollection.apply("Join TableRows", ParDo.of(
        new DoFn<KV<Long, CoGbkResult>, TableRow>() {
            private static final long serialVersionUID = 6627878974147676533L;

            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<Long, CoGbkResult> e = c.element();
                Long idPaquete = e.getKey();
                Iterable<TableRow> itPaq = e.getValue().getAll(packTag);
                Iterable<TableRow> itAlimRec = e.getValue().getAll(alimTag);
                for (TableRow t1 : itPaq) {
                    for (TableRow t2 : itAlimRec) {
                        TableRow joinedRow = new TableRow();
                        /* set the required fields from each collection */
                        c.output(joinedRow);
                    }
                }
            }
        }));

I have also been getting this error for the past two days:

java.io.IOException: Failed to start reading from source: org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter@2808d228
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:783)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: BigQuery source must be split before being read
org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.createReader(BigQuerySourceBase.java:153)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.advance(UnboundedReadFromBoundedSource.java:463)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$ResidualSource.access$300(UnboundedReadFromBoundedSource.java:442)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.advance(UnboundedReadFromBoundedSource.java:293)
org.apache.beam.runners.core.construction.UnboundedReadFromBoundedSource$BoundedToUnboundedSourceAdapter$Reader.start(UnboundedReadFromBoundedSource.java:286)
com.google.cloud.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:778)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:360)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:193)
com.google.cloud.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:158)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

Any guidance would be greatly appreciated. Please let me know whether what I am trying to do is possible, or whether there is an alternative way to handle this scenario.

Best Answer

I tried to do the same thing. As far as I understand from this question, it is currently not possible. I attempted it myself using PeriodicImpulse, following this example (although I didn't want a side input). I wrote code similar to the following, but got ValueError: BigQuery source is not currently available for use in streaming pipelines.

segments = p | 'triggering segments fetch' >> PeriodicImpulse() \
    | "loading segments" >> beam.io.Read(beam.io.BigQuerySource(
        use_standard_sql=True,
        query=f'''
            SELECT
                id,
                segment
            FROM `some_table`''')) \
    | "keying segments" >> beam.Map(lambda row: (row['id'], row['segment'])) \
    | "windowing segments" >> beam.WindowInto(window.FixedWindows(5))

info = p | "reading info" >> beam.io.ReadFromPubSub(
        topic='my_test_topic') \
    | "parsing info" >> beam.Map(message_to_json) \
    | "mapping info" >> beam.Map(lambda x: (x['id'], x['username'])) \
    | "windowing info" >> beam.WindowInto(window.FixedWindows(5))

results = ({'segments': segments, 'info': info}
           | beam.CoGroupByKey()) | "printing" >> beam.Map(print_out)

I think the best solution at the moment is to use an external store such as Datastore. I used this approach in another production pipeline and it works very well. You can find an explanation here.
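The external-store approach replaces the CoGroupByKey with a per-element lookup: the slowly changing BigQuery side is materialized into the store, and the streaming side enriches each message inside a DoFn. A minimal plain-Java sketch of that lookup step, assuming an in-memory map stands in for the Datastore client (in a real pipeline this would be an RPC, ideally batched or cached):

```java
import java.util.*;

public class EnrichSketch {
    // Stand-in for an external store such as Datastore: key -> stored row.
    private final Map<Long, Map<String, Object>> store;

    public EnrichSketch(Map<Long, Map<String, Object>> store) {
        this.store = store;
    }

    // Enrich one incoming message with the stored row for its key.
    // Returns empty when the key is unknown, so the caller can decide
    // whether to drop, dead-letter, or retry the message.
    public Optional<Map<String, Object>> enrich(long key,
                                                Map<String, Object> msg) {
        Map<String, Object> stored = store.get(key);
        if (stored == null) {
            return Optional.empty();
        }
        Map<String, Object> joined = new HashMap<>(stored);
        joined.putAll(msg); // message fields win on clashes
        return Optional.of(joined);
    }

    public static void main(String[] args) {
        EnrichSketch e = new EnrichSketch(
            Map.of(42L, Map.of("segment", "premium")));
        System.out.println(e.enrich(42L, Map.of("user", "bob")));
    }
}
```

This sidesteps the windowing problem entirely, since the bounded side no longer has to arrive in the same pane as the streaming side.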

Regarding "java - Joining BigQuery and Pub/Sub with Apache Beam", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52183368/
