全部,
我正在尝试为spark dataframe创建udf,它将用于生成每行的唯一id。为了确保唯一性,我依赖于:id生成器将“epoch value of timestamp(bigint)+”作为参数传递的唯一源id+随机数5位数
我有两个问题:
如何在id生成函数“idgenerator”中包含单调递增的\u id()
使用自定义项时,由于以下错误而失败:
error:Type mismatch;
found : String(SRC_")
required : org.apache.spark.sql.Column
df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
请提供任何指针。。。
Object SequenceGeneratorUtil extends Serializable {
val random = new scala.util.Random
val start = 10000
val end = 99999
//CustomEpochGenerator - this is custom function to generate the epoch value for current timestamp in milliseconds
// ID Generator will take "epoch value of timestamp ( bigint ) + "unique Source ID passed as argument + randomNumber 5 digit
def idGenerator(SrcIdentifier: String ): String = SrcIdentifier + CustomEpochGenerator.nextID.toString + (start + random.nextInt((end - start) + 1)).toString // + monotonically_increasing_id ( not working )
val GenID = udf[String, String](idGenerator __)
}
val df2 = df.withColumn("rowkey",SequenceGeneratorUtil.GenID("SRC_") )
1条答案
按热度按时间aiazj4mn1#
更改以下功能
添加到下面的函数
mId
中的额外参数idGenerator
保持monotonically_increasing_id
价值观。在下面更改
udf
```val GenID = udf[String, String](idGenerator __)
val GenID = udf(idGenerator _)
df.withColumn("rowkey",SequenceGeneratorUtil.GenID(lit("SRC_")) )
到