如何从 Sqoop 导入中捕获已处理记录的计数?
How to capture the count of processed records from Sqoop import?
要将数据从我的数据库复制到 S3,我使用以下命令。
sqoop import -D mapreduce.job.name= xxx-D mapred.job.queue.name=user -Dhadoop.security.credential.provider.path=<path> -Dfs.s3a.server-side-encryption-algorithm=<xx>--options-file <path> --query "select col1,ID,UPDATETIME from db.table where UPDATETIME between to_date('2015-09-11 00:00:00','yyyy/mm/dd hh24:mi:ss') and to_date('2018-05-24 04:28:16','yyyy/mm/dd hh24:mi:ss') and $CONDITIONS" --hive-delims-replacement ' ' --direct --m 1 --split-by ID --target-dir <s3//path>
我能够复制数据,但我需要从同一命令获取已处理数据的计数,而无需使用 eval 等其他命令。因为与此同时其他记录可能会被吸收到源中。
我想要的是捕获这个记录数:
18/05/21 22:55:55 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 47.9229 seconds (0 bytes/sec)
18/05/21 22:55:55 INFO mapreduce.ImportJobBase: Retrieved 33372 records.
我已经找到了上述查询的方法。当你在下面传递你的 sqoop 命令时
subprocess
编程并使用 .communicate 存储整个输出以及警告和信息消息。
sqoop_command ='sqoop import........'
process = subprocess.Popen(sqoop_command , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
rec_str = process.communicate()
rec_str 包含输出。
要将数据从我的数据库复制到 S3,我使用以下命令。
sqoop import -D mapreduce.job.name= xxx-D mapred.job.queue.name=user -Dhadoop.security.credential.provider.path=<path> -Dfs.s3a.server-side-encryption-algorithm=<xx>--options-file <path> --query "select col1,ID,UPDATETIME from db.table where UPDATETIME between to_date('2015-09-11 00:00:00','yyyy/mm/dd hh24:mi:ss') and to_date('2018-05-24 04:28:16','yyyy/mm/dd hh24:mi:ss') and $CONDITIONS" --hive-delims-replacement ' ' --direct --m 1 --split-by ID --target-dir <s3//path>
我能够复制数据,但我需要从同一命令获取已处理数据的计数,而无需使用 eval 等其他命令。因为与此同时其他记录可能会被吸收到源中。
我想要的是捕获这个记录数:
18/05/21 22:55:55 INFO mapreduce.ImportJobBase: Transferred 0 bytes in 47.9229 seconds (0 bytes/sec)
18/05/21 22:55:55 INFO mapreduce.ImportJobBase: Retrieved 33372 records.
我已经找到了上述查询的方法。当你在下面传递你的 sqoop 命令时
subprocess
编程并使用 .communicate 存储整个输出以及警告和信息消息。
sqoop_command ='sqoop import........'
process = subprocess.Popen(sqoop_command , stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
rec_str = process.communicate()
rec_str 包含输出。