使用MongoDB-Spark连接器(> 10.x)如何从数据库中成功读取Map类型(现在返回为Struct)

oxf4rvwz  于 2023-08-04  发布在  Go
关注(0)|答案(1)|浏览(83)

我正在更新我公司的一些代码,从旧版本的spark迁移到更现代的版本(Spark 3.3.1)。作为其中的一部分,我还升级了我们的spark-mongodb连接器代码(从mongo-spark-connector 2.4.2移动到10.1.1),但我遇到了一个奇怪的问题。如果创建一个spark DataSet,它基于一个简单的case类,包含一个scala Map[String,Long]作为它的字段之一,它会作为一个奇怪的结构类型从数据库中读回,我无法弄清楚为什么会这样,或者如何让新的Spark-mongo连接器将它作为一个类型化对象读回。
在以前的代码版本中,我可以简单地使用MongoSpark同伴对象的load[T]方法,其中T是我想要返回的对象的case类描述,并且它“刚刚工作”。然而,在最新版本的API中,MongoSpark同伴对象似乎消失了(至少我找不到它XD)。
下面是一个完整的最小的例子展示了这一点,我会喜欢一只手弄清楚我在这个升级错过了什么

import org.apache.spark.sql.{DataFrame, Dataset}
import com.mongodb.spark._

case class SimpleOutput(_id: String, name: String, mapInfo: Map[String, Long])

import spark.implicits._
val data1 = SimpleOutput("d1", "Test Data 1", Map("starfleet" -> 10, "serenity" -> 13))
val data2 = SimpleOutput("d2", "Test Data 2", Map("NAMCO" -> 1981, "Nintendo" -> 1985))
val data3 = SimpleOutput("d3", "Test Data 3", Map("Sonic" -> 3, "Tails" -> 2, "Knuckles" -> 5))
val inputData = Seq(data1, data2, data3)
val inputDS = spark.createDataset[SimpleOutput](inputData)
inputDS.printSchema()
inputDS.write.format("mongodb")
  .mode("overwrite")
  .options(Map("connection.uri" -> "MyConnectionURI",
               "database" -> "MyDatabaseName",
               "collection" -> "MyCollectionName",
               "replaceDocument" -> "false"))
  .save()
val readConfigOptions = Map("connection.uri" -> "MyConnectionURI",
                            "database" -> "MyDatabaseName",
                            "collection" -> "MyCollectionName",
                            "sql.inferSchema.mapTypes.enabled" -> "true")

val outputDF = spark.read.format("mongodb").options(readConfigOptions).load()
outputDF.printSchema()
outputDF.count() shouldBe 3
outputDF.as[SimpleOutput].collect() should contain theSameElementsAs inputData

字符串
此操作将失败,并出现以下错误:“不支持反序列化程序:需要(n)“MAP”字段,但得到“STRUCT<Knuckles:BIGINT,NAMCO:任天堂BIGINT:BIGINT,Sonic:BIGINT,尾部:BIGINT,宁静号:BIGINT,星际舰队:BIGINT>".”
打印模式的调用说明了问题inputDS的模式:

root
 |-- _id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- mapInfo: map (nullable = true)
 |    |-- key: string
 |    |-- value: long (valueContainsNull = false)


这和我所期望的完全一样,但是outputDF的模式不是

root
 |-- _id: string (nullable = true)
 |-- mapInfo: struct (nullable = true)
 |    |-- Knuckles: long (nullable = true)
 |    |-- NAMCO: long (nullable = true)
 |    |-- Nintendo: long (nullable = true)
 |    |-- Sonic: long (nullable = true)
 |    |-- Tails: long (nullable = true)
 |    |-- serenity: long (nullable = true)
 |    |-- starfleet: long (nullable = true)
 |-- name: string (nullable = true)


现在,我传递的选项(根据Mongo的默认值为true)sql.inferSchema.mapTypes.enabled设置为true,所以我很困惑为什么我会看到这种行为。在过去(mongo-spark连接器2.4.2),我做了以下操作从Mongo读取我的数据,它工作了:

val readConfig = ReadConfig(Map("uri" -> "MyConnectionURI", "database" -> "MyDatabaseName", "collection" -> "MyCollectionName"), None)
MongoSpark.load[SimpleOutput](spark, readConfig)


我不知道从这里去哪里,Mongo docs是(海事组织)缺乏,只是暗示这将“只是工作”。我在其他地方找不到任何关于这方面的东西。感谢所有人的帮助,我彻底迷失了。

9gm1akwq

9gm1akwq1#

我发现,为了让mongo-spark连接器正确地解释模式,我需要使用Scala的反射来为代码中的T类型生成一个模式。这似乎是一个严重的短期来在新的驱动程序,但它只是其中的许多我已经发现,并已工作左右。但是,下面的代码适用于到目前为止我尝试过的任何case类,无论多么复杂:

def readAs[T <: Product : TypeTag](databaseName: String, collectionName: String,
                                     partitionerConfig: Option[MongoPartitionerConfig] = None): Dataset[T] = {
  import spark.implicits._
  val props = mongoProps(databaseName, collectionName, partitionerConfig)
  val schema = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

  spark.read.format("mongodb")
       .options(props)
       .schema(schema)
       .load()
       .as[T]
}

字符串

相关问题