如何将 ExternalCatalog.listPartitions() 与 Java 一起使用
How to use ExternalCatalog.listPartitions() with Java
我是 Java 的新人。我想在 hiveTable 中删除分区。我想使用 SparkSession.ExternalCatalog().listPartitions
和 SparkSession.ExternalCatalog().dropPartitions
.
我在 scala 上看到了这个方法
但是我不明白如何在 Java 上 运行 它。这是 etl 进程的一部分,我想了解如何在 Java.
上处理它
我的代码失败是因为误解了如何操作数据类型并将其转换为 java。需要什么类型的对象以及如何理解什么数据return API.
我的代码示例:
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");
失败的原因是:
method listPartitions in class org.apache.spark.sql.catalog.ExternalCatalog cannot be applied to given types.
我无法击败它,因为关于 api (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions) 和 java 知识的信息较少,而且我找到的所有示例都写在 scala 上。
最后,我需要将这段适用于 Scala 的代码转换为 java:
def dropPartitions(spark:SparkSession, shema:String, table:String, need_date:String):Unit = {
val cat = spark.sharedState.externalCatalog
val partit = cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")).flatten
val filteredPartit = partita.filter(_<dt).map(x => Map("partition_field" -> x))
cat.dropPartitions(
shema
,table
,filteredPartitions
,ignoreIfNotExists=true
,purge=false
,retainData=false
}
拜托,如果你知道如何处理它,你能在这件事上提供帮助吗:
- java 中的一些代码示例,用于编写我自己的容器来操作来自 externalCatalog 的数据
- 此 api 中使用的数据结构和一些可以帮助我理解它们如何与 java
一起使用的理论来源
- scala 代码字符串中的含义:cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")). 变平?
发送
更新中
非常感谢您的反馈@jpg。我会尽力。我有很大的 etl 任务和目标是每周写入一次动态分区 table 数据。制作此数据集市的业务规则:(系统日期 - 90 天)。因此,我想在 public 访问模式中的目标 table 中删除分区数组(按天)。而且我已经阅读了删除分区的正确方法 - 使用 externalCatalog。由于这个项目的历史传统,我应该使用 java)并尝试了解如何最有效地做到这一点。我可以通过 System.out.println() return 进入终端的一些 externalCatalog 方法:
externalCatalog.tableExists()、externalCatalog.listTables() 和 externalCatalog.getTable 的方法。但是我不明白如何处理 externalCatalog.listPartitions.
再更新一次
大家好。我的任务向前迈进了一步:
现在我可以 return 在列表分区的终端缓冲区中:
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("schema", "table", Option.empty()); // work! null or miss parameter fail program
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for CatalogTablePartition catalogTablePartition: catalogTablePartitions) {
System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values"));//retutn me value of partition like "Some([validation_date=2021-07-01])"
)
但这是另一个问题。
我可以在这个 api 方法 ec.dropPartitions 中 return 值,例如 Java 列表。它需要 3d 参数 Seq
我自己解决了。也许它会对某人有所帮助。
public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
List<Map<String, String>> allPartList = catalogTablePartitions.stream.
.map(s -> s.spec.seq())
.collect(Collectors.toList());
List<String> datePartDel =
allPartList.stream()
.map(x -> x.get("partition_name").get())
.sorted()
.collect(Collectors.toList());
String lessThisDateDelete = date_less_delete;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDate date = LocalDate.parse(lessThisDateDelete, formatter);
List<String> filteredDates = datePartDel.stream()
.map(s -> LocalDate.parse(s, formatter))
.filter(d -> d.isBefore(date))
.map(s -> s.toString())
.collect(Collectors.toList());
for (String seeDate : filteredDates)) {
List<Map<String, String>> elem = allPartList.stream()
.filter(x -> x.get("partition_name").get().equals(seeDate))
.collect(Collectors.toList());
Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala.toSeq();
ec.dropPartitions(
db_name
, table_name
, seqElem
, true
, false
, false
);
}
}
我是 Java 的新人。我想在 hiveTable 中删除分区。我想使用 SparkSession.ExternalCatalog().listPartitions
和 SparkSession.ExternalCatalog().dropPartitions
.
我在 scala 上看到了这个方法
我的代码失败是因为误解了如何操作数据类型并将其转换为 java。需要什么类型的对象以及如何理解什么数据return API.
我的代码示例:
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");
失败的原因是:
method listPartitions in class org.apache.spark.sql.catalog.ExternalCatalog cannot be applied to given types.
我无法击败它,因为关于 api (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions) 和 java 知识的信息较少,而且我找到的所有示例都写在 scala 上。 最后,我需要将这段适用于 Scala 的代码转换为 java:
def dropPartitions(spark:SparkSession, shema:String, table:String, need_date:String):Unit = {
val cat = spark.sharedState.externalCatalog
val partit = cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")).flatten
val filteredPartit = partita.filter(_<dt).map(x => Map("partition_field" -> x))
cat.dropPartitions(
shema
,table
,filteredPartitions
,ignoreIfNotExists=true
,purge=false
,retainData=false
}
拜托,如果你知道如何处理它,你能在这件事上提供帮助吗:
- java 中的一些代码示例,用于编写我自己的容器来操作来自 externalCatalog 的数据
- 此 api 中使用的数据结构和一些可以帮助我理解它们如何与 java 一起使用的理论来源
- scala 代码字符串中的含义:cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")). 变平? 发送
更新中 非常感谢您的反馈@jpg。我会尽力。我有很大的 etl 任务和目标是每周写入一次动态分区 table 数据。制作此数据集市的业务规则:(系统日期 - 90 天)。因此,我想在 public 访问模式中的目标 table 中删除分区数组(按天)。而且我已经阅读了删除分区的正确方法 - 使用 externalCatalog。由于这个项目的历史传统,我应该使用 java)并尝试了解如何最有效地做到这一点。我可以通过 System.out.println() return 进入终端的一些 externalCatalog 方法: externalCatalog.tableExists()、externalCatalog.listTables() 和 externalCatalog.getTable 的方法。但是我不明白如何处理 externalCatalog.listPartitions.
再更新一次 大家好。我的任务向前迈进了一步: 现在我可以 return 在列表分区的终端缓冲区中:
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("schema", "table", Option.empty()); // work! null or miss parameter fail program
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for CatalogTablePartition catalogTablePartition: catalogTablePartitions) {
System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values"));//retutn me value of partition like "Some([validation_date=2021-07-01])"
)
但这是另一个问题。 我可以在这个 api 方法 ec.dropPartitions 中 return 值,例如 Java 列表。它需要 3d 参数 Seq
我自己解决了。也许它会对某人有所帮助。
public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
List<Map<String, String>> allPartList = catalogTablePartitions.stream.
.map(s -> s.spec.seq())
.collect(Collectors.toList());
List<String> datePartDel =
allPartList.stream()
.map(x -> x.get("partition_name").get())
.sorted()
.collect(Collectors.toList());
String lessThisDateDelete = date_less_delete;
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
LocalDate date = LocalDate.parse(lessThisDateDelete, formatter);
List<String> filteredDates = datePartDel.stream()
.map(s -> LocalDate.parse(s, formatter))
.filter(d -> d.isBefore(date))
.map(s -> s.toString())
.collect(Collectors.toList());
for (String seeDate : filteredDates)) {
List<Map<String, String>> elem = allPartList.stream()
.filter(x -> x.get("partition_name").get().equals(seeDate))
.collect(Collectors.toList());
Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala.toSeq();
ec.dropPartitions(
db_name
, table_name
, seqElem
, true
, false
, false
);
}
}