azure databricks stream foreach失败,出现notserializableexception

5gfr0r5j  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(345)

我想要不断地细化数据集流的行(最初由kafka发起):基于一个条件,我想要更新radis散列。这是我的代码片段( lastContacts 是前一个命令的结果,该命令是此类型的流: org.apache.spark.sql.DataFrame = [serialNumber: string, lastModified: long] . 这扩展到 org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] ):

class MyStreamProcessor extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)
  }

  override def close(errorOrNull: Throwable): Unit = {}
}

val query = lastContacts
  .writeStream
  .foreach(new MyStreamProcessor())
  .start()

query.awaitTermination()

我收到一个巨大的堆栈跟踪,相关部分(我认为)是: java.io.NotSerializableException: org.apache.spark.sql.streaming.DataStreamWriter 有人能解释一下为什么会出现这种异常以及如何避免这种情况吗?谢谢您!
这个问题涉及以下两个方面:
dataframe到rdd[(string,string)]的转换
用databricks中的流调用每个元素的函数

iezvtpos

iezvtpos1#

spark上下文不可序列化。
foreachwriter的任何实现都必须是可序列化的,因为每个任务都将获得所提供对象的新序列化反序列化副本。因此,强烈建议在调用open(…)方法(这表示任务已准备好生成数据)之后进行任何写入数据的初始化(例如,打开连接或启动事务)。
在您的代码中,您试图在process方法中使用spark上下文,

override def process(record: Row) = {
    val stringHashRDD = sc.parallelize(Seq(("lastContact", record(1).toString)))
    *sc.toRedisHASH(stringHashRDD, record(0).toString)(redisConfig)*
  }

要将数据发送到redis,您需要创建自己的连接并在open方法中打开它,然后在process方法中使用它。
看看如何创建redis连接池。https://github.com/redislabs/spark-redis/blob/master/src/main/scala/com/redislabs/provider/redis/connectionpool.scala

相关问题