从 Apache Flink 中的输入文件创建对象

Create objects from input files in Apache Flink

我有一个由文件夹和文件构成的数据集。文件夹/文件结构本身对数据分析很重要。

数据集的结构:

folder1
   +-----file11
            +-----column1
            +-----column2

每个文件都包含描述一个对象的数据。文件格式一致。它基本上是一个包含两列的 csv 文件。这两列应表示为结果对象中的元组序列。

文件的大小非常小。最多只有 20 kb。每个文件夹包含大约200个文件。

所需的输出对象应该是:

{
    a: "folder1",              // name of parent folder
    b: "file11",               // name of content file
    c: Seq[(String, String)]   // content of file1
}

如何在 Scala 中处理读取此数据集?

有两种方法可以解决这个问题:

a) 如果文件夹中的数据非常小(不到几兆),可以在本地读取,然后使用ExecutionEnvironment.fromCollection()方法将数据导入到Flink作业中。

b) 您创建了一个自定义的 InputFormat。 InputFormat 允许解析自定义文件格式。在您的情况下,我会扩展 TextInputFormat 并覆盖 readRecord() 方法。此方法将文件中的每一行作为字符串提供给您。 然后,您可以手动解析 String 中的数据,并将解析结果 return 与目录信息放在 Tuple3 中。您可以从 filePath 变量访问路径。 对于使用 FileInputFormats 递归读取文件,有 recursive.file.enumeration 配置值。

阅读以上帖子后,我能够创建自定义 FileInputFormat class 来读取 excel.xlsx 文件并在 flink 中进行流式传输。代码如下

/*
 * Custom format output I was expecting from the records converted into records
 */
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,....,  FileName : String)

/*
 * Custom class to read Excel spreadsheet using flink FileInputFormat class 
 */
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{

    var running : Boolean = false
    var excelData : Seq[ExcelSheetData] = null
    unsplittable = true

    override def open(fileSplit : FileInputSplit) = {

        println(fileSplit.getPath.toString.drop(6))
        val myFile = new File(fileSplit.getPath.toString.drop(6)) 
        val fileName = fileSplit.getPath.toString.drop(6)

        val fis = new FileInputStream(myFile)
        try{
            val myWorkbook = new XSSFWorkbook(fis)

            // println(s"Sheet Name: ${mySheet.getSheetName()}")
            // reading multiple sheets having identical data
            val mySheets = myWorkbook.iterator().asScala   
            val exData = for(s <- mySheets
                            if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield  {
                            val rowItr = s.rowIterator().asScala
                                for(e <- rowItr
                                    if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
                                    (e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
                                    ,fileName)
                            }
                }

            excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})

            running = if(excelData.length >= 1) true else false

        } finally { fis.close()}

    }

    override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = { 
        val head = excelData.head 
        excelData = excelData.tail 
        running = if (excelData.length == 0) false else true 
        head 
    }

    override def reachedEnd(): Boolean = ! running  

}

/*
 * Initialize custom class to read Excel Input
 */
val excelInput = new ExcelInputFormat()

// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
                            .uid("Excel File Read")

//Windowing code down below... 

HadoopOffice 库原生支持 Flink Table API 以及 Excel 文件的 Flink DataSource/DataSink。

https://github.com/ZuInnoTe/hadoopoffice/wiki