Cloud Dataflow, Pub/Sub & BigQuery (TableRowJsonCoder) Issues
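I am using Cloud Dataflow, Pub/Sub, and BigQuery to read JSON Pub/Sub messages, convert the JSON into table rows with TableRowJsonCoder, and then write them to BigQuery.
My issue is consistency: the following code only works sometimes. No errors are thrown. I am sure I am publishing the messages to the Pub/Sub topic correctly. I am also sure that Dataflow is reading each message. I have tested this using the gcloud command-line tool.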
gcloud beta pubsub subscriptions pull --auto-ack SUBSCRIPTION-NAME
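I have two subscriptions to the topic, one read by Dataflow and one read by me in the terminal. The code also successfully formats the JSON data into table format and writes it to my specified dataset and table, when it feels like it :(
My assumption is that I don't really understand what is going on and that I am missing something related to windowing, where each window should be one message.
Suppose I send 50 messages: Dataflow seems to read in only about half of the elements. This is my first question: does this have to do with elements being counted as a certain number of bytes rather than as messages? How do I solve this? I am using TableRowJsonCoder.
[Image: reading data]
Then a similar problem seems to occur again: of X elements, only a small portion makes it through the GroupByKey. I could describe the problem in more depth if I were able to troubleshoot it further. Note that the "id" field is always unique, so I don't think this is related to duplication, but I could be wrong.
Even as I write this message, the elements added have risen to 41 and the output to BigQuery has risen to 12. Am I just not waiting long enough? Is my test data too small (always under 100 messages)? Even if it does eventually save all my rows, taking over an hour to do so seems far too long.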
[Image: Dataflow console]
[Image: the successfully inserted data]
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.example;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;
import java.util.ArrayList;
import java.util.List;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Streaming pipeline that reads JSON messages from a Pub/Sub topic, decodes
* them into TableRows with TableRowJsonCoder, and writes them to BigQuery.
*
* <p>To run this pipeline using managed resources in Google Cloud Platform,
* you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --runner=BlockingDataflowPipelineRunner
*/
public class StarterPipeline {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
  static final int WINDOW_SIZE = 1; // Default window duration in minutes
  private static final String PROJECT_ID = "dataflow-project";
  private static final String PUBSUB_TOPIC = "projects/dataflow-project/topics/pub-sub-topic";
  private static final String DATASET_ID = "test_dataset";
  private static final String TABLE_ID = "test_table_version_one";

  // Schema of the destination BigQuery table; field names match the JSON keys.
  private static TableSchema getSchema() {
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("ip").setType("STRING"));
    fields.add(new TableFieldSchema().setName("installation_id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("user_id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("device_type").setType("STRING"));
    fields.add(new TableFieldSchema().setName("language").setType("STRING"));
    fields.add(new TableFieldSchema().setName("application_id").setType("STRING"));
    fields.add(new TableFieldSchema().setName("timestamp").setType("TIMESTAMP"));
    TableSchema schema = new TableSchema().setFields(fields);
    return schema;
  }

  // Fully qualified reference to the destination table.
  private static TableReference getTableReference() {
    TableReference tableRef = new TableReference();
    tableRef.setProjectId(PROJECT_ID);
    tableRef.setDatasetId(DATASET_ID);
    tableRef.setTableId(TABLE_ID);
    return tableRef;
  }

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
    dataflowOptions.setStreaming(true);
    Pipeline pipeline = Pipeline.create(dataflowOptions);

    LOG.info("Reading from PubSub.");
    // Decode each Pub/Sub message as a TableRow and assign it to a fixed window.
    PCollection<TableRow> input = pipeline
        .apply(PubsubIO.Read.topic(PUBSUB_TOPIC).withCoder(TableRowJsonCoder.of()))
        .apply(Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(WINDOW_SIZE))));

    // Stream the windowed rows into BigQuery.
    input
        .apply(BigQueryIO.Write.to(getTableReference()).withSchema(getSchema()));

    pipeline.run();
  }
}
I am also interested in specifying the timestamp and record ID as the "timestamp" and "id" fields.
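The issue is a misconfigured network for your GCE VMs. Dataflow requires that the VMs be able to communicate with each other over TCP, and your firewall rules do not allow that. Adding a firewall rule that allows TCP connections in general between your VMs will fix this.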
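If you want to confirm that TCP traffic between two VMs is actually being blocked, a small probe run from one machine against another can help. This is only a minimal sketch and not part of Dataflow: the class name TcpProbe, the 3-second timeout, and the host and port you pass in are all assumptions, and the probe can only succeed if something is listening on the target port.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper for checking TCP reachability between two machines.
final class TcpProbe {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    // Refused connections, timeouts, and unresolvable hosts all return false.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: TcpProbe <host> <port>");
            return;
        }
        String host = args[0];
        int port = Integer.parseInt(args[1]);
        boolean reachable = canConnect(host, port, 3000); // 3 s timeout is an arbitrary choice
        System.out.println(host + ":" + port + (reachable ? " is reachable" : " is NOT reachable"));
    }
}
```

A timeout rather than an immediate refusal often points to a firewall dropping packets, although the sketch itself does not distinguish the two cases.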
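The reason some data made it through your pipeline, slowly, is that you sometimes got lucky and the data only needed to be processed on one machine. Pub/Sub eventually times out and retries messages, so they would all go through eventually.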
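On the windowing assumption in the question: a fixed window groups elements by timestamp, so a one-minute window holds however many messages fall into that minute, not exactly one. The sketch below illustrates that assignment in plain Java. The class FixedWindowSketch and its methods are hypothetical; they only mimic the idea with a window offset of zero and are not the SDK's implementation.

```java
// Hypothetical illustration of how fixed (tumbling) windows bucket timestamps.
final class FixedWindowSketch {

    // Start (inclusive, in milliseconds) of the fixed window containing the timestamp.
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // End (exclusive, in milliseconds) of the fixed window containing the timestamp.
    static long windowEnd(long timestampMillis, long sizeMillis) {
        return windowStart(timestampMillis, sizeMillis) + sizeMillis;
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        long[] messageTimestamps = {5_000L, 42_000L, 61_000L};
        for (long ts : messageTimestamps) {
            System.out.println("timestamp " + ts + " ms -> window ["
                + windowStart(ts, oneMinute) + ", " + windowEnd(ts, oneMinute) + ")");
        }
    }
}
```

Here the first two messages land in the same window and the third lands in the next one, so the number of messages per window depends only on when they arrive.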