
go - Using the BigQuery Write API in Golang


I'm trying to do streaming inserts from Golang using the new BigQuery Storage API. My understanding, based on this page, is that this API replaces the old streaming-insert BigQuery API.

However, none of the examples in the docs show how to actually insert rows. To create an AppendRowsRequest, I have arrived at the following:

&storagepb.AppendRowsRequest{
	WriteStream: resp.Name,
	Rows: &storagepb.AppendRowsRequest_ProtoRows{
		ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
			WriterSchema: nil, // protobuf schema??
			Rows: &storagepb.ProtoRows{
				SerializedRows: [][]byte{}, // serialized protocol buffer data??
			},
		},
	},
}

What data should I put in the SerializedRows field above?

The storagepb.ProtoRows struct above is documented here. Unfortunately, all that is given is a link to the main overview page for Protocol Buffers.

Can anyone give me an example of using the new BigQuery Storage API to stream rows from Golang into BigQuery?

Best Answer

With the help of the answers above, I got a working example, which can be found on GitHub: https://github.com/alexflint/bigquery-storage-api-example

The main code is:

package main

// Note: the import paths below assume the current apiv1 client libraries;
// the linked repository may pin slightly different versions.
import (
	"context"
	"fmt"
	"log"

	storage "cloud.google.com/go/bigquery/storage/apiv1"
	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"google.golang.org/protobuf/proto"
)

const (
	project = "myproject"
	dataset = "mydataset"
	table   = "mytable"
	trace   = "bigquery-writeclient-example" // identifies this client for bigquery debugging
)

// the data we will stream to bigquery
var rows = []*Row{
	{Name: "John Doe", Age: 104},
	{Name: "Jane Doe", Age: 69},
	{Name: "Adam Smith", Age: 33},
}

func main() {
	ctx := context.Background()

	// create the bigquery client
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// create the write stream
	// a COMMITTED write stream inserts data immediately into bigquery
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// get the stream by calling AppendRows
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// get the protobuf descriptor for our row type
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// serialize the rows
	var opts proto.MarshalOptions
	var data [][]byte
	for _, row := range rows {
		buf, err := opts.Marshal(row)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// send the rows to bigquery
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		TraceId:     trace, // identifies this client
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// protocol buffer schema
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// protocol buffer data
				Rows: &storagepb.ProtoRows{
					SerializedRows: data, // serialized protocol buffer data
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// get the response, which will tell us whether it worked
	_, err = stream.Recv()
	if err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}

	log.Println("done")
}

The Protocol Buffer definition for the "Row" struct above is:

syntax = "proto3";

package tutorial;

option go_package = ".;main";

message Row {
  string Name = 1;
  int32 Age = 2;
}

You need to create a BigQuery dataset and table first, with a schema that corresponds to the protocol buffer. See the README in the repository linked above for how to do that.
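As a sketch (not copied from that README), a table schema matching the Row message might look like the following JSON, with field names taken from the query output shown below:

```json
[
  {"name": "name", "type": "STRING"},
  {"name": "age", "type": "INTEGER"}
]
```

Saved as a file such as schema.json, this can be passed to `bq mk --table mydataset.mytable schema.json` to create the table before running the code.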

After running the code above, the data shows up in BigQuery like this:

$ bq query 'select * from mydataset.mytable'
Waiting on bqjob_r1b39442e5474a885_0000017df21f629e_1 ... (0s) Current status: DONE
+------------+-----+
|    name    | age |
+------------+-----+
| John Doe   | 104 |
| Jane Doe   |  69 |
| Adam Smith |  33 |
+------------+-----+

Thanks to everyone who helped!

On the topic of go - Using the BigQuery Write API in Golang, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70279279/
