如何从cassandra列中的字母数字值中删除字符?

g0czyy6m  于 2021-06-14  发布在  Cassandra
关注(0)|答案(2)|浏览(299)

我有如下所示的数据模型。我的问题是在marks列而不是int值中插入了字母数字值,因此需要通过删除marks列中每个字母数字值的字符来清理此列
我想通过编程来实现这一点,所以我想继续使用apachespark和scala来编写spark作业并与cassandra进行比较。我对这两种技术都很陌生,所以有没有人能告诉我spark中有没有什么内置函数可以做到这一点,或者我可以采取什么最佳的方法。我很感激你的建议。
我知道我可以使用sparkDataframe,但对于如何在Dataframe上编写scala reg表达式来实现这一点并将其持久化感到困惑。

CREATE TABLE student (
    student_id text,
    dob text,
    subject text,
    marks text,
    PRIMARY KEY (student_id, dob, subject, marks)
) WITH CLUSTERING ORDER BY (dob DESC, subject ASC, marks ASC).

现在在我的 student 表中,marks列本应存储int值,但由于某些错误作业,因此插入了许多带有字母数字值的记录,如下所示:

===============================================
student_id |  dob         |  subject  |  marks
===============================================
1          | 10-05-2019   | A         | ab50
2          | 08-06-2019   | B         | 88
3          | 02-02-2019   | C         | h65u
4          | 04-02-2019   | D         | 99

现在我要清理这个表,从marks列中存储的字母数字值中删除所有字符。
例如: ab50 -> 50 (这是预期结果)。

a1o7rhls

a1o7rhls1#

这个特定的用例可以使用udf来解决。示例代码如下:

import org.apache.spark.sql.functions.udf
import spark.implicits._

val cleanUDF = udf((x: String) => x.filter(_.toString.matches("\\d")))

val rows = List(
  (1, "10-05-2019", "A", "ab50"),
  (2, "08-06-2019", "B", "88"),
  (3, "02-02-2019", "C", "h65u"),
  (4, "04-02-2019", "D", "99")
)
val inDF = spark.sparkContext.parallelize(rows).toDF("student_id", "dob", "subject", "marks")
inDF.show()
//  +----------+----------+-------+-----+
//  |student_id|       dob|subject|marks|
//  +----------+----------+-------+-----+
//  |         1|10-05-2019|      A| ab50|
//  |         2|08-06-2019|      B|   88|
//  |         3|02-02-2019|      C| h65u|
//  |         4|04-02-2019|      D|   99|
//  +----------+----------+-------+-----+

//inDF using data from cassandra db
/*val inDF = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "student", "keyspace" -> "$keyspace"))
  .load()

* /

val outDF = inDF.select(
  $"student_id", $"dob", $"subject", cleanUDF($"marks").alias("marks")
)
outDF.show()
//  +----------+----------+-------+-----+
//  |student_id|       dob|subject|marks|
//  +----------+----------+-------+-----+
//  |         1|10-05-2019|      A|   50|
//  |         2|08-06-2019|      B|   88|
//  |         3|02-02-2019|      C|   65|
//  |         4|04-02-2019|      D|   99|
//  +----------+----------+-------+-----+

-----编辑——数据可以在cassandra中重写,以避免由于主键约束而重复输入。免责声明:输出df必须包含整个数据,因为它将被截断和加载。

outDF.write.format("org.apache.spark.sql.cassandra")
.options(Map(
  "keyspace" -> "$keyspace",
  "table" -> "student",
  "confirm.truncate" -> "true"
))
.mode(SaveMode.Overwrite).save()
jhkqcmku

jhkqcmku2#

可以在cassandra中使用自定义udf和更新来完成

相关问题