Write MapReduce ETL job

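I have a table in MySQL with about 300 GB of data, and I want to add a new column to it. When I run an ALTER command on the table, it never completes and the process gets killed. So I plan to write a Sqoop job that pulls all the data from the table and dumps it to HDFS, then create a new database in MySQL, create the table there with the additional column, and re-import the data from HDFS.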

table structure:
CREATE TABLE `nodes` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `type` int(11) NOT NULL,
  `name` varchar(50) NOT NULL,
  `postcode` varchar(4) DEFAULT NULL,
  `updated` datetime DEFAULT NULL,
  `db_updated` datetime DEFAULT CURRENT_TIMESTAMP,
  `useragent` bigint(20) DEFAULT NULL,
  `last_seen` date DEFAULT NULL, -- newly added column
  PRIMARY KEY (`id`),
  UNIQUE KEY `akaid_index` (`type`,`name`),
  KEY `useragent_idx` (`useragent`),
  KEY `type` (`type`),
  CONSTRAINT `useragentfk` FOREIGN KEY (`useragent`) REFERENCES `useragents` (`id`) ON DELETE NO ACTION ON UPDATE NO ACTION
) ENGINE=InnoDB AUTO_INCREMENT=1091725696 DEFAULT CHARSET=latin1;

Sqoop command:

sqoop export --connect jdbc:mysql://localhost:3306/graph1 --table nodes --username root --password password --export-dir <dir-path> --input-fields-terminated-by ','

I get the following error:

Error: java.io.IOException: Can't export data, please check failed map task logs
    at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:112)
    at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:39)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:146)
    at org.apache.sqoop.mapreduce.AutoProgressMapper.run(AutoProgressMapper.java:64)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:787)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:164)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)
Caused by: java.lang.RuntimeException: Can't parse input data: 'NULL'
    at nodes.__loadFromFields(nodes.java:470)
    at nodes.parse(nodes.java:388)
    at org.apache.sqoop.mapreduce.TextExportMapper.map(TextExportMapper.java:83)
    ... 10 more
Caused by: java.lang.NumberFormatException: For input string: "NULL"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:441)
    at java.lang.Long.valueOf(Long.java:540)
    at nodes.__loadFromFields(nodes.java:467)
    ... 12 more

The files in HDFS contain records like these:

1289603991,1,fee1cee723bdb0bc499c443765b40e3d,,2016-04-13 10:19:59,2016-04-14 03:44:55,5296252
1289603992,1,edf65c2e7b89388fe9068cc3a898a3fd,,2016-04-13 10:20:00,2016-04-14 03:44:55,5411481
1289603993,1,5760fd1cca92a65ce6f2db43853fc118,,2016-04-13 10:19:59,2016-04-14 03:44:55,4441745
1289603994,1,65dd92c80df5581f55bc60f3e997ec05,,2016-04-13 10:19:59,2016-04-14 03:44:55,5332084
1289603995,1,7654a84428f3064828f5972cfce5f8e6,,2016-04-13 10:20:00,2016-04-14 03:44:55,5202243
1289603996,1,84c270212fe5f3a52cb2bd75403da058,,2016-04-13 10:20:00,2016-04-14 03:44:55,5398729
1289603997,1,a486382c4fc296a5e8d3c0491568c22c,,2016-04-13 10:19:57,2016-04-14 03:44:55,5289170
1289603998,112,2_3Nns7YXPmS_xv3imJBiw04BQf1sNc2tJrtFJ5TCx98,,2016-04-13 10:20:00,2016-04-14 03:44:55,NULL
1289603999,1,a3607df77e025b12c1728f62589857fa,,2016-04-13 10:19:59,2016-04-14 03:44:55,12
1289604000,113,570e1d4e6372cd9c,,2016-04-13 10:19:59,2016-04-14 03:44:55,NULL
1289604001,113,57023dd016258fbf,,2016-04-13 10:20:00,2016-04-14 03:44:55,NULL
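
The parse failure above comes from the literal string NULL in the last field, which maps to the bigint column useragent. As a rough check, the sketch below counts that token per comma-separated field. The function name count_null_tokens is made up for illustration, and a few of the sample records are inlined here; in practice the input would presumably be piped from the export directory instead.

```shell
#!/bin/sh
# Count literal NULL tokens per comma-separated field.
# count_null_tokens is a hypothetical helper, not part of Sqoop or Hadoop.
count_null_tokens() {
  awk -F',' '
    { for (i = 1; i <= NF; i++) if ($i == "NULL") n[i]++ }
    END { for (i in n) printf "field %d: %d\n", i, n[i] }
  '
}

# Three of the sample records from the question, inlined for the demo.
count_null_tokens <<'EOF'
1289603997,1,a486382c4fc296a5e8d3c0491568c22c,,2016-04-13 10:19:57,2016-04-14 03:44:55,5289170
1289603998,112,2_3Nns7YXPmS_xv3imJBiw04BQf1sNc2tJrtFJ5TCx98,,2016-04-13 10:20:00,2016-04-14 03:44:55,NULL
1289604000,113,570e1d4e6372cd9c,,2016-04-13 10:19:59,2016-04-14 03:44:55,NULL
EOF
# prints: field 7: 2
```

Only field 7 carries the token in these rows, so it is the numeric column that needs a null marker declared on export.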

After I added --input-null-string and --input-null-non-string, the Sqoop job got past that error, but it now fails for the following reason:

2016-06-07 18:11:37,750 ERROR [Thread-9] org.apache.sqoop.mapreduce.AsyncSqlOutputFormat: Got exception in update thread: com.mysql.jdbc.exceptions.jdbc4.MySQLIntegrityConstraintViolationException: Duplicate entry '1289603991' for key 'PRIMARY'
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
    at com.mysql.jdbc.Util.getInstance(Util.java:387)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:934)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2549)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)
    at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1192)
    at org.apache.sqoop.mapreduce.AsyncSqlOutputFormat$AsyncSqlExecThread.run(AsyncSqlOutputFormat.java:233)

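I am getting the error above because the table has an AUTO_INCREMENT column. Is there another way to alter the table that is faster? According to this thread (Changing Large MySQL InnoDB Tables), altering large InnoDB tables seems to take a very long time, even days.

Any alternative suggestions would be much appreciated.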

Your data violates the primary key constraint:

If your table has constraints (e.g., a primary key column whose values must be unique) and already contains data, you must take care to avoid inserting records that violate these constraints.

You need to either truncate the table before running the export command, or update the existing rows by adding this argument:

--update-key id
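
Both options can be sketched as a small script. This is only an illustration under a few assumptions: the null marker is taken to be the literal NULL seen in your sample records, `<dir-path>` is left as the placeholder from the question, and the `run` and `export_nodes` helpers are hypothetical names. By default the script only prints the commands; set DRY_RUN=0 to actually execute them.

```shell
#!/bin/sh
# Sketch only: prints the commands unless DRY_RUN=0 is set.
# run and export_nodes are hypothetical helpers for this example.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"   # printed form is for reading, it is not re-quoted
  else
    "$@"
  fi
}

export_nodes() {
  # "$@" carries any extra Sqoop arguments, e.g. --update-key id.
  # The 'NULL' marker is an assumption based on the sample records.
  run sqoop export \
    --connect jdbc:mysql://localhost:3306/graph1 \
    --table nodes --username root --password password \
    --export-dir '<dir-path>' \
    --input-fields-terminated-by ',' \
    --input-null-string 'NULL' --input-null-non-string 'NULL' \
    "$@"
}

# Option 1: empty the target table, then run a plain insert-only export.
run mysql -u root -p graph1 -e 'TRUNCATE TABLE nodes'
export_nodes

# Option 2: keep the existing rows and match them on the primary key.
export_nodes --update-key id
```

Note that with --update-key alone Sqoop issues UPDATE statements, so records whose id is not already in the table are not inserted. If you need insert-or-update behaviour, adding --update-mode allowinsert may be worth trying, assuming your Sqoop version supports it for MySQL.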