Is it possible to remove files from Spark Streaming folder?

Spark 2.1. An ETL process converts files from the source system to Parquet and drops the small Parquet files into folder1. Spark Streaming on folder1 works fine, but the Parquet files in folder1 are too small for HDFS. We have to merge the small Parquet files into larger ones, but when I try to remove files from folder1, the Spark Streaming process fails with an exception:

17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error java.io.FileNotFoundException: File does not exist

Is it possible to merge the Parquet files in a Spark Streaming source folder?
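
For reference, the compaction we have in mind is roughly the following (a sketch only; the separate target folder and the partition count are placeholders, not our real values):

// Sketch: compact the many small Parquet files into a few larger ones
// and write them to a separate folder (placeholder path and count).
val small = spark.read.parquet("/folder1/folder2")

small
  .coalesce(8)           // collapse into a handful of larger files
  .write
  .mode("overwrite")
  .parquet("/folder1/folder2_compacted")

The streaming job and the failure it hits are below: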

    Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0.2.6.0.3-8
      /_/

Using Scala version 2.11.8 (OpenJDK 64-Bit Server VM, Java 1.8.0_131)
Type in expressions to have them evaluated.
Type :help for more information.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.types._

val userSchema = new StructType()
  .add("itemId", "string")
  .add("tstamp", "integer")
  .add("rowtype", "string")
  .add("rowordernumber", "integer")
  .add("parentrowordernumber", "integer")
  .add("fieldname", "string")
  .add("valuestr", "string")

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2")

csvDF.createOrReplaceTempView("tab1")
val aggDF = spark.sql("select distinct count(itemId) as cases_count from tab1")
aggDF
  .writeStream
  .outputMode("complete")
  .format("console")
  .start()

aggDF
  .writeStream
  .queryName("aggregates")    // this query name will be the table name
  .outputMode("complete")
  .format("memory")
  .start()
spark.sql("select * from aggregates").show()

// Exiting paste mode, now interpreting.

+-----------+
|cases_count|
+-----------+
+-----------+

import org.apache.spark.sql.types._
userSchema: org.apache.spark.sql.types.StructType = StructType(StructField(itemId,StringType,true), StructField(tstamp,IntegerType,true), StructField(rowtype,StringType,true), StructField(rowordernumber,IntegerType,true), StructField(parentrowordernumber,IntegerType,true), StructField(fieldname,StringType,true), StructField(valuestr,StringType,true))
csvDF: org.apache.spark.sql.DataFrame = [itemId: string, tstamp: int ... 5 more fields]
aggDF: org.apache.spark.sql.DataFrame = [cases_count: bigint]

scala> -------------------------------------------
Batch: 0
-------------------------------------------
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 1
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 2
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 3
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  292086106|
+-----------+

-------------------------------------------
Batch: 4
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  324016758|
|  292086106|
+-----------+

-------------------------------------------
Batch: 5
-------------------------------------------
+-----------+
|cases_count|
+-----------+
|  355839229|
|  324016758|
|  292086106|
+-----------+

17/07/26 17:16:23 ERROR StreamExecution: Query [id = f29783ea-bdfb-4b59-a6f6-b77f79509a5a, runId = cbcce2b2-7d7b-4e31-a15a-7efed420f974] terminated with error
java.io.FileNotFoundException: File does not exist: /folder1/folder2/P-FMVDBAF-4021-20161107152556-1_006.gz.parquet

You can use globbing to pick up only the files you actually want, for example:

val csvDF = spark.readStream.schema(userSchema).parquet("/folder1/folder2/bigger_file*.parquet")
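
For this to help, the compacted output has to actually match that glob. Spark's DataFrameWriter does not let you choose output file names directly, so one option is to compact into a staging directory and then rename the resulting part files into folder2 under the expected prefix. A minimal sketch, where the staging path, prefix, and partition count are assumptions for illustration:

// Compact into a staging directory, then move the part files into
// /folder1/folder2 under names the bigger_file*.parquet glob will match.
// Paths, prefix, and partition count are placeholders for illustration.
import org.apache.hadoop.fs.{FileSystem, Path}

val staging = "/folder1/staging_compaction"

spark.read.parquet("/folder1/folder2")
  .coalesce(4)
  .write
  .mode("overwrite")
  .parquet(staging)

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.globStatus(new Path(s"$staging/part-*.parquet")).zipWithIndex.foreach {
  case (status, idx) =>
    fs.rename(status.getPath, new Path(s"/folder1/folder2/bigger_file_$idx.parquet"))
}

With the glob in place, the streaming source never lists the small files in the first place, so a separate job can compact and delete them without the stream later trying to read a file that has disappeared.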