将数据流式传输到 Google BigQuery 表:使用 InsertId 删除重复记录的问题
Streaming Data into Google BigQuery Tables : problems using InsertId to De-Duplicate Records
我们正在使用 Camel BigQuery API(版本 2.20)从 ActiveMQ 上的消息队列流式传输记录服务器(版本 5.14.3)到 Google BigQuery table.
我们已经在我们的主站点上的 Spring 框架 运行ning 中将流机制作为 XML 路由定义来实现和部署而且看起来效果不错。
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
./spring-beans.xsd
http://camel.apache.org/schema/spring
./camel-spring.xsd">
<!--
# ==========================================================================
# ActiveMQ JMS Bean Definition
# ==========================================================================
-->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
<property name="userName" value="MyAmqUserName" />
<property name="password" value="MyAmqPassword" />
</bean>
</property>
</bean>
<!--
# ==========================================================================
# GoogleBigQueryComponent
# https://github.com/apache/camel/tree/master/components/camel-google-bigquery
# ==========================================================================
-->
<bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
<property name="connectionFactory">
<bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
<property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
</bean>
</property>
</bean>
<!--
# ==========================================================================
# Main Context Bean Definition
# ==========================================================================
-->
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring" >
<!--
# ==================================================================
# Message Route :
# 1. consume messages from my AMQ queue
# 2. set the InsertId / INSERT_ID (it is not clear which is the correct one)
# 3. write message to Google BigQuery table
# see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
# ==================================================================
<log message="${headers} | ${body}" />
-->
<route>
<from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&concurrentConsumers=20" />
<setHeader headerName="CamelGoogleBigQuery.InsertId">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
<setHeader headerName="GoogleBigQueryConstants.INSERT_ID">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
<to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
</route>
</camelContext>
</beans>
为了获得高(呃)可用性,我们现在已经将相同的实现部署到我们的备份站点,流式传输到相同的目标 BigQuery table。正如预期的那样,相同的记录从两个站点流式传输到相同的 table,存在重复记录。为了消除记录重复,我们正在尝试遵循此处给出的指导:
https://camel.apache.org/staging/components/latest/google-bigquery-component.html
Message Headers 部分建议使用 suitable 运行-时间键值.
但是在同一页下方,确保数据一致性部分建议设置GoogleBigQueryConstants.INSERT_ID.
我们已经检查过我们的主服务器和备用服务器 运行ning 在同一时区 (UTC),并且我们正在生成我们认为合适的内容table 运行时间唯一键:包含精确到秒的 UNIX 时间的字符串。
我们上面的代码示例显示我们已经尝试了这两种方法,但是对我们目标 BigQuery table 中的数据的审查表明这两种方法似乎都不起作用,即我们仍然有重复的记录。
问题
- 我们在上面的代码中设置 InsertID / INSERT_ID 的方式是否有错误?
- 您是否使用过 Camel Google BigQuery API 将数据流式传输到 BigQuery 中?
- 如果是,你是否成功使用了InsertId / INSERT_ID去重机制?如果有,是哪一个以及如何?
- 您观察到的重复数据删除时间是多少 window(s)?
GoogleBigQueryConstants.INSERT_ID
是字符串常量,值为 CamelGoogleBigQueryInsertId
。
这样使用:
<setHeader headerName="CamelGoogleBigQueryInsertId">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
演示此行为的单元测试位于此处:InsertIdTest.java
关于这些 headers 的文档有点过时,我已经修复了它,正确的版本可以在 google-bigquery-component.adoc 中找到。即将在网站上发布。
我们正在使用 Camel BigQuery API(版本 2.20)从 ActiveMQ 上的消息队列流式传输记录服务器(版本 5.14.3)到 Google BigQuery table.
我们已经在我们的主站点上的 Spring 框架 运行ning 中将流机制作为 XML 路由定义来实现和部署而且看起来效果不错。
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://www.springframework.org/schema/beans"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
./spring-beans.xsd
http://camel.apache.org/schema/spring
./camel-spring.xsd">
<!--
# ==========================================================================
# ActiveMQ JMS Bean Definition
# ==========================================================================
-->
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="nio://192.168.10.10:61616?jms.useAsyncSend=true" />
<property name="userName" value="MyAmqUserName" />
<property name="password" value="MyAmqPassword" />
</bean>
</property>
</bean>
<!--
# ==========================================================================
# GoogleBigQueryComponent
# https://github.com/apache/camel/tree/master/components/camel-google-bigquery
# ==========================================================================
-->
<bean id="gcp" class="org.apache.camel.component.google.bigquery.GoogleBigQueryComponent">
<property name="connectionFactory">
<bean class="org.apache.camel.component.google.bigquery.GoogleBigQueryConnectionFactory">
<property name="credentialsFileLocation" value="MyDir/MyGcpKeyFile.json" />
</bean>
</property>
</bean>
<!--
# ==========================================================================
# Main Context Bean Definition
# ==========================================================================
-->
<camelContext id="camelContext" xmlns="http://camel.apache.org/schema/spring" >
<!--
# ==================================================================
# Message Route :
# 1. consume messages from my AMQ queue
# 2. set the InsertId / INSERT_ID (it is not clear which is the correct one)
# 3. write message to Google BigQuery table
# see https://github.com/apache/camel/blob/master/components/camel-google-bigquery/src/main/docs/google-bigquery-component.adoc
# ==================================================================
<log message="${headers} | ${body}" />
-->
<route>
<from uri="jms:my.amq.queue.of.output.data.for.gcp?acknowledgementModeName=DUPS_OK_ACKNOWLEDGE&concurrentConsumers=20" />
<setHeader headerName="CamelGoogleBigQuery.InsertId">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
<setHeader headerName="GoogleBigQueryConstants.INSERT_ID">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
<to uri="gcp:my_gcp_project:my_bq_data_set:my_bq_table" />
</route>
</camelContext>
</beans>
为了获得高(呃)可用性,我们现在已经将相同的实现部署到我们的备份站点,流式传输到相同的目标 BigQuery table。正如预期的那样,相同的记录从两个站点流式传输到相同的 table,存在重复记录。为了消除记录重复,我们正在尝试遵循此处给出的指导:
https://camel.apache.org/staging/components/latest/google-bigquery-component.html
Message Headers 部分建议使用 suitable 运行-时间键值.
但是在同一页下方,确保数据一致性部分建议设置GoogleBigQueryConstants.INSERT_ID.
我们已经检查过我们的主服务器和备用服务器 运行ning 在同一时区 (UTC),并且我们正在生成我们认为合适的内容table 运行时间唯一键:包含精确到秒的 UNIX 时间的字符串。
我们上面的代码示例显示我们已经尝试了这两种方法,但是对我们目标 BigQuery table 中的数据的审查表明这两种方法似乎都不起作用,即我们仍然有重复的记录。
问题
- 我们在上面的代码中设置 InsertID / INSERT_ID 的方式是否有错误?
- 您是否使用过 Camel Google BigQuery API 将数据流式传输到 BigQuery 中?
- 如果是,你是否成功使用了InsertId / INSERT_ID去重机制?如果有,是哪一个以及如何?
- 您观察到的重复数据删除时间是多少 window(s)?
GoogleBigQueryConstants.INSERT_ID
是字符串常量,值为 CamelGoogleBigQueryInsertId
。
这样使用:
<setHeader headerName="CamelGoogleBigQueryInsertId">
<simple>${header.KeyValuePreviouslyGenerated}</simple>
</setHeader>
演示此行为的单元测试位于此处:InsertIdTest.java
关于这些 headers 的文档有点过时,我已经修复了它,正确的版本可以在 google-bigquery-component.adoc 中找到。即将在网站上发布。