bigquery TIMESTAMP 字段的 Protobuf 类型
Protobuf type for bigquery TIMESTAMP field
我正在使用新的 Storage API 从 Golang 将数据流式传输到 bigquery。我的 bigquery table 的架构包含一个 TIMESTAMP 字段,如下所示:
bq mk -t mydataset.mytable name:string,lastseen:timestamp
我单独定义了一个这样的协议缓冲区:
// Row mirrors the BigQuery table schema (name:string, lastseen:timestamp).
// NOTE(review): google.protobuf.Timestamp does NOT map to a BigQuery
// TIMESTAMP column in the Storage Write API; per the Write API data-type
// conversion table, the proto field should be an int64 holding microseconds
// since the Unix epoch.
// NOTE(review): field number 2 is skipped — presumably an Age field was
// elided from the snippet; confirm against the full .proto.
message Row {
string Name = 1;
google.protobuf.Timestamp LastSeen = 3;
}
但是,当我将此数据提交到 BigQuery 时,出现以下错误:
rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP
似乎 protobuf 的 google.protobuf.Timestamp 与 BigQuery 中的 TIMESTAMP 类型不对应。这是有道理的,因为 BigQuery 文档说 TIMESTAMP 包含时区,而 google.protobuf.Timestamp 不包含时区。那么我应该使用哪种 protobuf 类型呢?
我正在使用源自 this repository 的代码,它看起来像这样:
import (
"context"
"fmt"
"log"
storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)
// Coordinates of the destination BigQuery table; the table must already
// exist (created e.g. with `bq mk`).
const (
project = "myproject"
dataset = "mydataset"
table = "mytable2" // NOTE(review): the question's `bq mk` created "mytable", not "mytable2" — confirm which table is intended
)
// main streams a fixed set of rows into a BigQuery table using the Storage
// Write API (v1beta2) over a COMMITTED write stream, so rows become visible
// as soon as the append is acknowledged.
//
// NOTE(review): the Row proto declares LastSeen as google.protobuf.Timestamp,
// which the backend rejects for a BigQuery TIMESTAMP column; per the Write
// API data-type conversion table the proto field should be an int64 carrying
// microseconds since the Unix epoch. That fix belongs in the .proto
// definition, not here.
func main() {
	ctx := context.Background()

	// The data we will stream to BigQuery.
	// NOTE(review): Age is populated here but the proto shown in the
	// question has no field numbered 2 — presumably Age = 2 was elided.
	rows := []*Row{
		{Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
		{Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
		{Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
	}

	// Create the BigQuery Storage write client.
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create the write stream. A COMMITTED write stream inserts data
	// immediately into BigQuery (no separate flush/finalize step).
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// Open the bidirectional AppendRows RPC.
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// Derive a self-contained protobuf descriptor for Row; the server needs
	// it to interpret the serialized row bytes.
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// Serialize each row to its protobuf wire format.
	var opts proto.MarshalOptions
	data := make([][]byte, 0, len(rows)) // pre-size: row count is known
	for _, r := range rows {
		buf, err := opts.Marshal(r)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// Send the writer schema and the serialized rows in a single request.
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// Protocol buffer schema describing the rows below.
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// Serialized protocol buffer data.
				Rows: &storagepb.ProtoRows{
					SerializedRows: data,
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// FIX: half-close the send side so the server knows no more requests are
	// coming on this stream; the original never closed it.
	if err := stream.CloseSend(); err != nil {
		log.Fatal("AppendRows.CloseSend: ", err)
	}

	// Wait for the server's response, which tells us whether the append
	// worked (an error surfaces schema mismatches like the TIMESTAMP one).
	if _, err := stream.Recv(); err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}
	log.Println("done")
}
是的,后端无法正确解码 proto 的 Timestamp 消息。
最快的解决方案:将该字段定义为 int64 类型,并填充为纪元微秒(自 Unix epoch 以来的微秒数)。参见:
https://cloud.google.com/bigquery/docs/write-api#data_type_conversions
我正在使用新的 Storage API 从 Golang 将数据流式传输到 bigquery。我的 bigquery table 的架构包含一个 TIMESTAMP 字段,如下所示:
bq mk -t mydataset.mytable name:string,lastseen:timestamp
我单独定义了一个这样的协议缓冲区:
// Row mirrors the BigQuery table schema (name:string, lastseen:timestamp).
// NOTE(review): google.protobuf.Timestamp does NOT map to a BigQuery
// TIMESTAMP column in the Storage Write API; per the Write API data-type
// conversion table, the proto field should be an int64 holding microseconds
// since the Unix epoch.
// NOTE(review): field number 2 is skipped — presumably an Age field was
// elided from the snippet; confirm against the full .proto.
message Row {
string Name = 1;
google.protobuf.Timestamp LastSeen = 3;
}
但是,当我将此数据提交到 BigQuery 时,出现以下错误:
rpc error: code = InvalidArgument desc = The proto field mismatched with BigQuery field at tutorial_Row.LastSeen, the proto field type message, BigQuery field type TIMESTAMP
似乎 protobuf 的 google.protobuf.Timestamp 与 BigQuery 中的 TIMESTAMP 类型不对应。这是有道理的,因为 BigQuery 文档说 TIMESTAMP 包含时区,而 google.protobuf.Timestamp 不包含时区。那么我应该使用哪种 protobuf 类型呢?
我正在使用源自 this repository 的代码,它看起来像这样:
import (
"context"
"fmt"
"log"
storage "cloud.google.com/go/bigquery/storage/apiv1beta2"
"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
storagepb "google.golang.org/genproto/googleapis/cloud/bigquery/storage/v1beta2"
"google.golang.org/protobuf/proto"
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
)
// Coordinates of the destination BigQuery table; the table must already
// exist (created e.g. with `bq mk`).
const (
project = "myproject"
dataset = "mydataset"
table = "mytable2" // NOTE(review): the question's `bq mk` created "mytable", not "mytable2" — confirm which table is intended
)
// main streams a fixed set of rows into a BigQuery table using the Storage
// Write API (v1beta2) over a COMMITTED write stream, so rows become visible
// as soon as the append is acknowledged.
//
// NOTE(review): the Row proto declares LastSeen as google.protobuf.Timestamp,
// which the backend rejects for a BigQuery TIMESTAMP column; per the Write
// API data-type conversion table the proto field should be an int64 carrying
// microseconds since the Unix epoch. That fix belongs in the .proto
// definition, not here.
func main() {
	ctx := context.Background()

	// The data we will stream to BigQuery.
	// NOTE(review): Age is populated here but the proto shown in the
	// question has no field numbered 2 — presumably Age = 2 was elided.
	rows := []*Row{
		{Name: "John Doe", Age: 104, LastSeen: timestamppb.Now()},
		{Name: "Jane Doe", Age: 69, LastSeen: timestamppb.Now()},
		{Name: "Adam Smith", Age: 33, LastSeen: timestamppb.Now()},
	}

	// Create the BigQuery Storage write client.
	client, err := storage.NewBigQueryWriteClient(ctx)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Create the write stream. A COMMITTED write stream inserts data
	// immediately into BigQuery (no separate flush/finalize step).
	resp, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", project, dataset, table),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_COMMITTED,
		},
	})
	if err != nil {
		log.Fatal("CreateWriteStream: ", err)
	}

	// Open the bidirectional AppendRows RPC.
	stream, err := client.AppendRows(ctx)
	if err != nil {
		log.Fatal("AppendRows: ", err)
	}

	// Derive a self-contained protobuf descriptor for Row; the server needs
	// it to interpret the serialized row bytes.
	var row Row
	descriptor, err := adapt.NormalizeDescriptor(row.ProtoReflect().Descriptor())
	if err != nil {
		log.Fatal("NormalizeDescriptor: ", err)
	}

	// Serialize each row to its protobuf wire format.
	var opts proto.MarshalOptions
	data := make([][]byte, 0, len(rows)) // pre-size: row count is known
	for _, r := range rows {
		buf, err := opts.Marshal(r)
		if err != nil {
			log.Fatal("protobuf.Marshal: ", err)
		}
		data = append(data, buf)
	}

	// Send the writer schema and the serialized rows in a single request.
	err = stream.Send(&storagepb.AppendRowsRequest{
		WriteStream: resp.Name,
		Rows: &storagepb.AppendRowsRequest_ProtoRows{
			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
				// Protocol buffer schema describing the rows below.
				WriterSchema: &storagepb.ProtoSchema{
					ProtoDescriptor: descriptor,
				},
				// Serialized protocol buffer data.
				Rows: &storagepb.ProtoRows{
					SerializedRows: data,
				},
			},
		},
	})
	if err != nil {
		log.Fatal("AppendRows.Send: ", err)
	}

	// FIX: half-close the send side so the server knows no more requests are
	// coming on this stream; the original never closed it.
	if err := stream.CloseSend(); err != nil {
		log.Fatal("AppendRows.CloseSend: ", err)
	}

	// Wait for the server's response, which tells us whether the append
	// worked (an error surfaces schema mismatches like the TIMESTAMP one).
	if _, err := stream.Recv(); err != nil {
		log.Fatal("AppendRows.Recv: ", err)
	}
	log.Println("done")
}
是的,后端无法正确解码 proto 的 Timestamp 消息。
最快的解决方案:将该字段定义为 int64 类型,并填充为纪元微秒(自 Unix epoch 以来的微秒数)。参见:
https://cloud.google.com/bigquery/docs/write-api#data_type_conversions