Do beam pipelines preserve ORDER BY in a source query when sourcing from BigQuery?
We have a Beam pipeline, written in Java, that we run on GCP Dataflow. It is very simple: it takes an SQL query as a PipelineOption, issues that query against BigQuery, and for each row in the returned dataset builds a message and puts it onto a Pub/Sub topic.
import com.google.api.services.bigquery.model.TableRow;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@code BigQueryEventReplayer} pipeline runs a supplied SQL query
 * against BigQuery, and sends the results one-by-one to PubSub.
 * The query MUST return a column named 'json'; it is this column
 * (and ONLY this column) that will be sent onward. The column must be a String type
 * and should be valid JSON.
 */
public class BigQueryEventReplayer {

  private static final Logger logger = LoggerFactory.getLogger(BigQueryEventReplayer.class);

  /**
   * Options for the BigQueryEventReplayer. See descriptions for more info.
   */
  public interface Options extends PipelineOptions {
    @Description("SQL query to be run. "
        + "An SQL string literal which will be run 'as is'")
    @Required
    ValueProvider<String> getBigQuerySql();

    void setBigQuerySql(ValueProvider<String> value);

    @Description("The name of the topic which data should be published to. "
        + "The name should be in the format of projects/<project-id>/topics/<topic-name>.")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);

    @Description("The ID of the BigQuery dataset targeted by the event")
    @Required
    ValueProvider<String> getBigQueryTargetDataset();

    void setBigQueryTargetDataset(ValueProvider<String> value);

    @Description("The ID of the BigQuery table targeted by the event")
    @Required
    ValueProvider<String> getBigQueryTargetTable();

    void setBigQueryTargetTable(ValueProvider<String> value);

    @Description("The SourceSystem attribute of the event")
    @Required
    ValueProvider<String> getSourceSystem();

    void setSourceSystem(ValueProvider<String> value);
  }

  /**
   * Takes the data from the TableRow and prepares it for the PubSub, including
   * adding attributes to ensure the payload is routed correctly.
   */
  // We would rather use a SimpleFunction here but then we wouldn't be able
  // to inject our value providers. So instead we hackishly make a nested class.
  public static class MapQueryToPubsub extends DoFn<TableRow, PubsubMessage> {
    private final ValueProvider<String> targetDataset;
    private final ValueProvider<String> targetTable;
    private final ValueProvider<String> sourceSystem;

    MapQueryToPubsub(
        ValueProvider<String> targetDataset,
        ValueProvider<String> targetTable,
        ValueProvider<String> sourceSystem) {
      this.targetDataset = targetDataset;
      this.targetTable = targetTable;
      this.sourceSystem = sourceSystem;
    }

    /**
     * Entry point of DoFn for Dataflow.
     */
    @ProcessElement
    public void processElement(ProcessContext c) {
      TableRow row = c.element();
      if (!row.containsKey("json")) {
        logger.warn("table does not contain column named 'json'");
      }
      Map<String, String> attributes = new HashMap<>();
      attributes.put("sourceSystem", sourceSystem.get());
      attributes.put("targetDataset", targetDataset.get());
      attributes.put("targetTable", targetTable.get());
      String json = (String) row.get("json");
      c.output(new PubsubMessage(json.getBytes(), attributes));
    }
  }

  /**
   * Run the pipeline. This is the entrypoint for running 'locally'.
   */
  public static void main(String[] args) {
    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  /**
   * Run the pipeline. This is the entrypoint that GCP will use.
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("Read from BigQuery query",
        BigQueryIO.readTableRows().fromQuery(options.getBigQuerySql()).usingStandardSql().withoutValidation()
            .withTemplateCompatibility())
        .apply("Map data to PubsubMessage",
            ParDo.of(
                new MapQueryToPubsub(
                    options.getBigQueryTargetDataset(),
                    options.getBigQueryTargetTable(),
                    options.getSourceSystem()
                )
            )
        )
        .apply("Write message to PubSub", PubsubIO.writeMessages().to(options.getOutputTopic()));
    return pipeline.run();
  }
}
The BigQuery data being queried is essentially an event log. We recently determined that the order in which these events are placed onto the Pub/Sub topic matters. We can establish the correct order by using ORDER BY in the query we issue against BigQuery, but we doubt that this order will be respected when the data is published to the Pub/Sub topic.
The code we are mainly concerned about is this:
pipeline.apply("Read from BigQuery query",
    BigQueryIO.readTableRows().fromQuery(options.getBigQuerySql()).usingStandardSql().withoutValidation()
        .withTemplateCompatibility())
In Dataflow, that one simple step shows up as:
A lot goes on inside that step (shuffles and so on), and many of the sub-steps are themselves made up of further sub-steps. Moreover, one of the sub-steps is called "ReadFiles", which makes me think Dataflow may be writing the data out to some form of temporary file storage. All in all, this leaves me doubting that the ORDER BY in the supplied SQL query will still hold by the time the rows are published to Pub/Sub.
Does Beam/Dataflow give any guarantee that the ORDER BY will be preserved in this scenario, or do I have to introduce some form of ordering into my pipeline to guarantee that the desired order is respected?
A BigQueryIO read essentially consists of an export job that writes the query/table to GCS as Avro files, followed by reading from those files (plus a bit more). So it will not preserve order: the read is parallel, and multiple workers will be reading chunks of the files that were created.
In general, distributed processing systems such as Dataflow (or Spark, etc.) do not preserve order, and given the parallel nature of their work they are not good at sorting things. Keep in mind that to sort elements you need to hold all of them on a single worker.
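To make that concrete, here is a minimal sketch (not part of the original pipeline) of what forcing a total sort onto a single worker would look like in Beam. It assumes the query adds a hypothetical "seq" column, e.g. ROW_NUMBER() OVER (ORDER BY event_time); every row is keyed to the same constant key so that the GroupByKey gathers everything onto one worker, where it is sorted in memory. This illustrates the cost of the approach rather than recommending it, and even then the Beam model does not guarantee the order in which downstream steps will publish the sorted output.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/** Sketch only: gather all rows on one worker and sort them by a hypothetical "seq" column. */
public class TotalSortSketch {
  public static PCollection<TableRow> sortAll(PCollection<TableRow> rows) {
    return rows
        // A single constant key means a single group, i.e. one worker holds every row.
        .apply("Key by constant", WithKeys.of("all"))
        .apply("Gather on one worker", GroupByKey.<String, TableRow>create())
        .apply("Sort in memory", ParDo.of(new DoFn<KV<String, Iterable<TableRow>>, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            List<TableRow> buffered = new ArrayList<>();
            for (TableRow row : c.element().getValue()) {
              buffered.add(row);
            }
            // Sort by the hypothetical "seq" column assumed to be produced by the query.
            buffered.sort(Comparator.comparingLong(
                (TableRow r) -> Long.parseLong(String.valueOf(r.get("seq")))));
            for (TableRow row : buffered) {
              c.output(row);
            }
          }
        }));
  }
}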
In fact, ORDER BY is a demanding operation even within BigQuery itself.
It is hard to find a workaround for this, because these systems are not built for that kind of task. One thing I can think of is adding a ROW_NUMBER, using it as the timestamp and adding a window, but that is use-case specific.
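As a rough illustration of that idea (a sketch under assumptions, not the asker's design): suppose the supplied query adds ROW_NUMBER() OVER (ORDER BY event_time) AS seq, where "seq" and "event_time" are hypothetical names. The DoFn could then forward the sequence number as a message attribute, and optionally use it as the element timestamp, so that order can be reconstructed downstream even though publish order itself is not guaranteed:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Instant;

/** Sketch only: carry a hypothetical "seq" column (ROW_NUMBER from the query) with each message. */
public class MapQueryToOrderedPubsub extends DoFn<TableRow, PubsubMessage> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    TableRow row = c.element();
    String json = (String) row.get("json");
    long seq = Long.parseLong(String.valueOf(row.get("seq")));

    // Carry the ordering information as an attribute so a consumer can re-order messages.
    Map<String, String> attributes = new HashMap<>();
    attributes.put("seq", Long.toString(seq));

    PubsubMessage message = new PubsubMessage(json.getBytes(StandardCharsets.UTF_8), attributes);
    // Optionally also use the sequence number as the element timestamp. Elements from a
    // bounded BigQuery read carry the minimum timestamp by default, so this does not
    // violate the allowed timestamp skew, and it enables window-based handling downstream.
    c.outputWithTimestamp(message, new Instant(seq));
  }
}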
In addition, PubsubIO does not preserve order when publishing.
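For completeness, and outside the scope of Beam's PubsubIO: if per-key delivery order is a hard requirement, Pub/Sub ordering keys are the mechanism designed for it. A hedged sketch with the plain google-cloud-pubsub client follows (project, topic and key names are placeholders):

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

/** Sketch only: publish with an ordering key; the subscription must also enable message ordering. */
public class OrderedPublisherSketch {
  public static void publishInOrder(String projectId, String topicId, Iterable<String> jsonPayloads)
      throws Exception {
    Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topicId))
        .setEnableMessageOrdering(true) // required for ordering keys to take effect
        .build();
    try {
      for (String json : jsonPayloads) {
        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(json))
            .setOrderingKey("event-replay") // messages sharing a key are delivered in publish order
            .build();
        publisher.publish(message);
      }
    } finally {
      publisher.shutdown();
    }
  }
}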