使用 Apache Beam 将重复的字符串写入 BigQuery
Write repeated Strings to BigQuery using Apache Beam
我有一个包含 Strings
的数据流,看起来像 JSONArrays
。我想解析这些字符串并使用 Apache Beam 写入 BigQuery table,但在写入重复的字符串时出现错误。
以下是我将字符串转换为 TableRow
的方法:
String dataString = "[{\"EMAIL\": [\"zog@yahoo.com\"]}]";
JSONArray jsonArray = new JSONArray(dataString);
TableRow tableRow = new TableRow();
for (int i = 0; i < jsonArray.length(); i++) {
JSONArray emailArray = new JSONArray(jsonArray.getJSONObject(i).get("EMAIL").toString());
tableRow.set("EMAIL", emailArray); //Results in error
}
我的 BigQuery 架构如下所示:
[
{
"name": "EMAIL",
"type": "STRING",
"mode": "REPEATED"
}
]
我已经设法使用 Python 将类似的重复字符串写入 BigQuery table,但无法使用 Apache Beam 完成。我想我没有在 TableRow
中保存正确的键值对。我现在得到的错误是:
java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"email","message":"This field is not a record.","reason":"invalid"}],"index":0}]
我需要有关如何在不创建记录的情况下将类似的重复字符串保存到 BigQuery 的帮助,我将不胜感激任何意见或建议。提前致谢。
看来你想创建
- 一行包含电子邮件地址的串联字符串,或
- 每个电子邮件行,或
- 具有重复字段的一行。
请注意,您的 ValidFrom
字段似乎是 STRING
类型, 不是 重复字段,除非它包含在重复字段中层次架构。
在您提供的示例代码中,您正在创建一个 JSONArray
并将其放入 STRING
字段,我认为这会导致问题,因为类型不兼容。如果您想将其保留为普通 STRING
字段,您可以使用下面的解决方案 1。
还要确保您在 BigQuery 中的列名称与代码中的列名称相匹配,我看到您同时使用了 ValidFrom
和 EMAIL
(尽管您发布的代码可能有误) .
解决方案 1:一行带有字符串字段
如果您想在 BigQuery 中添加 一行 和 concatenated String
字段,您可以使用以下命令:
// Initialize your final row
TableRow tableRow = new TableRow();
// Find email addresses
String [] emails = ... // your extraction logic
// Build a concatenated string of emails
String allEmails = String.join(";", emails);
// Add the string field to the row
tableRow.set('EMAILS', allEmails);
解决方案 2:包含字符串字段的多行
如果您想插入 多行 ,您可以创建多个 table 行:
// Find email addresses
String [] emails = ... // your extraction logic
// Build a row per email
for(String email: emails) {
// Initialize your final row
TableRow tableRow = new TableRow();
tableRow.set('EMAIL', email);
// TODO: do something with the row (add to list, or ...)
}
解决方案 3:一行包含 REPEATED 字段
如果您想在 BigQuery 中添加 一行 和 REPEATED STRING
字段,您可以使用以下命令:
// Initialize your final row
TableRow tableRow = new TableRow();
// Find email addresses
String [] emails = ... // your extraction logic
// Build the repeated field
List<String> emailCells = new ArrayList<>();
for(String email: emails) {
emailCells.add(email);
}
// Add the repeated field to the row
tableRow.set('EMAILS', emailCells);
如果这不是您的目标,请提供更多详细信息。
我有一个包含 Strings
的数据流,看起来像 JSONArrays
。我想解析这些字符串并使用 Apache Beam 写入 BigQuery table,但在写入重复的字符串时出现错误。
以下是我将字符串转换为 TableRow
的方法:
String dataString = "[{\"EMAIL\": [\"zog@yahoo.com\"]}]";
JSONArray jsonArray = new JSONArray(dataString);
TableRow tableRow = new TableRow();
for (int i = 0; i < jsonArray.length(); i++) {
JSONArray emailArray = new JSONArray(jsonArray.getJSONObject(i).get("EMAIL").toString());
tableRow.set("EMAIL", emailArray); //Results in error
}
我的 BigQuery 架构如下所示:
[
{
"name": "EMAIL",
"type": "STRING",
"mode": "REPEATED"
}
]
我已经设法使用 Python 将类似的重复字符串写入 BigQuery table,但无法使用 Apache Beam 完成。我想我没有在 TableRow
中保存正确的键值对。我现在得到的错误是:
java.io.IOException: Insert failed: [{"errors":[{"debugInfo":"","location":"email","message":"This field is not a record.","reason":"invalid"}],"index":0}]
我需要有关如何在不创建记录的情况下将类似的重复字符串保存到 BigQuery 的帮助,我将不胜感激任何意见或建议。提前致谢。
看来你想创建
- 一行包含电子邮件地址的串联字符串,或
- 每个电子邮件行,或
- 具有重复字段的一行。
请注意,您的 ValidFrom
字段似乎是 STRING
类型, 不是 重复字段,除非它包含在重复字段中层次架构。
在您提供的示例代码中,您正在创建一个 JSONArray
并将其放入 STRING
字段,我认为这会导致问题,因为类型不兼容。如果您想将其保留为普通 STRING
字段,您可以使用下面的解决方案 1。
还要确保您在 BigQuery 中的列名称与代码中的列名称相匹配,我看到您同时使用了 ValidFrom
和 EMAIL
(尽管您发布的代码可能有误) .
解决方案 1:一行带有字符串字段
如果您想在 BigQuery 中添加 一行 和 concatenated String
字段,您可以使用以下命令:
// Initialize your final row
TableRow tableRow = new TableRow();
// Find email addresses
String [] emails = ... // your extraction logic
// Build a concatenated string of emails
String allEmails = String.join(";", emails);
// Add the string field to the row
tableRow.set('EMAILS', allEmails);
解决方案 2:包含字符串字段的多行
如果您想插入 多行 ,您可以创建多个 table 行:
// Find email addresses
String [] emails = ... // your extraction logic
// Build a row per email
for(String email: emails) {
// Initialize your final row
TableRow tableRow = new TableRow();
tableRow.set('EMAIL', email);
// TODO: do something with the row (add to list, or ...)
}
解决方案 3:一行包含 REPEATED 字段
如果您想在 BigQuery 中添加 一行 和 REPEATED STRING
字段,您可以使用以下命令:
// Initialize your final row
TableRow tableRow = new TableRow();
// Find email addresses
String [] emails = ... // your extraction logic
// Build the repeated field
List<String> emailCells = new ArrayList<>();
for(String email: emails) {
emailCells.add(email);
}
// Add the repeated field to the row
tableRow.set('EMAILS', emailCells);
如果这不是您的目标,请提供更多详细信息。