我正在从MongodB导入一个集合到Spark。
val partitionDF = spark.read.format("com.mongodb.spark.sql.DefaultSource").option("database", "db").option("collection", collectionName).load()
对于结果DataFrame
中的data
列,我得到以下类型:
StructType(StructField(configurationName,NullType,true), ...
因此在一些列中的至少一些类型是NullType
。
根据Spark中的Writing null values to Parquet(当NullType位于StructType内部时),我尝试通过将所有NullType
替换为StringType
来修复模式:
def denullifyStruct(struct: StructType): StructType = {
val items = struct.map{ field => StructField(field.name, denullify(field.dataType), field.nullable, field.metadata) }
StructType(items)
}
def denullify(dt: DataType): DataType = {
if (dt.isInstanceOf[StructType]) {
val struct = dt.asInstanceOf[StructType]
return denullifyStruct(struct)
} else if (dt.isInstanceOf[ArrayType]) {
val array = dt.asInstanceOf[ArrayType]
return ArrayType(denullify(array.elementType), array.containsNull)
} else if (dt.isInstanceOf[NullType]) {
return StringType
}
return dt
}
val fixedDF = spark.createDataFrame(partitionDF.rdd, denullifyStruct(partitionDF.schema))
发出fixedDF.printSchema
,我可以看到NullType
不再存在于fixedDF
的模式中。
fixedDF.write.mode("overwrite").parquet(partitionName + ".parquet")
出现以下错误:
Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a NullType (value: BsonString{value='117679.8'})
at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:214)
at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
又是一个NullType
!
当我只计算行数时,也会出现同样的问题:fixedDF.count()
.
Spark在写Parquet(或计数)时是否会再次推断模式?是否有可能关闭这种推断(或以其他方式克服这种推断)?
2条答案
按热度按时间2izufjch1#
问题不是由parquet写入方法引起的。由于某种类型转换问题,将数据作为 Dataframe 读取时出错。此jira page表示从mondoDB读取数据时需要添加
samplePoolSize
选项和other options。mspsb9vt2#
问题是,即使您提供了带有显式模式的
DataFrame
,对于某些操作(如count()
或保存到磁盘),Mongo派生的DataFrame
仍将推断模式。为了推断架构,它使用采样,这意味着它在推断时看不到某些数据。如果它只看到具有
null
值的某个字段,它将为其推断NullType
。稍后,当它遇到具有某个字符串的此字段时,这样的字符串将无法转换为NullType
。因此,这里的基本问题是采样。如果您的模式是稳定的和“密集的”(每个或几乎每个文档都填充了所有字段),采样将很好地工作。但如果某些字段是“稀疏的”(高概率为空),采样可能会失败。
一个粗略的解决方案是完全避免抽样。也就是说,使用一般总体而不是样本来推断模式。如果没有太多的数据(或者你能够等待),它可能会工作。
下面是一个实验分支:https://github.com/rpuch/mongo-spark/tree/read-full-collection-instead-of-sampling
这个想法是从采样切换到使用整个集合,如果这样配置的话。引入一个新的配置选项有点太麻烦了,所以我只是在'sampleSize'配置选项设置为1的情况下禁用采样,如下所示:
在这种情况下,完全避免了采样。一个明显的解决方案是使用等于集合大小的N来采样,这使得MongoDB在内存中对大量数据进行排序,这似乎是有问题的。因此,我完全禁用了采样。