Delta table : COPY INTO only specific partitioned folders from S3 bucket
Delta table : COPY INTO only specific partitioned folders from S3 bucket
我有一个登陆 S3 桶,每天都会收到一些数据,按日期分区:
s3:/my_bucket/date=2020-01-01/my_data.txt
s3:/my_bucket/date=2020-01-02/my_data.txt
s3:/my_bucket/date=2020-01-03/my_data.txt
我确实创建了第一个测试 Delta table 在日期列上分区,使用 SQL,然后我做了:
COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT
一切正常,然后...
今天我在那个桶上有了一个新的 partitionFolder :
s3:/my_bucket/date=2020-01-04/my_data.txt
我不知道如何只“复制到”新文件夹,并保留日期分区值。
基本上,我不能重复使用相同的 sql 语句来复制,因为它会复制以前的文件(其他日期,已经在 delta table 内)。
我试过直接指定文件夹路径:
COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/date=2020-01-04') FILEFORMAT = TEXT
但是Delta/Spark无法获取“日期列”,因为它是直接读取文件而不是包含日期列信息的子文件夹。
SQL AnalysisException: cannot resolve 'date' given input columns: [value];
我想在像
这样的子文件夹上添加一些“模式”匹配
COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT
PÄTTERN = 'date=2020-01-04/*'
但同样的错误,spark 无法理解此文件夹是一个新分区。
我的问题很简单,是否可以使用 增量分区 s3 存储桶 中的 'COPY INTO'(比如我的例如)。
PS :我的 Delta table 是一个外部 table,它 link 到另一个 s3 存储桶。
s3:/my_bucket/
应该只是着陆区并且应该是 immutable.
COPY INTO 是幂等操作 - 它不会从已经处理过的文件中加载数据,至少在您用 COPY_OPTIONS(force=true)
明确询问它之前是这样(请参阅 docs)。
您可以通过小实验轻松证明这一点 - 我使用 DBFS 只是一个例子:
- 将内容为
1
的文件加载到文件 dbfs:/tmp/cp-test/date=2022-05-16/1.txt
中并导入:
- 检查您是否已加载数据:
- 创建一个包含内容
2
的文件,将其上传到文件 dbfs:/tmp/cp-test/date=2022-05-17/1.txt
,然后也将其导入 - 您会看到我们只导入了一行,尽管那里有旧文件:
- 如果我们查看数据,我们会发现我们只有 2 行,这是应该的:
我有一个登陆 S3 桶,每天都会收到一些数据,按日期分区:
s3:/my_bucket/date=2020-01-01/my_data.txt
s3:/my_bucket/date=2020-01-02/my_data.txt
s3:/my_bucket/date=2020-01-03/my_data.txt
我确实创建了第一个测试 Delta table 在日期列上分区,使用 SQL,然后我做了:
COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT
一切正常,然后... 今天我在那个桶上有了一个新的 partitionFolder :
s3:/my_bucket/date=2020-01-04/my_data.txt
我不知道如何只“复制到”新文件夹,并保留日期分区值。 基本上,我不能重复使用相同的 sql 语句来复制,因为它会复制以前的文件(其他日期,已经在 delta table 内)。
我试过直接指定文件夹路径:
COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/date=2020-01-04') FILEFORMAT = TEXT
但是Delta/Spark无法获取“日期列”,因为它是直接读取文件而不是包含日期列信息的子文件夹。
SQL AnalysisException: cannot resolve 'date' given input columns: [value];
我想在像
这样的子文件夹上添加一些“模式”匹配 COPY INTO delta.my_table FROM (SELECT date, value FROM 's3:/my_bucket/') FILEFORMAT = TEXT
PÄTTERN = 'date=2020-01-04/*'
但同样的错误,spark 无法理解此文件夹是一个新分区。
我的问题很简单,是否可以使用 增量分区 s3 存储桶 中的 'COPY INTO'(比如我的例如)。
PS :我的 Delta table 是一个外部 table,它 link 到另一个 s3 存储桶。
s3:/my_bucket/
应该只是着陆区并且应该是 immutable.
COPY INTO 是幂等操作 - 它不会从已经处理过的文件中加载数据,至少在您用 COPY_OPTIONS(force=true)
明确询问它之前是这样(请参阅 docs)。
您可以通过小实验轻松证明这一点 - 我使用 DBFS 只是一个例子:
- 将内容为
1
的文件加载到文件dbfs:/tmp/cp-test/date=2022-05-16/1.txt
中并导入:
- 检查您是否已加载数据:
- 创建一个包含内容
2
的文件,将其上传到文件dbfs:/tmp/cp-test/date=2022-05-17/1.txt
,然后也将其导入 - 您会看到我们只导入了一行,尽管那里有旧文件:
- 如果我们查看数据,我们会发现我们只有 2 行,这是应该的: