我正在将Kafka 2.11-1中的抄表记录以JSON形式流式传输到Spark 2.1中。我不明白如何将流对象转换成DataFrame,然后再保存为Parquet文件。我希望Scala脚本能够从JSON中推断模式,以便在流源数据的JSON格式发生变化时自动生成新的Parquet文件格式(稍后我将研究如何检测这种情况,并在格式变化时启动一个新文件)。目前,我还无法写出Parquet文件。
import org.apache.spark
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode, SparkSession}
// Streaming context built on the SparkContext `sc` (provided by spark-shell); one micro-batch every 5 seconds.
val ssc = new StreamingContext(sc, batchDuration = Seconds(5))
// `new SQLContext(sc)` has been deprecated since Spark 2.0; take the context from the active SparkSession instead.
val sqlContext: SQLContext = SparkSession.builder().getOrCreate().sqlContext
// Directory where streaming checkpoint data is written (relative path).
ssc.checkpoint("_checkpoint")
// Open a direct Kafka stream whose elements are (key, value) pairs of strings.
import org.apache.spark.streaming.kafka.KafkaUtils
import _root_.kafka.serializer.StringDecoder
// Broker list handed to the Kafka consumer.
val kafkaParams: Map[String, String] = Map(("metadata.broker.list", "xx.xx.xx.xx:9092"))
// Topics to subscribe to.
val kafkaTopics: Set[String] = Set("test")
val messages =
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc,
    kafkaParams,
    kafkaTopics
  )
// Print a sample of every batch to stdout for inspection.
messages.print()
// Parquet output directory. NOTE(review): placeholder relative path - point it at the real HDFS location.
val parquetOutputPath = "meter_readings.parquet"
// For every micro-batch: infer the schema from the JSON payloads and append the rows to the Parquet output.
messages.foreachRDD { rdd =>
  // The Kafka message value (second tuple element) carries the JSON document; the key is not used.
  val jsonRecords = rdd.map(_._2)
  // Schema inference needs at least one record, so batches without data are skipped.
  if (!jsonRecords.isEmpty()) {
    // Look the session up inside the closure (pattern from the Spark Streaming guide) rather than
    // capturing a driver-side context in a function that is checkpointed with the DStream graph.
    val session = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
    // The schema is inferred per batch. NOTE(review): with the default parse mode, malformed JSON
    // lines are expected to surface in a `_corrupt_record` column - confirm this is acceptable.
    val readings: DataFrame = session.read.json(jsonRecords)
    // Append so that each batch adds new files instead of replacing earlier output.
    readings.write.mode(SaveMode.Append).parquet(parquetOutputPath)
  }
}
// Start the streaming job and block until it is stopped or fails.
ssc.start()
ssc.awaitTermination()
json如下所示:
-------------------------------------------
Time: 1513155855000 ms
-------------------------------------------
(null,{"customer_id":"customer_51","customer_acct_id":"cusaccid_1197","serv_acct_id":"service_1957","installed_service_id":"instserv_946","meter_id":"meter_319","channel_number":"156","interval_read_date":"2013-06-16 11:26:04","interval_received":"5","interval_measure":"hour","interval_expected":"1","received_3days":"1","expected_3days":"1","received_30days":"1","expected_30days":"1","meter_exclusion_ind":"N", "provisioned_meter_ind:"N"})
(null,{"customer_id":"customer_25","customer_acct_id":"cusaccid_1303","serv_acct_id":"service_844","installed_service_id":"instserv_1636","meter_id":"meter_663","channel_number":"1564","interval_read_date":"2014-02-13 12:52:34","interval_received":"8","interval_measure":"hour","interval_expected":"1","received_3days":"1","expected_3days":"1","received_30days":"1","expected_30days":"1","meter_exclusion_ind":"N","provisioned_meter_ind":"N"})
(null,{"customer_id":"customer_1955","customer_acct_id":"cusaccid_1793","serv_acct_id":"service_577","installed_service_id":"instserv_1971","meter_id":"meter_1459","channel_number":"1312","interval_read_date":"2017-05-23 07:32:13","interval_received":"11","interval_measure":"hour","interval_expected":"1","received_3days":"1","expected_3days":"1","received_30days":"1","expected_30days":"1","meter_exclusion_ind":"N","provisioned_meter_ind":"N"})
(null,{"customer_id":"customer_1833","customer_acct_id":"cusaccid_1381","serv_acct_id":"service_461","installed_service_id":"instserv_477","meter_id":"meter_1373","channel_number":"1769","interval_read_date":"2011-12-13 10:12:20","interval_received":"15","interval_measure":"hour","interval_expected":"1","received_3days":"1","expected_3days":"1","received_30days":"1","expected_30days":"1","meter_exclusion_ind":"N","provisioned_meter_ind":"N"})
(null,{"customer_id":"customer_1597","customer_acct_id":"cusaccid_1753","serv_acct_id":"service_379","installed_service_id":"instserv_1061","meter_id":"meter_1759","channel_number":"632","interval_read_date":"2013-07-22 05:49:55","interval_received":"7","interval_measure":"hour","interval_expected":"1","received_3days":"1","expected_3days":"1","received_30days":"1","expected_30days":"1","meter_exclusion_ind":"N","provisioned_meter_ind":"N"})
2017-12-13 09:04:15,626 INFO org.apache.spark.streaming.scheduler.JobGenerator (Logging.scala:logInfo(54)) - Checkpointing graph for time 1513155855000 ms
我用spark shell测试这个:
spark-shell --jars /opt/alti-spark-2.1.1/external/kafka-0-8/target/spark-streaming-kafka-0-8_2.11-2.1.1.jar --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0
暂无答案!
目前还没有任何答案,快来回答吧!