I have an application running on a Spark 2.4.4 cluster that uses `rdd.toDF()` and then writes the result out to files.
To make better use of the worker resources, the application runs these jobs in multiple threads. The relevant snippet looks like this:
```
import spark.implicits._

Seq(rdd1, rdd2).par.foreach { rdd =>
  rdd.toDF().write.text("xxx")
}
```
I found that `toDF()` is not thread-safe: the application sometimes fails with `java.lang.UnsupportedOperationException`.
You can reproduce it with the snippet below (the failure rate is around 1%, and it happens more easily when the case class has many fields):
```
package example

import org.apache.spark.sql.SparkSession

object SparkToDF {

  case class SampleData(
    field_1: Option[Int] = None,
    field_2: Option[Long] = Some(0L),
    field_3: Option[Double] = None,
    field_4: Option[Float] = None,
    field_5: Option[Short] = None,
    field_6: Option[Byte] = None,
    field_7: Option[Boolean] = Some(false),
    field_8: Option[StructData] = Some(StructData(Some(0), Some(0L), None, None, None, None, None)),
    field_9: String = "",
    field_10: String = "",
    field_11: String = "",
    field_12: String = "",
    field_13: String = "",
    field_14: String = "",
    field_15: String = "",
    field_16: String = "",
    field_17: String = "",
    field_18: String = "",
    field_19: String = "",
    field_20: String = "",
    field_21: String = "",
    field_22: String = "",
    field_23: String = "",
    field_24: String = "",
    field_25: String = "",
    field_26: String = "",
    field_27: String = "",
    field_28: String = "",
    field_29: String = "",
    field_30: String = "",
    field_31: String = "",
    field_32: String = "",
    field_33: String = "",
    field_34: String = "",
    field_35: String = "",
    field_36: String = "",
    field_37: String = "",
    field_38: String = "",
    field_39: String = "",
    field_40: String = "",
    field_41: String = "",
    field_42: String = "",
    field_43: String = "",
    field_44: String = "",
    field_45: String = "",
    field_46: String = "",
    field_47: String = "",
    field_48: String = "",
    field_49: String = "",
    field_50: String = "",
    field_51: String = "",
    field_52: String = "",
    field_53: String = "",
    field_54: String = "",
    field_55: String = "",
    field_56: String = "",
    field_57: String = "",
    field_58: String = "",
    field_59: String = "",
    field_60: String = ""
  )

  case class StructData(
    field_1: Option[Int],
    field_2: Option[Long],
    field_3: Option[Double],
    field_4: Option[Float],
    field_5: Option[Short],
    field_6: Option[Byte],
    field_7: Option[Boolean]
  )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ThreadSafeToDF()")
      .getOrCreate()

    import spark.implicits._

    try {
      (1 to 10).par.foreach { _ =>
        (1 to 10000).map(_ => SampleData()).toDF()
      }
    } finally {
      spark.close()
    }
  }
}
```
Build the snippet into a jar and submit it in a loop:

```
for i in {1..200}; do
  echo "iter: $i (`date`)"
  ./bin/spark-submit --class example.SparkToDF your.jar
done
```
You may then get an exception message like `Schema for type A is not supported`:
```
Exception in thread "main" java.lang.UnsupportedOperationException: Schema for type scala.Option[scala.Float] is not supported
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:812)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:743)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:742)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:407)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1$$anonfun$7.apply(ScalaReflection.scala:406)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:296)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:406)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor$1.apply(ScalaReflection.scala:176)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:929)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:49)
at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$deserializerFor(ScalaReflection.scala:176)
at org.apache.spark.sql.catalyst.ScalaReflection$.deserializerFor(ScalaReflection.scala:164)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:72)
at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
at org.apache.spark.sql.LowPrioritySQLImplicits$class.newProductEncoder(SQLImplicits.scala:248)
at org.apache.spark.sql.SQLImplicits.newProductEncoder(SQLImplicits.scala:34)
```

From the stack trace, the exception is thrown from `ScalaReflection.schemaFor()`.
I have looked at the code, and it seems Spark uses Scala reflection to derive the data types; Scala reflection is known to have concurrency problems:
https://issues.apache.org/jira/browse/spark-26555 (thread safety of type matching in Scala reflection)
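To check that it is the encoder derivation itself that races, rather than anything in the write path, one could try deriving the product encoder directly from several threads. This is only a sketch under that assumption; I have not isolated it this way myself:

```
import org.apache.spark.sql.Encoders

// Hypothetical isolation test: derive the encoder for the SampleData case
// class above from several threads at once, without running any Spark job.
// If the race is inside ScalaReflection, this alone should occasionally
// throw the same UnsupportedOperationException.
(1 to 10).par.foreach { _ =>
  Encoders.product[SampleData]
}
```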
Is this expected behavior? I could not find any documentation about thread safety when creating a DataFrame.
I worked around the problem by adding a lock around the RDD-to-DataFrame conversion:
```
object toDFLock

import spark.implicits._

Seq(rdd1, rdd2).par.foreach { rdd =>
  toDFLock.synchronized(rdd.toDF()).write.text("xxx")
}
```
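A related idea, sketched here only under the assumption that `rdd1` and `rdd2` are `RDD[SampleData]` and `spark` is the active session (I have not verified it on 2.4.4), is to derive the encoder once on a single thread before going parallel, so the reflective schema derivation never runs concurrently:

```
import org.apache.spark.sql.{Encoder, Encoders}

// Sketch only: assumes rdd1 and rdd2 are RDD[SampleData] and `spark` is the
// active SparkSession.
// Derive the encoder once, on a single thread, before the parallel loop, so
// that the reflective schema derivation never runs concurrently.
implicit val sampleDataEncoder: Encoder[SampleData] = Encoders.product[SampleData]

Seq(rdd1, rdd2).par.foreach { rdd =>
  // createDataset reuses the pre-built encoder instead of deriving a new one
  // via the toDF() implicit inside each thread.
  spark.createDataset(rdd).write.parquet("xxx")
}
```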