有没有一种方法可以使用模式将来自kafka和spark的avro消息转换为Dataframe?用户记录的架构文件:
{
"fields": [
{ "name": "firstName", "type": "string" },
{ "name": "lastName", "type": "string" }
],
"name": "user",
"type": "record"
}
以及sqlnetworkwordcount示例和kafka、spark和avro的代码片段—第3部分,生成和使用avro消息以读入消息。
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
...
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
case class User(firstName: String, lastName: String)
不知何故,除了使用case类将avro消息转换为dataframe之外,我找不到其他方法。有没有可能改用模式?我在用 Spark 1.6.2
以及 Kafka 0.10
.
完整的代码,如果你感兴趣的话。
import com.twitter.bijection.Injection
import com.twitter.bijection.avro.GenericAvroCodecs
import kafka.serializer.{DefaultDecoder, StringDecoder}
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.streaming.kafka._
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.{SparkConf, SparkContext}
object ReadMessagesFromKafka {
object Injection {
val parser = new Schema.Parser()
val schema = parser.parse(getClass.getResourceAsStream("/user_schema.json"))
val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}
def main(args: Array[String]) {
val brokers = "127.0.0.1:9092"
val topics = "test"
// Create context with 2 second batch interval
val sparkConf = new SparkConf().setAppName("ReadMessagesFromKafka").setMaster("local[*]")
val ssc = new StreamingContext(sparkConf, Seconds(2))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
ssc, kafkaParams, topicsSet)
messages.foreachRDD((rdd: RDD[(String, Array[Byte])]) => {
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits._
val df = rdd.map(message => Injection.injection.invert(message._2).get)
.map(record => User(record.get("firstName").toString, records.get("lastName").toString)).toDF()
df.show()
})
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/**Case class for converting RDD to DataFrame */
case class User(firstName: String, lastName: String)
/**Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {
@transient private var instance: SQLContext = _
def getInstance(sparkContext: SparkContext): SQLContext = {
if (instance == null) {
instance = new SQLContext(sparkContext)
}
instance
}
}
4条答案
按热度按时间o2g1uqev1#
对于任何有兴趣以一种不需要停止和重新部署spark应用程序(假设应用程序逻辑可以处理此问题)就可以处理模式更改的方式来处理此问题的人,请参阅以下问题/答案。
np8igboo2#
op可能解决了这个问题,但为了将来的参考,我解决了这个问题相当普遍,所以认为它可能是有帮助的张贴在这里。
所以一般来说,您应该将avro模式转换为spark structtype,并将rdd中的对象转换为行[any],然后使用:
为了转换avro模式,我使用了spark avro,如下所示:
rdd的转换更加棘手。。如果你的模式很简单,你可以做一个简单的Map。。像这样:
在本例中,对象有两个字段name和age。
重要的是确保行中的元素与structtype中的字段的顺序和类型相匹配。
在我的特殊情况下,我有一个更复杂的对象,我想处理它以支持将来的模式更改,所以我的代码要复杂得多。
op建议的方法应该也适用于某些casese,但很难在复杂对象(不是primitive或case类)上使用
另一个技巧是,如果类中有一个类,则应将该类转换为一行,以便 Package 类将转换为如下内容:
您还可以看看我前面提到的spark avro项目,它介绍了如何将对象转换为行。。我自己也用了一些逻辑
如果有人阅读这篇文章需要进一步的帮助,请在评论中问我,我会尽力帮助你
类似的问题在这里也得到了解决。
tquggr8v3#
我也研究过类似的问题,但用的是java。所以不确定scala,但是看看库com.databricks.spark.avro。
wi3ka0sx4#
请看一下这个https://github.com/databricks/spark-avro/blob/master/src/test/scala/com/databricks/spark/avro/avrosuite.scala
所以
你可以试试这个