Logic to manipulate a DataFrame in Spark Scala [Spark]

Take the following DataFrame as an example: x.show(false)

+-----+--------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                              |timestamp    |
+-----+--------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/done/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/done/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+--------------------------------------------------------------------------------------+-------------+

Now I am trying to create a new DF from the existing one, based on the hdfsPath column.

The new DF should look like this:

+-----+----------------------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                                            |timestamp    |
+-----+----------------------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/target/20200219/12/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+----------------------------------------------------------------------------------------------------+-------------+

So the path done changes to target, then from the compiled-20200218050518-1-0-0-1582020318751.snappy part I get the date 20200218, then the colId 11, and finally the snappy file name. What is the simplest and most efficient way to achieve this?

Creating a new DF is not a hard requirement; updating the existing DF with a new column would work as well.

To summarize, the current hdfsPath:

hdfs://novus-nameservice/a/b/c/done/compiled-20200218050518-1-0-0-1582020318751.snappy

The expected hdfsPath:

hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy

based on the colId.

The simplest way I can think of is to convert your DataFrame to a Dataset, apply a map operation, and then convert back to a DataFrame:

// Define a case class; the field names and types need to match the columns
case class MyType(colId: Int, hdfsPath: String, timestamp: Long)

dataframe.as[MyType].map(x => <<Your Transformation code>>).toDF()
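For illustration, here is a minimal sketch of what that map transformation could look like, assuming the MyType case class above and a SparkSession in scope as spark; the string-splitting logic is inferred from the path layout in the question, not the original poster's code:

import spark.implicits._ // required for .as[MyType] and .toDF()

val result = dataframe.as[MyType].map { row =>
  // Split the path into the directory part (".../a/b/c/done") and the file name
  val slash = row.hdfsPath.lastIndexOf('/')
  val dir   = row.hdfsPath.substring(0, slash)  // ends in ".../done"
  val file  = row.hdfsPath.substring(slash + 1) // "compiled-...snappy"
  // The date is the first 8 digits after the "compiled-" prefix, e.g. 20200218
  val date = file.stripPrefix("compiled-").take(8)
  // Swap the trailing "done" directory for "target/<date>/<colId>"
  val newDir = dir.stripSuffix("done") + s"target/$date/${row.colId}"
  row.copy(hdfsPath = s"$newDir/$file")
}.toDF()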

Here is how you can do it with regexp_replace and regexp_extract: extract the value you want and substitute it into the path:

import org.apache.spark.sql.functions._
import spark.implicits._ // for the $"..." column syntax

df.withColumn("hdfsPath", regexp_replace(
  $"hdfsPath",
  lit("/done"),
  concat(
    lit("/target/"),
    // pull the 8-digit date out of the "compiled-YYYYMMDD..." file name
    regexp_extract($"hdfsPath", "compiled-([0-9]{1,8})", 1),
    lit("/"),
    $"colId")
))
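Note: the overload of regexp_replace that takes the pattern and replacement as Columns (needed here, since the replacement is built from other columns) is available in Spark 2.1 and later. If concat complains about the integer colId column, cast it explicitly with $"colId".cast("string").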

Output:

+-----+----------------------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                                            |timestamp    |
+-----+----------------------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/target/20200219/12/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+----------------------------------------------------------------------------------------------------+-------------+

Hope this helps!