从 Apache Flink 中的输入文件创建对象
Create objects from input files in Apache Flink
我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对数据分析很重要。
数据集的结构:
folder1
+-----file11
+-----column1
+-----column2
每个文件都包含描述一个对象的数据。文件格式一致。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。
文件的大小非常小。最多只有 20 kb。每个文件夹包含大约200个文件。
所需的输出对象应该是:
{
a: "folder1", // name of parent folder
b: "file11", // name of content file
c: Seq[(String, String)] // content of file1
}
如何在 Scala 中处理读取此数据集?
有两种方法可以解决这个问题:
a) 如果文件夹中的数据非常小(不到几兆),可以在本地读取,然后使用ExecutionEnvironment.fromCollection()
方法将数据导入到Flink作业中。
b) 您创建了一个自定义的 InputFormat。 InputFormat 允许解析自定义文件格式。在您的情况下,我会扩展 TextInputFormat
并覆盖 readRecord()
方法。此方法将文件中的每一行作为字符串提供给您。
然后,您可以手动解析 String 中的数据,并将解析结果 return 与目录信息放在 Tuple3 中。您可以从 filePath
变量访问路径。
对于使用 FileInputFormat
s 递归读取文件,有 recursive.file.enumeration
配置值。
阅读以上帖子后,我能够创建自定义 FileInputFormat class 来读取 excel.xlsx 文件并在 flink 中进行流式传输。代码如下
/*
* Custom format output I was expecting from the records converted into records
*/
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,...., FileName : String)
/*
* Custom class to read Excel spreadsheet using flink FileInputFormat class
*/
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{
var running : Boolean = false
var excelData : Seq[ExcelSheetData] = null
unsplittable = true
override def open(fileSplit : FileInputSplit) = {
println(fileSplit.getPath.toString.drop(6))
val myFile = new File(fileSplit.getPath.toString.drop(6))
val fileName = fileSplit.getPath.toString.drop(6)
val fis = new FileInputStream(myFile)
try{
val myWorkbook = new XSSFWorkbook(fis)
// println(s"Sheet Name: ${mySheet.getSheetName()}")
// reading multiple sheets having identical data
val mySheets = myWorkbook.iterator().asScala
val exData = for(s <- mySheets
if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield {
val rowItr = s.rowIterator().asScala
for(e <- rowItr
if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
(e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
,fileName)
}
}
excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})
running = if(excelData.length >= 1) true else false
} finally { fis.close()}
}
override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = {
val head = excelData.head
excelData = excelData.tail
running = if (excelData.length == 0) false else true
head
}
override def reachedEnd(): Boolean = ! running
}
/*
* Initialize custom class to read Excel Input
*/
val excelInput = new ExcelInputFormat()
// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
.uid("Excel File Read")
//Windowing code down below...
HadoopOffice 库原生支持 Flink Table API 以及 Excel 文件的 Flink DataSource/DataSink。
我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对数据分析很重要。
数据集的结构:
folder1
+-----file11
+-----column1
+-----column2
每个文件都包含描述一个对象的数据。文件格式一致。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。
文件的大小非常小。最多只有 20 kb。每个文件夹包含大约200个文件。
所需的输出对象应该是:
{
a: "folder1", // name of parent folder
b: "file11", // name of content file
c: Seq[(String, String)] // content of file1
}
如何在 Scala 中处理读取此数据集?
有两种方法可以解决这个问题:
a) 如果文件夹中的数据非常小(不到几兆),可以在本地读取,然后使用ExecutionEnvironment.fromCollection()
方法将数据导入到Flink作业中。
b) 您创建了一个自定义的 InputFormat。 InputFormat 允许解析自定义文件格式。在您的情况下,我会扩展 TextInputFormat
并覆盖 readRecord()
方法。此方法将文件中的每一行作为字符串提供给您。
然后,您可以手动解析 String 中的数据,并将解析结果 return 与目录信息放在 Tuple3 中。您可以从 filePath
变量访问路径。
对于使用 FileInputFormat
s 递归读取文件,有 recursive.file.enumeration
配置值。
阅读以上帖子后,我能够创建自定义 FileInputFormat class 来读取 excel.xlsx 文件并在 flink 中进行流式传输。代码如下
/*
* Custom format output I was expecting from the records converted into records
*/
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,...., FileName : String)
/*
* Custom class to read Excel spreadsheet using flink FileInputFormat class
*/
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{
var running : Boolean = false
var excelData : Seq[ExcelSheetData] = null
unsplittable = true
override def open(fileSplit : FileInputSplit) = {
println(fileSplit.getPath.toString.drop(6))
val myFile = new File(fileSplit.getPath.toString.drop(6))
val fileName = fileSplit.getPath.toString.drop(6)
val fis = new FileInputStream(myFile)
try{
val myWorkbook = new XSSFWorkbook(fis)
// println(s"Sheet Name: ${mySheet.getSheetName()}")
// reading multiple sheets having identical data
val mySheets = myWorkbook.iterator().asScala
val exData = for(s <- mySheets
if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield {
val rowItr = s.rowIterator().asScala
for(e <- rowItr
if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
(e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
,fileName)
}
}
excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})
running = if(excelData.length >= 1) true else false
} finally { fis.close()}
}
override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = {
val head = excelData.head
excelData = excelData.tail
running = if (excelData.length == 0) false else true
head
}
override def reachedEnd(): Boolean = ! running
}
/*
* Initialize custom class to read Excel Input
*/
val excelInput = new ExcelInputFormat()
// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
.uid("Excel File Read")
//Windowing code down below...
HadoopOffice 库原生支持 Flink Table API 以及 Excel 文件的 Flink DataSource/DataSink。