Delta table : COPY INTO only specific partitioned folders from S3 bucket

Delta table : COPY INTO only specific partitioned folders from S3 bucket

我有一个登陆 S3 桶,每天都会收到一些数据,按日期分区:

s3:/my_bucket/date=2020-01-01/my_data.txt
s3:/my_bucket/date=2020-01-02/my_data.txt
s3:/my_bucket/date=2020-01-03/my_data.txt

我确实创建了第一个测试 Delta table 在日期列上分区,使用 SQL,然后我做了:

COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT

一切正常,然后... 今天我在那个桶上有了一个新的 partitionFolder :

s3:/my_bucket/date=2020-01-04/my_data.txt

不知道如何只“复制到”新文件夹,并保留日期分区值。 基本上,我不能重复使用相同的 sql 语句来复制,因为它会复制以前的文件(其他日期,已经在 delta table 内)。

我试过直接指定文件夹路径:

COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/date=2020-01-04') FILEFORMAT = TEXT

但是Delta/Spark无法获取“日期列”,因为它是直接读取文件而不是包含日期列信息的子文件夹。

SQL AnalysisException: cannot resolve 'date' given input columns: [value];

我想在像

这样的子文件夹上添加一些“模式”匹配
    COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT 
PÄTTERN = 'date=2020-01-04/*'

但同样的错误,spark 无法理解此文件夹是一个新分区

我的问题很简单,是否可以使用 增量分区 s3 存储桶 中的 'COPY INTO'(比如我的例如)。

PS :我的 Delta table 是一个外部 table,它 link 到另一个 s3 存储桶。 s3:/my_bucket/ 应该只是着陆区并且应该是 immutable.

COPY INTO 是幂等操作 - 它不会从已经处理过的文件中加载数据,至少在您用 COPY_OPTIONS(force=true) 明确询问它之前是这样(请参阅 docs)。

您可以通过小实验轻松证明这一点 - 我使用 DBFS 只是一个例子:

  • 将内容为 1 的文件加载到文件 dbfs:/tmp/cp-test/date=2022-05-16/1.txt 中并导入:

  • 检查您是否已加载数据:

  • 创建一个包含内容 2 的文件,将其上传到文件 dbfs:/tmp/cp-test/date=2022-05-17/1.txt,然后也将其导入 - 您会看到我们只导入了一行,尽管那里有旧文件:

  • 如果我们查看数据,我们会发现我们只有 2 行,这是应该的: