如何将 ExternalCatalog.listPartitions() 与 Java 一起使用

How to use ExternalCatalog.listPartitions() with Java

我是 Java 的新人。我想在 hiveTable 中删除分区。我想使用 SparkSession.ExternalCatalog().listPartitionsSparkSession.ExternalCatalog().dropPartitions.

我在 scala 上看到了这个方法 但是我不明白如何在 Java 上 运行 它。这是 etl 进程的一部分,我想了解如何在 Java.

上处理它

我的代码失败是因为误解了如何操作数据类型并将其转换为 java。需要什么类型的对象以及如何理解什么数据return API.

我的代码示例:

ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("need_schema", "need_table");

失败的原因是:

method listPartitions in class org.apache.spark.sql.catalog.ExternalCatalog cannot be applied to given types.

我无法击败它,因为关于 api (https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-ExternalCatalog.html#listPartitions) 和 java 知识的信息较少,而且我找到的所有示例都写在 scala 上。 最后,我需要将这段适用于 Scala 的代码转换为 java:

def dropPartitions(spark:SparkSession, shema:String, table:String, need_date:String):Unit = {
        val cat = spark.sharedState.externalCatalog
        val partit = cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")).flatten
        val filteredPartit = partita.filter(_<dt).map(x => Map("partition_field" -> x))
        cat.dropPartitions(
    shema
    ,table
    ,filteredPartitions
    ,ignoreIfNotExists=true
    ,purge=false
    ,retainData=false
    }

拜托,如果你知道如何处理它,你能在这件事上提供帮助吗:

  1. java 中的一些代码示例,用于编写我自己的容器来操作来自 externalCatalog 的数据
  2. 此 api 中使用的数据结构和一些可以帮助我理解它们如何与 java
  3. 一起使用的理论来源
  4. scala 代码字符串中的含义:cat.ListPartitions(shema,table).map(_.spec).map(t => t.get("partition_field")). 变平? 发送

更新中 非常感谢您的反馈@jpg。我会尽力。我有很大的 etl 任务和目标是每周写入一次动态分区 table 数据。制作此数据集市的业务规则:(系统日期 - 90 天)。因此,我想在 public 访问模式中的目标 table 中删除分区数组(按天)。而且我已经阅读了删除分区的正确方法 - 使用 externalCatalog。由于这个项目的历史传统,我应该使用 java)并尝试了解如何最有效地做到这一点。我可以通过 System.out.println() return 进入终端的一些 externalCatalog 方法: externalCatalog.tableExists()、externalCatalog.listTables() 和 externalCatalog.getTable 的方法。但是我不明白如何处理 externalCatalog.listPartitions.

再更新一次 大家好。我的任务向前迈进了一步: 现在我可以 return 在列表分区的终端缓冲区中:

ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
ec.listPartitions("schema", "table", Option.empty()); // work! null or miss parameter fail program
Seq<CatalogTablePartition> ctp = ec.listPartitions("schema", "table", Option.empty());
List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
for CatalogTablePartition catalogTablePartition: catalogTablePartitions) {
    System.out.println(catalogTablePartition.toLinkedHashMap().get("Partition Values"));//retutn me value of partition like "Some([validation_date=2021-07-01])"
)

但这是另一个问题。 我可以在这个 api 方法 ec.dropPartitions 中 return 值,例如 Java 列表。它需要 3d 参数 Seq> 结构。在这种情况下我也无法过滤分区 - 在我的梦想中我想过滤分区的值 less by date 参数然后删除它。 如果有人知道如何用这个 api 到 return 编写 map 方法,就像在我的 scala 示例中一样,请帮助我。

我自己解决了。也许它会对某人有所帮助。

public static void partitionDeleteLessDate(String db_name, String table_name, String date_less_delete) {
    ExternalCatalog ec = SparkSessionFactory.getSparkSession.sharedState().externalCatalog();
    Seq<CatalogTablePartition> ctp = ec.listPartitions(db_name, table_name, Option.empty());
    List<CatalogTablePartition> catalogTablePartitions = JavaConverters.seqAsJavaListConverter(ctp).asJava();
    List<Map<String, String>> allPartList = catalogTablePartitions.stream.
        .map(s -> s.spec.seq())
        .collect(Collectors.toList());
    List<String> datePartDel = 
        allPartList.stream()
            .map(x -> x.get("partition_name").get())
            .sorted()
            .collect(Collectors.toList());
    String lessThisDateDelete = date_less_delete;
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
    LocalDate date = LocalDate.parse(lessThisDateDelete, formatter);
    
    List<String> filteredDates = datePartDel.stream()
        .map(s -> LocalDate.parse(s, formatter))
        .filter(d -> d.isBefore(date))
        .map(s -> s.toString())
        .collect(Collectors.toList());
    
    for (String seeDate : filteredDates)) {
        List<Map<String, String>> elem = allPartList.stream()
            .filter(x -> x.get("partition_name").get().equals(seeDate))
            .collect(Collectors.toList());
        Seq<Map<String, String>> seqElem = JavaConverters.asScalaIteratorConverter(elem.iterator()).asScala.toSeq();
        ec.dropPartitions(
        db_name
        , table_name
        , seqElem
        , true
        , false
        , false
        );
    } 
}