I'm working with Spark Streaming in Scala. I need to read a .csv file from an HDFS directory, like this:
val lines = ssc.textFileStream("/user/root/")
I put the file into HDFS with the following command:
hdfs dfs -put ./head40k.csv
It works fine with relatively small files. When I try a larger one, I get this error:
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/head800k.csv._COPYING
I can understand why it happens, but I don't know how to fix it. I have also tried this workaround:
hdfs dfs -put ./head800k.csv /user
hdfs dfs -mv /usr/head800k.csv /user/root
but my program doesn't read the file. Any ideas? Thanks in advance.
Here is the class:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import StreamingContext._
object Traccia2014{
def main(args: Array[String]){
if (args.length < 2) {
System.err.println(s"""
|Usage: Traccia2014 <brokers> <topicRisultato>
| <brokers> is a list of one or more Kafka brokers
| <topicRisultato> is the Kafka topic to which the results are published
|
""".stripMargin)
System.exit(1)
}
val Array(brokers,risultato) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.textFileStream("/user/root/")
//val lines = ssc.fileStream[LongWritable, Text, TextInputFormat](directory = "/user/root/",
//  filter = (path: org.apache.hadoop.fs.Path) => !path.getName.endsWith("._COPYING"), newFilesOnly = true)
//**********Definizioni Producer***********
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val slice=30
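// For each batch: subtract the minimum of the first CSV column from every row and bucket that value into
// windows of `slice`, count rows per (window, column 1) and per (window, column 2), join the two counts,
// compute their ratio (column-1 count over column-2 count) and publish the result to the Kafka topic `risultato`.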
lines.foreachRDD( rdd => {
if(!rdd.isEmpty){
val min=rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
if(!min.isEmpty){
val ipDst= rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
if(!ipDst.isEmpty){
val ipSrc=rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(1)),1)).reduceByKey(_ + _)
if(!ipSrc.isEmpty){
val Rapporto=ipSrc.leftOuterJoin(ipDst).mapValues{case (x,y) => x.asInstanceOf[Int] / y.getOrElse(1) }
val RapportoFiltrato=Rapporto.filter{case (key, value) => value > 100 }
println("###(ConsumerScala) CalcoloRapporti: ###")
Rapporto.collect().foreach(println)
val str = Rapporto.collect().mkString("\n")
println(s"###(ConsumerScala) Produco Risultato : ${str}")
val message = new ProducerRecord[String, String](risultato, null, str)
producer.send(message)
Thread.sleep(1000)
}else{
println("src vuoto")
}
}else{
println("dst vuoto")
}
}else{
println("min vuoto")
}
}else
{
println("rdd vuoto")
}
})//foreach
ssc.start()
ssc.awaitTermination()
} }
1 Answer
/user/root/head800k.csv._COPYING is a transient file that is created while the copy process is still in progress. Wait for the copy to complete and you will have a file without the ._COPYING suffix, i.e. /user/root/head800k.csv. To filter these transient files out of your Spark streaming job, you can use the fileStream method with a path filter, as in the sketch below.
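A minimal sketch, assuming you want the file contents delivered as plain text lines (the variable name csvLines and the exact filter are illustrative):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Skip files that still carry the temporary suffix appended by hdfs dfs -put
val csvLines = ssc.fileStream[LongWritable, Text, TextInputFormat](
  directory = "/user/root/",
  filter = (path: Path) => !path.getName.endsWith("._COPYING_"),
  newFilesOnly = true
).map { case (_, text) => text.toString }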
EDIT: Since you are moving the file from the local filesystem into HDFS, the best solution is to put it into a temporary staging location in HDFS first and then move it into your target directory. Copying or moving within the HDFS filesystem avoids the transient ._COPYING file altogether.
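For example (the staging directory is illustrative; any HDFS path outside the directory your stream is watching will do):

hdfs dfs -mkdir -p /tmp/staging
hdfs dfs -put ./head800k.csv /tmp/staging/
hdfs dfs -mv /tmp/staging/head800k.csv /user/root/

The mv is a rename within HDFS, so the file appears in /user/root/ atomically and the streaming job never sees a partially written file.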