无主键无时间戳使用sqoop增量加载数据
Incremental data load using sqoop without primary key or timestamp
我有一个没有任何主键的 table 和 datemodified/timestamp。这个table就像一个事务table,不断保存所有数据(否delete/update)。
我现在的问题是每次 运行 增量加载时,我都想将数据注入 HDFS 而无需再次加载整个 table。
如果我的 table 有主键,下面的代码会获取导入到 HDFS 的最新行。
sqoop job \
--create tb_w_PK_DT_append \
-- \
import \
--connect jdbc:mysql://10.217.55.176:3306/SQOOP_Test \
--username root \
--incremental append \
--check-column P_id \
--last-value 0 \
--target-dir /data \
--query "SELECT * FROM tb_w_PK_DT WHERE $CONDITIONS" \
-m 1;
在不修改任何主键或日期的情况下获取最新导入数据的任何解决方案。
如果您的数据有像 rowid 这样的字段,您可以使用 --last-value in sqoop arguments 导入。
请参考https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_incremental_imports
You can follow these steps
1) The initial load data (previous day data) is in hdfs - Relation A
2) Import the current data into HDFS using sqoop -- Relation B
3) Use pig Load the above two hdfs directories in relation A and B define schema.
4) Convert them to tuples and join them by all columns
5) The join result will have two tuples in each row((A,B),(A,B)) , fetch the result from join where tuple B is null ((A,D),).
6) Now flatten the join by tuple A you will have new/updated records(A,D).
我知道我来晚了,但只是想分享一下以供参考。如果有一种情况,您的源 table 上没有主键列或日期列,并且您只想将增量数据 sqoop 到 hdfs。
假设有一些 table 保存数据的历史记录和每天插入的新行,您只需要将新插入的行保存到 hdfs。如果您的源是 sql 服务器,您可以在历史 table.
上创建插入或更新触发器
您可以创建一个插入触发器,如下所示:
CREATE TRIGGER transactionInsertTrigger
ON [dbo].[TransactionHistoryTable]
AFTER INSERT
AS
BEGIN
SET NOCOUNT ON;
INSERT INTO [dbo].[TriggerHistoryTable]
(
product ,price,payment_type,name,city,state,country,Last_Modified_Date
)
SELECT
product,price,payment_type,name,city,state,country,GETDATE() as Last_Modified_Date
FROM
inserted i
END
创建一个 Table 来保存主 table 上发生插入事件时的记录。保持架构与您的主要 table 相同,但是您可以向其添加额外的列。
只要有任何新行插入到您的主 TransactionHistoryTable.
中,上述触发器就会在 table 中插入一行
CREATE TABLE [dbo].[TriggerHistoryTable](
[product] [varchar](20) NULL,
[price] [int] NULL,
[payment_type] [varchar](20) NULL,
[name] [varchar](20) NULL,
[city] [varchar](20) NULL,
[state] [varchar](20) NULL,
[country] [varchar](20) NULL,
[Last_Modified_Date] [date] NULL
) ON [PRIMARY]
现在,如果我们向主 TransactionHistoryTable 插入两个新行,由于这个插入事件,我们的触发器被触发并且将这两行与主 TransactionHistoryTable 一起插入到 TriggerHistoryTable =37=]
insert into [Transaction_db].[dbo].[TransactionHistoryTable]
values
('Product3',2100,'Visa','Cindy' ,'Kemble','England','United Kingdom')
,('Product4',50000,'Mastercard','Tamar','Headley','England','United Kingdom')
;
select * from TriggerHistoryTable;
现在您可以从您的 TriggerHistoryTable 查询,它将每天插入或更新记录。您也可以使用增量 sqoop,因为我们为此添加了一个日期列。将数据导入 hdfs 后,您可以每天或每周清除此 table。这只是 sql 服务器的示例。你也可以使用 Teradata 和 oracle 以及其他数据库的触发器。您还可以设置一个 update/delete 触发器。
我有一个没有任何主键的 table 和 datemodified/timestamp。这个table就像一个事务table,不断保存所有数据(否delete/update)。
我现在的问题是每次 运行 增量加载时,我都想将数据注入 HDFS 而无需再次加载整个 table。
如果我的 table 有主键,下面的代码会获取导入到 HDFS 的最新行。
sqoop job \
--create tb_w_PK_DT_append \
-- \
import \
--connect jdbc:mysql://10.217.55.176:3306/SQOOP_Test \
--username root \
--incremental append \
--check-column P_id \
--last-value 0 \
--target-dir /data \
--query "SELECT * FROM tb_w_PK_DT WHERE $CONDITIONS" \
-m 1;
在不修改任何主键或日期的情况下获取最新导入数据的任何解决方案。
如果您的数据有像 rowid 这样的字段,您可以使用 --last-value in sqoop arguments 导入。
请参考https://sqoop.apache.org/docs/1.4.2/SqoopUserGuide.html#_incremental_imports
You can follow these steps
1) The initial load data (previous day data) is in hdfs - Relation A
2) Import the current data into HDFS using sqoop -- Relation B
3) Use pig Load the above two hdfs directories in relation A and B define schema.
4) Convert them to tuples and join them by all columns
5) The join result will have two tuples in each row((A,B),(A,B)) , fetch the result from join where tuple B is null ((A,D),).
6) Now flatten the join by tuple A you will have new/updated records(A,D).
我知道我来晚了,但只是想分享一下以供参考。如果有一种情况,您的源 table 上没有主键列或日期列,并且您只想将增量数据 sqoop 到 hdfs。
假设有一些 table 保存数据的历史记录和每天插入的新行,您只需要将新插入的行保存到 hdfs。如果您的源是 sql 服务器,您可以在历史 table.
上创建插入或更新触发器您可以创建一个插入触发器,如下所示:
CREATE TRIGGER transactionInsertTrigger
ON [dbo].[TransactionHistoryTable]
AFTER INSERT
AS
BEGIN
SET NOCOUNT ON;
INSERT INTO [dbo].[TriggerHistoryTable]
(
product ,price,payment_type,name,city,state,country,Last_Modified_Date
)
SELECT
product,price,payment_type,name,city,state,country,GETDATE() as Last_Modified_Date
FROM
inserted i
END
创建一个 Table 来保存主 table 上发生插入事件时的记录。保持架构与您的主要 table 相同,但是您可以向其添加额外的列。 只要有任何新行插入到您的主 TransactionHistoryTable.
中,上述触发器就会在 table 中插入一行CREATE TABLE [dbo].[TriggerHistoryTable](
[product] [varchar](20) NULL,
[price] [int] NULL,
[payment_type] [varchar](20) NULL,
[name] [varchar](20) NULL,
[city] [varchar](20) NULL,
[state] [varchar](20) NULL,
[country] [varchar](20) NULL,
[Last_Modified_Date] [date] NULL
) ON [PRIMARY]
现在,如果我们向主 TransactionHistoryTable 插入两个新行,由于这个插入事件,我们的触发器被触发并且将这两行与主 TransactionHistoryTable 一起插入到 TriggerHistoryTable =37=]
insert into [Transaction_db].[dbo].[TransactionHistoryTable]
values
('Product3',2100,'Visa','Cindy' ,'Kemble','England','United Kingdom')
,('Product4',50000,'Mastercard','Tamar','Headley','England','United Kingdom')
;
select * from TriggerHistoryTable;
现在您可以从您的 TriggerHistoryTable 查询,它将每天插入或更新记录。您也可以使用增量 sqoop,因为我们为此添加了一个日期列。将数据导入 hdfs 后,您可以每天或每周清除此 table。这只是 sql 服务器的示例。你也可以使用 Teradata 和 oracle 以及其他数据库的触发器。您还可以设置一个 update/delete 触发器。