spark自定义项

pdsfdshx  于 2021-05-17  发布在  Spark
关注(0)|答案(1)|浏览(549)

全部,
我正在尝试为spark dataframe创建udf,它将用于生成每行的唯一id。为了确保唯一性,我依赖于:id生成器将“epoch value of timestamp(bigint)+”作为参数传递的唯一源id+随机数5位数
我有两个问题:
如何在id生成函数“idgenerator”中包含单调递增的\u id()
使用自定义项时,由于以下错误而失败:

error:Type mismatch;
    found : String(SRC_")
    required : org.apache.spark.sql.Column
            df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )

请提供任何指针。。。

Object SequenceGeneratorUtil extends Serializable {

    val random = new scala.util.Random
    val start = 10000
    val end = 99999

    //CustomEpochGenerator - this is custom function to generate the epoch value for current timestamp in milliseconds
    // ID Generator will take "epoch value of timestamp ( bigint ) + "unique Source ID passed as argument + randomNumber 5 digit
    def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )

    val GenID = udf[String, String](idGenerator __)

}

val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
aiazj4mn

aiazj4mn1#

更改以下功能

def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )

添加到下面的函数 mId 中的额外参数 idGenerator 保持 monotonically_increasing_id 价值观。

def idGenerator(SrcIdentifier: String,mId: Long): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString + mId

在下面更改 udf ```
val GenID = udf[String, String](idGenerator __)

val GenID = udf(idGenerator _)

由于以下错误而失败:error:type mismatch; 找到:需要字符串(src琰):org.apache.spark.sql.column df.withcolumn(“rowkey”,sequencegeneratorutil.genid(“src琰”))
因为 `SequenceGeneratorUtil.GenID` udf需要类型为的值 `org.apache.spark.sql.Column` 但你在传递价值 `SRC_` 属于类型 `String` .
要解决此问题,请使用 `lit` 功能。

df.withColumn("rowkey",SequenceGeneratorUtil.GenID(lit("SRC_")) )

在下面更改 `withColumn` ```
val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )

val df2 = df
            .withColumn(
                "rowkey",
                SequenceGeneratorUtil.GenID(
                    lit("SRC_"), // using lit function to pass static string.
                    monotonically_increasing_id
                )
            )

相关问题