强制 Glue Crawler 创建单独的表
Force Glue Crawler to create separate tables
我不断将 parquet 数据集添加到结构如下的 S3 文件夹中:
s3:::my-bucket/public/data/set1
s3:::my-bucket/public/data/set2
s3:::my-bucket/public/data/set3
一开始我只有set1
,我的爬虫在整个桶s3:::my-bucket
上配置为运行。这导致创建了一个名为 my-bucket
的 tabled 分区,其分区名为 public
、data
和 set1
。我真正想要的是有一个名为 set1
的 table,没有任何分区。
我明白了发生这种情况的原因,正如 How Does a Crawler Determine When to Create Partitions? 中所解释的那样。但是当一个新的数据集被上传时(例如 set2
),我不希望它成为另一个分区(因为它是具有不同模式的完全不同的数据)。
如何强制 Glue 爬虫不创建分区?
我知道我可以将爬虫路径定义为 s3:::my-bucket/public/data/
但不幸的是我不知道新数据集将在哪里创建(例如也可以是 s3:::my-bucket/other/folder/set2
)。
有什么解决办法吗?
您可以使用 TableLevelConfiguration
指定搜寻器应在哪个文件夹级别查找表。
更多信息here。
我的解决方案是手动将特定路径添加到 Glue 爬虫中。总体情况是,我正在使用 Glue 作业将数据从一个 S3 存储桶转换并写入另一个存储桶。我现在最终将 Glue 爬虫初步配置为爬取整个存储桶。但每次 Glue 转换作业运行时,它也会更新 Glue 爬虫:它会删除初始的完整存储桶位置(如果它仍然存在),然后将新路径添加到 S3 目标。
在 Python 中看起来像这样:
def update_target_paths(crawler):
"""
Remove initial include path (whole bucket) from paths and
add folder for current files to include paths.
"""
def path_is(c, p):
return c["Path"] == p
# get S3 targets and remove initial bucket target
s3_targets = list(
filter(
lambda c: not path_is(c, f"s3://{bucket_name}"),
crawler["Targets"]["S3Targets"],
)
)
# add new target path if not in targets yet
if not any(filter(lambda c: path_is(c, output_loc), s3_targets)):
s3_targets.append({"Path": output_loc})
logging.info("Appending path '%s' to Glue crawler include path.", output_loc)
crawler["Targets"]["S3Targets"] = s3_targets
return crawler
def remove_excessive_keys(crawler):
"""Remove keys from Glue crawler dict that are not needed/allowed to update the crawler"""
for k in ["State", "CrawlElapsedTime", "CreationTime", "LastUpdated", "LastCrawl", "Version"]:
try:
del crawler[k]
except KeyError:
logging.warning(f"Key '{k}' not in crawler result dictionary.")
return crawler
if __name__ == "__main__":
logging.info(f"Transforming from {input_loc} to {output_loc}.")
if prefix_exists(curated_zone_bucket_name, curated_zone_key):
logging.info("Target object already exists, appending.")
else:
logging.info("Target object doesn't exist, writing to new one.")
transform() # do data transformation and write to output bucket
while True:
try:
crawler = get_crawler(CRAWLER_NAME)
crawler = update_target_paths(crawler)
crawler = remove_excessive_keys(crawler)
# Update Glue crawler with updated include paths
glue_client.update_crawler(**crawler)
glue_client.start_crawler(Name=CRAWLER_NAME)
logging.info("Started Glue crawler '%s'.", CRAWLER_NAME)
break
except (
glue_client.exceptions.CrawlerRunningException,
glue_client.exceptions.InvalidInputException,
):
logging.warning("Crawler still running...")
time.sleep(10)
全局定义的变量:input_loc
、output_loc
、CRAWLER_NAME
、bucket_name
.
对于每个新数据集,Glue 爬虫都会添加一个新路径。不会创建任何分区。
我不断将 parquet 数据集添加到结构如下的 S3 文件夹中:
s3:::my-bucket/public/data/set1
s3:::my-bucket/public/data/set2
s3:::my-bucket/public/data/set3
一开始我只有set1
,我的爬虫在整个桶s3:::my-bucket
上配置为运行。这导致创建了一个名为 my-bucket
的 tabled 分区,其分区名为 public
、data
和 set1
。我真正想要的是有一个名为 set1
的 table,没有任何分区。
我明白了发生这种情况的原因,正如 How Does a Crawler Determine When to Create Partitions? 中所解释的那样。但是当一个新的数据集被上传时(例如 set2
),我不希望它成为另一个分区(因为它是具有不同模式的完全不同的数据)。
如何强制 Glue 爬虫不创建分区?
我知道我可以将爬虫路径定义为 s3:::my-bucket/public/data/
但不幸的是我不知道新数据集将在哪里创建(例如也可以是 s3:::my-bucket/other/folder/set2
)。
有什么解决办法吗?
您可以使用 TableLevelConfiguration
指定搜寻器应在哪个文件夹级别查找表。
更多信息here。
我的解决方案是手动将特定路径添加到 Glue 爬虫中。总体情况是,我正在使用 Glue 作业将数据从一个 S3 存储桶转换并写入另一个存储桶。我现在最终将 Glue 爬虫初步配置为爬取整个存储桶。但每次 Glue 转换作业运行时,它也会更新 Glue 爬虫:它会删除初始的完整存储桶位置(如果它仍然存在),然后将新路径添加到 S3 目标。
在 Python 中看起来像这样:
def update_target_paths(crawler):
"""
Remove initial include path (whole bucket) from paths and
add folder for current files to include paths.
"""
def path_is(c, p):
return c["Path"] == p
# get S3 targets and remove initial bucket target
s3_targets = list(
filter(
lambda c: not path_is(c, f"s3://{bucket_name}"),
crawler["Targets"]["S3Targets"],
)
)
# add new target path if not in targets yet
if not any(filter(lambda c: path_is(c, output_loc), s3_targets)):
s3_targets.append({"Path": output_loc})
logging.info("Appending path '%s' to Glue crawler include path.", output_loc)
crawler["Targets"]["S3Targets"] = s3_targets
return crawler
def remove_excessive_keys(crawler):
"""Remove keys from Glue crawler dict that are not needed/allowed to update the crawler"""
for k in ["State", "CrawlElapsedTime", "CreationTime", "LastUpdated", "LastCrawl", "Version"]:
try:
del crawler[k]
except KeyError:
logging.warning(f"Key '{k}' not in crawler result dictionary.")
return crawler
if __name__ == "__main__":
logging.info(f"Transforming from {input_loc} to {output_loc}.")
if prefix_exists(curated_zone_bucket_name, curated_zone_key):
logging.info("Target object already exists, appending.")
else:
logging.info("Target object doesn't exist, writing to new one.")
transform() # do data transformation and write to output bucket
while True:
try:
crawler = get_crawler(CRAWLER_NAME)
crawler = update_target_paths(crawler)
crawler = remove_excessive_keys(crawler)
# Update Glue crawler with updated include paths
glue_client.update_crawler(**crawler)
glue_client.start_crawler(Name=CRAWLER_NAME)
logging.info("Started Glue crawler '%s'.", CRAWLER_NAME)
break
except (
glue_client.exceptions.CrawlerRunningException,
glue_client.exceptions.InvalidInputException,
):
logging.warning("Crawler still running...")
time.sleep(10)
全局定义的变量:input_loc
、output_loc
、CRAWLER_NAME
、bucket_name
.
对于每个新数据集,Glue 爬虫都会添加一个新路径。不会创建任何分区。