Copying files from an HDFS directory to another with oozie distcp-action
My action start_fair_usage finishes with status OK, but test_copy returns
Main class [org.apache.oozie.action.hadoop.DistcpMain], main() threw exception, null
In /user/comverse/data/_B I have a lot of different files, some of which I want to copy to ${NAME_NODE}/user/evkuzmin/output. For this I try to pass paths from copy_files.sh, which holds an array of paths to the files I need.
<action name="start_fair_usage">
<shell xmlns="uri:oozie:shell-action:0.1">
<job-tracker>${JOB_TRACKER}</job-tracker>
<name-node>${NAME_NODE}</name-node>
<exec>${copy_file}</exec>
<argument>${today_without_dash}</argument>
<argument>${mta}</argument>
<!-- <file>${path}#${start_fair_usage}</file> -->
<file>${path}${copy_file}#${copy_file}</file>
<capture-output/>
</shell>
<ok to="test_copy"/>
<error to="KILL"/>
</action>
<action name="test_copy">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<job-tracker>${JOB_TRACKER}</job-tracker>
<name-node>${NAME_NODE}</name-node>
<arg>${wf:actionData('start_fair_usage')['paths']}</arg>
<!-- <arg>${NAME_NODE}/user/evkuzmin/input/*</arg> -->
<arg>${NAME_NODE}/user/evkuzmin/output</arg>
</distcp>
<ok to="END"/>
<error to="KILL"/>
</action>
start_fair_usage starts copy_file.sh:
echo
echo
dirs=(
/user/comverse/data/_B
)
args=()
for i in $(hadoop fs -ls "${dirs[@]}" | egrep .gz | awk -F " " '{print $NF}')  # the last field of hadoop fs -ls output is the file path
do
args+=("$i")
echo "copy file - "${i}
done
paths=${args}
echo ${paths}
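A likely cause of the null (not confirmed in the question): with <capture-output/>, Oozie expects the script's stdout in Java Properties format (key=value lines), but this script never prints a paths= line; it only echoes the value. In addition, paths=${args} in bash expands to the first array element only, not the whole array. The Scala sketch below imitates the parsing with java.util.Properties, assuming Oozie's reader follows the same rules; the sample paths are hypothetical.

```scala
import java.io.StringReader
import java.util.Properties

// Parse captured stdout the way a Java Properties file is read
def parseCapturedOutput(stdout: String): Properties = {
  val props = new Properties()
  props.load(new StringReader(stdout))
  props
}

// Roughly what the script prints: "copy file - ..." lines, then the bare value of ${paths}
val asWritten = parseCapturedOutput(
  "copy file - /user/comverse/data/x_mta.gz\n/user/comverse/data/x_mta.gz\n")

// The same output, but with the key printed explicitly (echo "paths=${paths}")
val withKey = parseCapturedOutput(
  "copy file - /user/comverse/data/x_mta.gz\npaths=/user/comverse/data/x_mta.gz\n")

println(asWritten.getProperty("paths")) // null: no "paths" key was ever written
println(withKey.getProperty("paths"))   // /user/comverse/data/x_mta.gz
```

So the key has to be written explicitly in the script. Even then, the action data is a single string, and an <arg> element passes it to distcp as one argument, which is one reason a distcp wildcard is simpler than handing over a list of paths.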
This is what I ended up doing.
<start to="start_copy"/>
<fork name="start_copy">
<path start="copy_mta"/>
<path start="copy_rcr"/>
<path start="copy_sub"/>
</fork>
<action name="copy_mta">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${NAME_NODE}${dstFolder}mta/*"/>
</prepare>
<arg>${NAME_NODE}${srcFolder}/*mta.gz</arg>
<arg>${NAME_NODE}${dstFolder}mta/</arg>
</distcp>
<ok to="end_copy"/>
<error to="KILL"/>
</action>
<action name="copy_rcr">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${NAME_NODE}${dstFolder}rcr/*"/>
</prepare>
<arg>${NAME_NODE}${srcFolder}/*rcr.gz</arg>
<arg>${NAME_NODE}${dstFolder}rcr/</arg>
</distcp>
<ok to="end_copy"/>
<error to="KILL"/>
</action>
<action name="copy_sub">
<distcp xmlns="uri:oozie:distcp-action:0.2">
<prepare>
<delete path="${NAME_NODE}${dstFolder}sub/*"/>
</prepare>
<arg>${NAME_NODE}${srcFolder}/*sub.gz</arg>
<arg>${NAME_NODE}${dstFolder}sub/</arg>
</distcp>
<ok to="end_copy"/>
<error to="KILL"/>
</action>
<join name="end_copy" to="END"/>
<kill name="KILL">
<message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="END"/>
It turns out you can use wildcards in distcp, so I didn't need bash at all.
Also, someone suggested I write it in Scala.
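To see which files a per-type pattern such as *mta.gz selects, here is a rough Scala illustration using java.nio's glob matcher. This is only an approximation: distcp expands the pattern with Hadoop's own glob implementation, which uses similar syntax for a simple pattern like this one, and the file names below are made up.

```scala
import java.nio.file.{FileSystems, Paths}

// java.nio glob syntax, used here as a stand-in for Hadoop's glob handling
val mtaGlob = FileSystems.getDefault.getPathMatcher("glob:*mta.gz")

// Hypothetical file names, for illustration only
val names = List("20170404_mta.gz", "20170404_rcr.gz", "20170404_sub.gz", "20170404_xmta.gz", "mta.gz.tmp")

// Keep only the names the pattern matches
val matched = names.filter(n => mtaGlob.matches(Paths.get(n)))
println(matched) // List(20170404_mta.gz, 20170404_xmta.gz)
```

Because *mta.gz is a plain suffix match, a name like 20170404_xmta.gz would be copied too; a tighter pattern is worth considering if such names can occur in the source folder.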
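import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, FileUtil, Path}
val conf = new Configuration()
val fs = FileSystem.get(conf)
val listOfFileTypes = List("mta", "rcr", "sub")
val listOfPlatforms = List("B", "C", "H", "M", "Y")
for (fileType <- listOfFileTypes) {
  val dstPath = new Path("/apps/hive/warehouse/arstel.db/fair_usage/fct_evkuzmin/file_" + fileType)
  // Empty the target directory on HDFS. FileUtil.fullyDeleteContents takes a
  // java.io.File and only works on the local filesystem, so use fs.delete instead.
  if (fs.exists(dstPath)) fs.listStatus(dstPath).foreach(s => fs.delete(s.getPath, true))
  // Make sure the target exists as a directory, so each copied file lands inside it
  fs.mkdirs(dstPath)
  for (platform <- listOfPlatforms) {
    val pattern = new Path("/user/comverse/data/" + "20170404" + "_" + platform + "/*" + fileType + ".gz")
    // globStatus can return null (e.g. when the path does not exist), so treat that as no matches
    val srcPaths = Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
    for (srcPath <- srcPaths) {
      println("copying " + srcPath.getPath.toString)
      FileUtil.copy(fs, srcPath.getPath, fs, dstPath, false, conf)
    }
  }
}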
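The script above hard-codes the date 20170404. A small sketch of pulling the path construction into a function so the date can be supplied from outside; sourceGlob is a hypothetical helper, and the patterns it builds are the same strings the loop concatenates.

```scala
// Hypothetical helper: the source glob for one date/platform/fileType combination
def sourceGlob(date: String, platform: String, fileType: String): String =
  s"/user/comverse/data/${date}_$platform/*$fileType.gz"

val listOfFileTypes = List("mta", "rcr", "sub")
val listOfPlatforms = List("B", "C", "H", "M", "Y")

// One glob per (fileType, platform) pair: 3 x 5 = 15 patterns
val globs = for {
  fileType <- listOfFileTypes
  platform <- listOfPlatforms
} yield sourceGlob("20170404", platform, fileType)

globs.take(2).foreach(println)
```

Each pattern could then be passed to fs.globStatus exactly as in the loop, with the date taken from wherever the job gets it rather than from a literal.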
Both approaches work, although I haven't tried running the Scala script in Oozie yet.