通过 Google Cloud Dataflow 将 PubSub 消息插入 BigQuery
Insert PubSub messages into BigQuery through Google Cloud Dataflow
我想使用 Google Cloud Dataflow 将来自主题的 PubSub 消息数据插入到 BigQuery table 中。
一切都很好,但在 BigQuery table 中,我可以看到不可读的字符串,如“ ”。
这是我的管道:
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
我的简单 StringToRowConverter 函数是:
class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split(",")) {
if (!word.isEmpty()) {
System.out.println(word);
c.output(new TableRow().set("data", word));
}
}
}
}
这是我通过 POST 请求发送的消息:
POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
"messages": [
{
"attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
},
"data": "34gf5ert"
}
]
}
我错过了什么?
谢谢!
根据 https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage ,pubsub 消息的 JSON 有效载荷是 base64 编码的。默认情况下,Dataflow 中的 PubsubIO 使用 String UTF8 编码器。您提供的示例字符串“34gf5ert”在经过 base64 解码然后解释为 UTF-8 字符串时,准确地给出了“����”。
这就是我解压 pubsub 消息的方式:
@Override
public void processElement(ProcessContext c) {
String json = c.element();
HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType());
String unpacked = items.get("JsonKey");
希望对你有用。
我想使用 Google Cloud Dataflow 将来自主题的 PubSub 消息数据插入到 BigQuery table 中。 一切都很好,但在 BigQuery table 中,我可以看到不可读的字符串,如“ ”。 这是我的管道:
p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name"))
.apply(ParDo.named("Transformation").of(new StringToRowConverter()))
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table")
.withSchema(schema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED))
我的简单 StringToRowConverter 函数是:
class StringToRowConverter extends DoFn<String, TableRow> {
private static final long serialVersionUID = 0;
@Override
public void processElement(ProcessContext c) {
for (String word : c.element().split(",")) {
if (!word.isEmpty()) {
System.out.println(word);
c.output(new TableRow().set("data", word));
}
}
}
}
这是我通过 POST 请求发送的消息:
POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish
{
"messages": [
{
"attributes":{
"key": "tablet, smartphone, desktop",
"value": "eng"
},
"data": "34gf5ert"
}
]
}
我错过了什么? 谢谢!
根据 https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage ,pubsub 消息的 JSON 有效载荷是 base64 编码的。默认情况下,Dataflow 中的 PubsubIO 使用 String UTF8 编码器。您提供的示例字符串“34gf5ert”在经过 base64 解码然后解释为 UTF-8 字符串时,准确地给出了“����”。
这就是我解压 pubsub 消息的方式:
@Override
public void processElement(ProcessContext c) {
String json = c.element();
HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType());
String unpacked = items.get("JsonKey");
希望对你有用。