我正在使用Spark3.0和scala在AmazonEMR6.10上运行作业。
问题:在将我的键值rdd导出到s3上的序列文件并将其读回时,我看到一些键指向错误的值。我在保存数据之前和读回数据之后都打印了出来,键的值是不同的。例子:
Export:
"a" -> ObjA,"b" -> ObjB,"c" -> ObjC,.....,"z" -> ObjZ
Import:
"a" -> ObjK,"b" -> ObjV,"c" -> ObjC,.....,"z" -> ObjZ
导出rdd[(text,immutablewriteble)],键为string,值为kryov4序列化对象。
导出使用的选项:
rdd.saveAsNewAPIHadoopFile(path, classOf[Text], classOf[ImmutableBytesWritable], classOf[SequenceFileOutputFormat[Text, ImmutableBytesWritable]], hadoopConfig)
和
rdd.saveAsSequenceFile(sitesOutputBucketSeq, Some(classOf[SnappyCodec]))
使用的回读选项:
sparkContext.newAPIHadoopFile[Text, ImmutableBytesWritable, SequenceFileInputFormat[Text, ImmutableBytesWritable]](sitesOutputBucketSeq, classOf[SequenceFileInputFormat[Text, ImmutableBytesWritable]], classOf[Text], classOf[ImmutableBytesWritable], hadoopConfig)
和
sparkContext.sequenceFile[String, ImmutableBytesWritable](sitesOutputBucketSeq)
结果是一样的-一些键和值被洗牌
我在写300个值和300万个值时遇到了这个问题。我没有看到这个问题,而保存到Parquet格式和阅读回来,看起来像一些问题与序列格式输出在我的情况。
我的星火形态:
scConf.set("spark.sql.parquet.enableVectorizedReader", "false")
scConf.set("spark.kryo.referenceTracking", "false")
scConf.set("spark.sql.parquet.compression.codec", "snappy")
scConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
scConf.set("spark.sql.adaptive.enabled", "true")
scConf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
scConf.set("mapreduce.map.output.compress", "true")
scConf.set("mapreduce.output.fileoutputformat.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
scConf.set("mapred.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec")
暂无答案!
目前还没有任何答案,快来回答吧!