gpt4 book ai didi

google-bigquery - Google Pub/Sub to Dataflow,避免与记录 ID 重复

转载 作者:行者123 更新时间:2023-12-03 10:33:43 25 4
gpt4 key购买 nike

我正在尝试构建一个流式数据流作业,它从 Pub/Sub 读取事件并将它们写入 BigQuery。

根据文档,如果使用记录 ID,Dataflow 可以检测到重复的消息传递(请参阅:https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids)

但即使使用这个记录 ID,我仍然有一些重复
(约 0.0002%)。

我错过了什么 ?

编辑:

我用 Spotify Async PubSub Client使用以下代码发布消息:

Message
.builder()
.data(new String(Base64.encodeBase64(json.getBytes())))
.attributes("myid", id, "mytimestamp", timestamp.toString)
.build()

然后我使用 Spotify scio从 pub/sub 读取消息并将其保存到 DataFlow:
val input = sc.withName("ReadFromSubscription")
.pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
.withName("FixedWindow")
.withFixedWindows(windowSize) // apply windowing logic
.toWindowed // convert to WindowedSCollection
//
.withName("ParseJson")
.map { wv =>
wv.copy(value = TableRow(
"message_id" -> (Json.parse(wv.value) \ "id").as[String],
"message" -> wv.value)
)
}
//
.toSCollection // convert back to normal SCollection
//
.withName("SaveToBigQuery")
.saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)

窗口大小为 1 分钟。

在注入(inject)消息几秒钟后,我已经在 BigQuery 中有重复消息。

我使用此查询来计算重复项:
SELECT 
COUNT(message_id) AS TOTAL,
COUNT(DISTINCT message_id) AS DISTINCT_TOTAL
FROM my_dataset.my_table

//returning 273666 273564

而这个来看看他们:
SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
SELECT message_id
FROM my_dataset.my_table
GROUP BY message_id
HAVING COUNT(*) > 1
) ORDER BY message_id

//returning for instance:
row|id | processed_at | processed_at_epoch
1 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410367 { ...json1... }
2 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410368 { ...json1... }
3 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973907 { ...json2... }
4 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973741 { ...json2... }
5 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:09:10 UTC 1486044550489 { ...json3... }
6 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:08:52 UTC 1486044532680 { ...json3... }

最佳答案

BigQuery documentation states可能存在重复到达的罕见情况:

  • “BigQuery 会记住此 ID 至少一分钟”——如果 Dataflow 在重试插入之前花费超过一分钟 BigQuery 可能允许重复输入。您可以查看管道中的日志以确定是否是这种情况.
  • “在谷歌数据中心意外失去连接的罕见情况下,自动重复数据删除可能是不可能的。”

  • 您可能想尝试 manually removing duplicates 的说明.这也将允许您查看 insertID与每行一起使用以确定问题是在 Dataflow 端(为同一记录生成不同的 insertID )还是在 BigQuery 端(未能根据行的 insertID 删除重复数据)。

    关于google-bigquery - Google Pub/Sub to Dataflow,避免与记录 ID 重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41984789/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com