GCP Dataflow Kafka (as Azure Event Hub) -> BigQuery
TL;DR
I have a Kafka-enabled Azure Event Hub that I am trying to connect to from Google Cloud's Dataflow service in order to stream the data into Google BigQuery. I can successfully use the Kafka CLI to talk to the Azure Event Hub. However, with GCP, after 5 minutes I get timeout errors in the GCP Dataflow job window.
Kafka-enabled Azure EH -> GCP Dataflow -> GCP BigQuery table
Details
To set up the Kafka-enabled Event Hub, I followed the details on this GitHub page. It has the developer add a jaas.conf and a client_common.properties. The jaas.conf includes a reference to the login module along with the username/password. The username for Kafka against Event Hubs is $ConnectionString, and the password is the connection string copied from the CLI. The client_common.properties contains two flags: security.protocol=SASL_SSL and sasl.mechanism=PLAIN. With these files configured, I was able to send and receive data with the Kafka CLI tools against the Azure Event Hub, and I could see data flow from the producer to the consumer through the Event Hub.
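For reference, the two files look roughly like this. The namespace and key below are placeholders (not values from this setup), and the password is the full connection string copied from the Azure portal/CLI, following the Azure Event Hubs for Kafka quickstart:
jaas.conf:
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="$ConnectionString"
    password="Endpoint=sb://test-eh-namespace.servicebus.windows.net/;SharedAccessKeyName=<policy_name>;SharedAccessKey=<key>";
};
client_common.properties:
security.protocol=SASL_SSL
sasl.mechanism=PLAIN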
export KAFKA_OPTS="-Djava.security.auth.login.config=jaas.conf"
(echo -n "1|"; cat message.json | jq . -c) | kafka-conle-producer.sh --topic test-event-hub --broker-list test-eh-namespace.servicebus.windows.net:9093 --producer.config client_common.properties --property "parse.key=true" --property "key.separator=|"
kafka-console-consumer.sh --topic test-event-hub --bootstrap-server test-eh-namespace.servicebus.windows.net:9093 --consumer.config client_common.properties --property "print.key=true"
# prints: 1 { "transaction_time": "2020-07-20 15:14:54", "first_name": "Joe", "last_name": "Smith" }
I modified Google's Dataflow template for Kafka -> BigQuery. It already specified a configuration map for resetting the offsets. I added extra configuration to match the Azure Event Hubs for Kafka tutorial. Although not best practice, I put the connection string into the password field for testing. When I uploaded it to the GCP Dataflow engine and ran the job, I got a timeout error in the logs every 5 minutes and nothing landed in Google BigQuery.
Job command
gcloud dataflow jobs run kafka-test --gcs-location=<removed> --region=us-east1 --worker-zone=us-east4-a --parameters bootstrapServers=test-eh-namespace.servicebus.servicebus.windows.net:9093,inputTopic=test-event-hub,outputTableSpec=project:Kafka_Test.test --service-account-email my-service-account.iam.gserviceaccount.com
Errors in GCP Dataflow
# these errors show up in the worker logs
Operation ongoing in step ReadFromKafka/KafkaIO.Read/Read(KafkaUnboundedSource)/DataflowRunner.StreamingUnboundedRead.ReadWithIds for at least 05m00s without outputting or completing in state process at java.lang.Thread.sleep(Native Method) at org.apache.kafka.common.utils.SystemTime.sleep(SystemTime.java:45) at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:366) at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1481) at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.updatedSpecWithAssignedPartitions(KafkaUnboundedSource.java:85) at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:125) at com.google.cloud.teleport.kafka.connector.KafkaUnboundedSource.createReader(KafkaUnboundedSource.java:45) at org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReader.iterator(WorkerCustomSources.java:433) at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:186) at org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:163) at org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:92) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1426) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access00(StreamingDataflowWorker.java:163) at org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.run(StreamingDataflowWorker.java:1105) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
Execution of work for computation 'S4' on key '0000000000000001' failed with uncaught exception. Work will be retried locally.
# this error shows up in the Job log
Error message from worker: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
Updated configuration
Map<String, Object> props = new HashMap<>();
// azure event hub authentication
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL")
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"<removed>\";");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 15000);
props.put("max.poll.interval.ms", 30000);
props.put("offset.metadata.max.bytes", 1024);
props.put("connections.max.idle.ms", 180000);
props.put("metadata.max.age.ms", 180000);
Pipeline
PCollectionTuple convertedTableRows =
pipeline
/*
* Step #1: Read messages in from Kafka
*/
.apply(
"ReadFromKafka",
KafkaIO.<String, String>read()
.withConsumerConfigUpdates(ImmutableMap.copyOf(props))
.withBootstrapServers(options.getBootstrapServers())
.withTopics(topicsList)
.withKeyDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withValueDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withoutMetadata())
/*
* Step #2: Transform the Kafka Messages into TableRows
*/
.apply("ConvertMessageToTableRow", new MessageToTableRow(options));
Related questions
- Kafka send to azure event hub
- Write to ConfluentCloud from Apache Beam (GCP Dataflow)
Overview
- Configure environment variables
- Modify, build, and upload to GCP's Container Registry
- Create the Dataflow image spec
- Execute the image with Dataflow
This application has a somewhat involved build process that was ported from the GCP Dataflow templates. The build process pulls in the GCP Dataflow docker image build and deploy scripts as dependencies, so simply cloning the repository is enough to get started.
Prerequisites
- Install the Google Cloud CLI tool
- (Optional, but if skipped you need to remove the managed service account from the build/deploy commands) A managed service account
- Set up a GCP managed service account
- Create a static key
- Set up the GCR authentication helper with the managed service account so your local machine can authenticate to GCR (see the sketch after this list)
- Clone the repo
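As a rough sketch of the service-account and GCR setup, the service account name, project, and key path below are placeholders rather than values from the original post:
# authenticate as the managed service account and wire up docker/GCR
gcloud iam service-accounts keys create key.json \
  --iam-account=my-service-account@test-project.iam.gserviceaccount.com
gcloud auth activate-service-account --key-file=key.json
gcloud auth configure-docker

# clone the Dataflow templates repo (the flex templates live under /v2)
git clone https://github.com/GoogleCloudPlatform/DataflowTemplates.git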
Configure environment variables
The first step is to set the environment variables that configure the build and deploy scripts for the application.
export PROJECT=test-project
export IMAGE_NAME=test-project
export BUCKET_NAME=gs://test-project
export TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
export BASE_CONTAINER_IMAGE=gcr.io/dataflow-templates-base/java8-template-launcher-base
export BASE_CONTAINER_IMAGE_VERSION=latest
export TEMPLATE_MODULE=kafka-to-bigquery
export APP_ROOT=/template/${TEMPLATE_MODULE}
export COMMAND_SPEC=${APP_ROOT}/resources/${TEMPLATE_MODULE}-command-spec.json
export TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json
export BOOTSTRAP=<event_grid_name>.servicebus.windows.net:9093
export TOPICS=<event_grid_topic_name>
export OUTPUT_TABLE=test-project:<schema>.test
# single quotes keep $ConnectionString literal - it is the actual SASL username, not a shell variable
export AUTHENTICATION_STRING='org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<EVENT_GRID_TOPIC_APP_SECRET>";'
Modify, build, and upload the project
Before building, you need to update the ./kafka-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/KafkaToBigQuery.java file with the additions needed to handle the authentication string:
public class KafkaToBigQuery {
public interface Options extends PipelineOptions {
@Description("Kafka Authentication String")
@Required
String getAuthenticationString();
void setAuthenticationString(String authenticationString);
}
public static PipelineResult run(Options options) {
Map<String, Object> props = new HashMap<>();
props.put("sasl.mechanism", "PLAIN");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.jaas.config", options.getAuthenticationString());
// https://github.com/Azure/azure-event-hubs-for-kafka/blob/master/CONFIGURATION.md
props.put("request.timeout.ms", 60000);
props.put("session.timeout.ms", 15000);
props.put("max.poll.interval.ms", 30000);
props.put("offset.metadata.max.bytes", 1024);
props.put("connections.max.idle.ms", 180000);
props.put("metadata.max.age.ms", 180000);
PCollectionTuple convertedTableRows =
pipeline
/*
* Step #1: Read messages in from Kafka
*/
.apply(
"ReadFromKafka",
KafkaIO.<String, String>read()
.withConsumerConfigUpdates(props)
.withBootstrapServers(options.getBootstrapServers())
.withTopics(topicsList)
.withKeyDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withValueDeserializerAndCoder(
StringDeserializer.class, NullableCoder.of(StringUtf8Coder.of()))
.withoutMetadata())
.apply("ConvertMessageToTableRow", new MessageToTableRow(options));
// ... the remainder of run() (BigQuery write, error handling, pipeline.run()) is unchanged from the template
}
}
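One thing the snippet leaves implicit is where topicsList comes from; in the template it is built from the comma-separated topics option, roughly as below (the exact getter name is an assumption and depends on the template version):
// assumes an inputTopics option on the Options interface holding comma-separated topic names
List<String> topicsList =
    new ArrayList<>(Arrays.asList(options.getInputTopics().split(",")));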
After setting up the project and changing the file, the next stage is to build the docker image and upload it to Google's Container Registry. This command also builds the common files used to interact with other Google services. If the build is successful, the container is pushed to Google Container Registry (GCR), and from GCR you can deploy to Google Dataflow.
mvn clean package -Dimage=${TARGET_GCR_IMAGE} \
-Dbase-container-image=${BASE_CONTAINER_IMAGE} \
-Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
-Dapp-root=${APP_ROOT} \
-Dcommand-spec=${COMMAND_SPEC} \
-am -pl ${TEMPLATE_MODULE}
Create and upload the image spec (only done once)
Before launching the project in Dataflow, the Dataflow runner needs a Flex Template so it knows how to execute the project. The Flex Template is a JSON metadata file containing the parameters and instructions for building the GCP Dataflow application. The Flex Template must be uploaded to Google Cloud Storage (GCS) at the bucket path set by the environment variables; in other words, this step must match TEMPLATE_IMAGE_SPEC=${BUCKET_NAME}/images/${TEMPLATE_MODULE}-image-spec.json.
{
"image": "gcr.io/<my-project-url>:latest",
"metadata": {
"name": "Streaming data generator",
"description": "Generates Synthetic data as per user specified schema at a fixed QPS and writes to Sink of user choice.",
"parameters": [
{
"name": "authenticationString",
"label": "Kafka Event Hub Authentication String",
"helpText": "The authentication string for the Azure Event Hub",
"is_optional": false,
"regexes": [
".+"
],
"paramType": "TEXT"
},
{
"name": "bootstrapServers",
"label": "Kafka Broker IP",
"helpText": "The Kafka broker IP",
"is_optional": false,
"regexes": [
".+"
],
"paramType": "TEXT"
},
{
"name": "inputTopics",
"label": "PubSub Topic name",
"helpText": "The name of the topic to which the pipeline should publish data. For example, projects/<project-id>/topics/<topic-name> - should match the Event Grid Topic",
"is_optional": false,
"regexes": [
".+"
],
"paramType": "PUBSUB_TOPIC"
},
{
"name": "outputTableSpec",
"label": "Output BigQuery table",
"helpText": "Output BigQuery table. For example, <project>:<dataset>.<table_name>. Mandatory when sinkType is BIGQUERY.",
"isOptional": false,
"regexes": [
".+:.+\..+"
],
"paramType": "TEXT"
},
{
"name": "outputDeadletterTable",
"label": "Output Deadletter table",
"helpText": "Output Deadletter table. For example, <project>:<dataset>.<table_name>",
"isOptional": true,
"regexes": [
".+:.+\..+"
],
"paramType": "TEXT"
}
]
},
"sdk_info": {
"language": "JAVA"
}
}
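The upload itself is not shown above; assuming the spec is saved locally as kafka-to-bigquery-image-spec.json (the local filename is an assumption), copying it to the path in TEMPLATE_IMAGE_SPEC looks roughly like this:
gsutil cp kafka-to-bigquery-image-spec.json ${TEMPLATE_IMAGE_SPEC}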
Execute the image with Dataflow
Once the image and the Flex Template have been uploaded to GCP, you can launch the Dataflow application. The parameters must match the ones listed in the metadata section of the Flex Template.
export JOB_NAME="${TEMPLATE_MODULE}-`date +%Y%m%d-%H%M%S-%N`"
gcloud beta dataflow flex-template run ${JOB_NAME} \
--project=${PROJECT} --region=us-east1 \
--template-file-gcs-location=${TEMPLATE_IMAGE_SPEC} \
--parameters ^~^outputTableSpec=${OUTPUT_TABLE}~inputTopics=${TOPICS}~bootstrapServers=${BOOTSTRAP}~authenticationString="${AUTHENTICATION_STRING}" \
--verbosity=info \
--service-account-email=<service_account_to_execute_service>
After running this command, check the GCP Cloud Console for the status. At this point the Dataflow job should successfully pull messages from the Azure Event Hub and insert them into Google BigQuery.
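If you prefer the CLI to the console, something like this should show the job and its current state (reusing the us-east1 region from the run command above):
gcloud dataflow jobs list --region=us-east1 --status=active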
The GCP repository assumes that Google BigQuery/Dataflow will dynamically create the table with the correct rows, but YMMV; I found this unreliable. The workaround is to create the schema in Google BigQuery before running the Dataflow job.
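As a rough sketch, using the message shape from the CLI test above (transaction_time, first_name, last_name) and the Kafka_Test.test table from the job command (both are assumptions about your actual schema and naming):
# create the dataset and table up front so Dataflow has somewhere to write
bq mk --dataset test-project:Kafka_Test
bq mk --table test-project:Kafka_Test.test \
  transaction_time:TIMESTAMP,first_name:STRING,last_name:STRING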