spark wholetextfiles 会选择部分创建的文件吗?
Will spark wholetextfiles pick partially created file?
我正在使用 Spark wholeTextFiles API 从源文件夹中读取文件并将其加载到配置单元 table。
文件正在从远程服务器到达源文件夹。文件很大,如 1GB-3GB。文件的 SCP 需要一段时间。
如果我启动 spark 作业并且文件正在 SCPd 到源文件夹并且进程进行到一半,spark 会选择文件吗?
如果 spark 在中途选择文件,这将是一个问题,因为它会忽略文件的其余内容。
如果您正在将文件 SCP 到源文件夹;然后 spark 正在从该文件夹中读取;可能会发生,写入一半的文件被 spark 拾取,因为 SCP 可能需要一些时间来复制。
那肯定会发生。
您的任务是 - 如何不直接写入该源文件夹 - 这样 Spark 就不会选择不完整的文件。
可能的解决方法:
- 在每个文件复制结束时,SCP 零 kb 文件表明 SCP 已完成。
- 在 spark 作业中,当您执行
sc.wholeTextFiles(...)
时,仅选择那些具有零 kb 对应文件的文件名 - 使用 map。
可能的解决方法:
- 在每个文件复制结束时,SCP 零 kb 文件表明 SCP 已完成。
- 在 spark 作业中,当您执行 sc.wholeTextFiles(...) 时,仅选择那些具有零 kb 对应文件的文件名 - 使用 map。
因此,这是检查 src 文件夹中是否存在相应 .ctl
文件的代码。
val fr = sc.wholeTextFiles("D:\DATA\TEST\tempstatus")
// Get only .ctl file
val temp1 = fr.map(x => x._1).filter(x => x.endsWith(".ctl"))
// Identify corresponding REAL-FILEs - without .ctl suffix
val temp2 = temp1.map(x => (x.replace(".ctl", ""),x.replace(".ctl", "")))
val result = fr
.join(xx)
.map{
case (_, (entry, x)) => (x, entry)
}
... 根据需要处理 rdd result
。
rdd temp2
从 RDD[String]
更改为 RDD[String, String]
- 用于 JOIN
操作。没关系。
我正在使用 Spark wholeTextFiles API 从源文件夹中读取文件并将其加载到配置单元 table。
文件正在从远程服务器到达源文件夹。文件很大,如 1GB-3GB。文件的 SCP 需要一段时间。
如果我启动 spark 作业并且文件正在 SCPd 到源文件夹并且进程进行到一半,spark 会选择文件吗?
如果 spark 在中途选择文件,这将是一个问题,因为它会忽略文件的其余内容。
如果您正在将文件 SCP 到源文件夹;然后 spark 正在从该文件夹中读取;可能会发生,写入一半的文件被 spark 拾取,因为 SCP 可能需要一些时间来复制。
那肯定会发生。
您的任务是 - 如何不直接写入该源文件夹 - 这样 Spark 就不会选择不完整的文件。
可能的解决方法:
- 在每个文件复制结束时,SCP 零 kb 文件表明 SCP 已完成。
- 在 spark 作业中,当您执行
sc.wholeTextFiles(...)
时,仅选择那些具有零 kb 对应文件的文件名 - 使用 map。
可能的解决方法:
- 在每个文件复制结束时,SCP 零 kb 文件表明 SCP 已完成。
- 在 spark 作业中,当您执行 sc.wholeTextFiles(...) 时,仅选择那些具有零 kb 对应文件的文件名 - 使用 map。
因此,这是检查 src 文件夹中是否存在相应 .ctl
文件的代码。
val fr = sc.wholeTextFiles("D:\DATA\TEST\tempstatus")
// Get only .ctl file
val temp1 = fr.map(x => x._1).filter(x => x.endsWith(".ctl"))
// Identify corresponding REAL-FILEs - without .ctl suffix
val temp2 = temp1.map(x => (x.replace(".ctl", ""),x.replace(".ctl", "")))
val result = fr
.join(xx)
.map{
case (_, (entry, x)) => (x, entry)
}
... 根据需要处理 rdd result
。
rdd temp2
从 RDD[String]
更改为 RDD[String, String]
- 用于 JOIN
操作。没关系。