Logic for transforming a DataFrame in Spark Scala

iyr7buue · published 2021-05-27 · in Spark

Take the following DataFrame as an example (`x.show(false)`):

+-----+--------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                              |timestamp    |
+-----+--------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/done/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/done/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+--------------------------------------------------------------------------------------+-------------+

Now I am trying to build a new DataFrame from the existing one by rewriting the `hdfsPath` column. The new DataFrame should look like this:

+-----+----------------------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                                            |timestamp    |
+-----+----------------------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/target/20200219/12/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+----------------------------------------------------------------------------------------------------+-------------+

So the change to the path is: `done` becomes `target`, then from `compiled-20200218050518-1-0-0-1582020318751.snappy` I extract the date `20200218`, then the `colId` `11`, and finally the snappy file name again. What is the simplest and most efficient way to achieve this?

Creating a new DataFrame is not the hard part; I could also update the existing one with a new column.

To summarize: current hdfsPath: hdfs://novus-nameservice/a/b/c/done/compiled-20200218050518-1-0-0-1582020318751.snappy expected hdfsPath: hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy based on colId.


mnowg1ta1#

The simplest way I can think of is to convert the DataFrame to a Dataset, apply a map operation, and convert back to a DataFrame:

// Define a case class; its field names must match the column names
case class MyType(colId: Int, hdfsPath: String, timestamp: Long)

dataframe.as[MyType].map(x => <<Your Transformation code>>).toDF()
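The transformation placeholder above can be filled with plain string manipulation. Below is a sketch of one possible version: `PathRewrite` is a hypothetical helper (its name and structure are not from the original answer) that pulls the date out of the file name and rebuilds the path.

```scala
// Hypothetical helper (illustrative name, not part of the original answer):
// rewrites .../done/compiled-<date><rest>.snappy
// to       .../target/<date>/<colId>/compiled-<date><rest>.snappy
object PathRewrite {
  // The first 8 digits after "compiled-" are the yyyyMMdd date
  private val DatePattern = "compiled-([0-9]{8})".r

  def rewrite(colId: Int, path: String): String = {
    val file = path.substring(path.lastIndexOf('/') + 1)
    val date = DatePattern.findFirstMatchIn(file).map(_.group(1)).getOrElse("")
    path.replace(s"/done/$file", s"/target/$date/$colId/$file")
  }
}
```

The map step would then read `dataframe.as[MyType].map(x => x.copy(hdfsPath = PathRewrite.rewrite(x.colId, x.hdfsPath))).toDF()`.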

7d7tgy0s2#

You can do this with `regexp_replace` and `regexp_extract`: extract the required value and substitute it into the replacement:

import org.apache.spark.sql.functions.{concat, lit, regexp_extract, regexp_replace}
import spark.implicits._  // for the $"col" syntax

df.withColumn("hdfsPath", regexp_replace(
  $"hdfsPath",
  lit("/done"),  // the segment to replace
  concat(
    lit("/target/"),
    regexp_extract($"hdfsPath", "compiled-([0-9]{1,8})", 1),  // the date, e.g. 20200218
    lit("/"),
    $"colId")
))

Output:

+-----+----------------------------------------------------------------------------------------------------+-------------+
|colId|hdfsPath                                                                                            |timestamp    |
+-----+----------------------------------------------------------------------------------------------------+-------------+
|11   |hdfs://novus-nameservice/a/b/c/target/20200218/11/compiled-20200218050518-1-0-0-1582020318751.snappy|1662157400000|
|12   |hdfs://novus-nameservice/a/b/c/target/20200219/12/compiled-20200219060507-1-0-0-1582023907108.snappy|1662158000000|
+-----+----------------------------------------------------------------------------------------------------+-------------+
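As a quick check outside Spark, the extraction pattern used above can be exercised with Scala's built-in regex support (a sketch; the sample path is taken from the question):

```scala
// Verify that "compiled-([0-9]{1,8})" captures the date prefix of the file name
val path = "hdfs://novus-nameservice/a/b/c/done/compiled-20200218050518-1-0-0-1582020318751.snappy"
val date = "compiled-([0-9]{1,8})".r
  .findFirstMatchIn(path)
  .map(_.group(1))
  .getOrElse("")
// {1,8} matches greedily, so the first eight digits ("20200218") are captured
```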

Hope this helps!
