import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters
import scala.collection.JavaConverters._
import java.time.LocalDateTime
spark.sparkContext.setLogLevel("ERROR")
val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic"
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"
val restService = new RestService(schemaRegistryURL)
val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    // Broadcast plain values to the executors; Schema.Parser and KafkaAvroDeserializer
    // are not serializable, so they are created inside the map below instead.
    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    val rstDF = batchDF.map { row =>
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys = Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform { (k, isKey) =>
          val des = new KafkaAvroDeserializer
          des.configure(props.asJava, isKey)
          des
        }
        //-- For key only
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        val inParser = new Schema.Parser
        //-- For both key and value: deserialize each binary column and render it as JSON
        val values = bcSchemaStrings.value.transform { (k, v) =>
          deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString
        }
        s"""{"key": ${values("key")}, "value": ${values("value")} }"""
        //-- For key only
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString
      }
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")
    println(s"${LocalDateTime.now} --- Batch $batchId: ${rstDF.count} rows")
    rstDF.printSchema
    rstDF.show(false)
  })
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()
query.awaitTermination()
//Used Confluent version 3.2.2 to write this.
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema
import scala.collection.JavaConverters._ // needed for props.asJava below
case class DeserializedFromKafkaRecord(key: String, value: String)
val schemaRegistryURL = "http://127.0.0.1:8081"
val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"
//create RestService object
val restService = new RestService(schemaRegistryURL)
//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)
//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)
//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)
//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)
//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null
//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20) //remove for prod
  .load()
//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map { row =>
  if (keyDeserializer == null) {
    keyDeserializer = new KafkaAvroDeserializer
    keyDeserializer.configure(props.asJava, true) //isKey = true
  }
  if (valueDeserializer == null) {
    valueDeserializer = new KafkaAvroDeserializer
    valueDeserializer.configure(props.asJava, false) //isKey = false
  }
  //Pass the Avro schema. Note: row is a generic Row, so pull the binary columns out by name.
  val deserializedKeyString = keyDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("key"), keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
  val deserializedValueString = valueDeserializer.deserialize(topicName, row.getAs[Array[Byte]]("value"), topicValueAvroSchema).toString
  DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)
}
val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
  .outputMode("append")
  .format("console")
  .option("truncate", false)
  .start()
def main(args: Array[String]): Unit = {
  val cmd: CommandLine = parseArg(args)

  val master = cmd.getOptionValue("master", "local[*]")
  val spark = SparkSession.builder()
    .appName(App.getClass.getName)
    .master(master)
    .getOrCreate()

  val bootstrapServers = cmd.getOptionValue("bootstrap-server")
  val topic = cmd.getOptionValue("topic")
  val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

  consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

  spark.stop()
}
// ... still continues
Then, the important method that consumes the Kafka topic and deserializes it:
private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = {
  import spark.implicits._

  // Setup the Avro deserialization UDF
  schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
  kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient)
  spark.udf.register("deserialize", (bytes: Array[Byte]) =>
    kafkaAvroDeserializer.deserialize(bytes)
  )

  // Load the raw Kafka topic (byte stream)
  val rawDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .option("startingOffsets", "earliest")
    .load()

  // Deserialize byte stream into strings (Avro fields become JSON)
  import org.apache.spark.sql.functions._
  val jsonDf = rawDf.select(
    // 'key.cast(DataTypes.StringType), // string keys are simplest to use
    callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
    callUDF("deserialize", 'value).as("value")
    // excluding topic, partition, offset, timestamp, etc
  )

  // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
  val dfValueSchema = {
    val rawSchema = lookupTopicSchema(topic)
    avroSchemaToSparkSchema(rawSchema)
  }

  // Apply structured schema to JSON stream
  val parsedDf = jsonDf.select(
    'key, // keys are usually plain strings
    // values are JSONified Avro records
    from_json('value, dfValueSchema.dataType).alias("value")
  ).select(
    'key,
    $"value.*" // flatten out the value
  )

  // parsedDf.printSchema()
  // Sample schema output
  // root
  //  |-- key: string (nullable = true)
  //  |-- header: struct (nullable = true)
  //  |    |-- time: long (nullable = true)
  //  |    ...

  // TODO: Do something interesting with this stream
  parsedDf.writeStream
    .format("console")
    .outputMode(OutputMode.Append())
    .option("truncate", false)
    .start()
    .awaitTermination()
}
// still continues
The command-line parser, which allows passing in the bootstrap servers, Schema Registry, topic name, and Spark master:
private def parseArg(args: Array[String]): CommandLine = {
  import org.apache.commons.cli._

  val options = new Options

  val masterOption = new Option("m", "master", true, "Spark master")
  masterOption.setRequired(false)
  options.addOption(masterOption)

  val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
  bootstrapOption.setRequired(true)
  options.addOption(bootstrapOption)

  val topicOption = new Option("t", "topic", true, "Kafka topic")
  topicOption.setRequired(true)
  options.addOption(topicOption)

  val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
  schemaRegOption.setRequired(true)
  options.addOption(schemaRegOption)

  val parser = new BasicParser
  parser.parse(options, args)
}
// still continues
7 Answers

wd2eg0qa #1
Based on @cricket_007's answer, I created the following solution, which can run in our cluster environment, including these new features:
You need to add broadcast variables to ship some values into the map operation in a cluster environment. Neither Schema.Parser nor KafkaAvroDeserializer can be serialized in Spark, which is why they are initialized inside the map operation.
My structured streaming uses the foreachBatch output sink.
I applied org.apache.spark.sql.avro.SchemaConverters to convert the Avro schema format into a Spark StructType, so that you can use it in the from_json column function to parse the DataFrame from the Kafka topic fields (key and value).
First, you need to load some packages:
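A spark-shell invocation along these lines pulls them in (the version numbers are assumptions — align the Scala/Spark/Confluent versions with your cluster; note the Confluent artifact is served from Confluent's Maven repository, not Maven Central):

spark-shell \
  --repositories https://packages.confluent.io/maven/ \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4,io.confluent:kafka-avro-serializer:5.3.1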
Here is the full code I tested in spark-shell (it is the first code block at the top of this page):
brvekthn #2
It took me a few months of reading source code and testing things out. In a nutshell, Spark can only handle String and binary serialization; you must manually deserialize the data. In Spark, create a Confluent RestService object to get the schema, then convert the schema string in the response object into an Avro schema using the Avro parser. Next, read the Kafka topic as normal, and map over the binary-typed "value" column with Confluent's KafkaAvroDeserializer (the second code block above). I strongly suggest getting into the source code of these classes, because there is a lot going on here, so for brevity I'll leave out many details.
zhte4eai #3
For anyone who wants to do this in pyspark: the library felipe referenced worked nicely on the JVM for me before, so I wrote a small wrapper function that integrates it into Python. This looks very hacky, because a lot of types that are implicit in Scala have to be specified explicitly in py4j. It has been working nicely so far, though, even in Spark 2.4.1. To do this, you have to add the library to the Spark packages (e.g. via --packages).
5q4ezhmt #4
Since the other answer that was mostly useful was removed, I wanted to re-add it with some refactoring and comments.
Here are the dependencies needed:
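A build.sbt fragment along these lines covers them (the version numbers are assumptions; align them with your cluster, and note kafka-avro-serializer lives in the Confluent Maven repository):

resolvers += "confluent" at "https://packages.confluent.io/maven/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.0" % "provided",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "2.4.0",
  "org.apache.spark" %% "spark-avro" % "2.4.0",        // SchemaConverters
  "io.confluent" % "kafka-avro-serializer" % "5.0.0",  // AbstractKafkaAvroDeserializer, CachedSchemaRegistryClient
  "commons-cli" % "commons-cli" % "1.4"                // the parseArg method below
)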
And here is the Scala implementation (only tested locally, with master=local[*]). The first section defines the imports, some fields, and a few helper methods to get schemas:
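A sketch of what that first section can look like, reconstructed from the names referenced by main and consumeAvro above (the object name and the exact field wiring are assumptions):

import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import org.apache.avro.Schema
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App {

  // Shared fields, initialized by consumeAvro and reused by the "deserialize" UDF
  private var schemaRegistryClient: SchemaRegistryClient = _
  private var kafkaAvroDeserializer: AvroDeserializer = _

  // Fetch the latest schema string registered for the topic's value (or key) subject
  private def lookupTopicSchema(topic: String, isKey: Boolean = false): String = {
    schemaRegistryClient
      .getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value"))
      .getSchema
  }

  // Convert an Avro schema (JSON string) into a Spark SchemaType; its .dataType feeds from_json
  private def avroSchemaToSparkSchema(avroSchema: String): SchemaConverters.SchemaType = {
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  }

  // main, consumeAvro, and parseArg as shown above...
}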
Then a simple main method that parses the command-line args to get the Kafka details (the main method shown above).
Then, the important method that consumes the Kafka topic and deserializes it (consumeAvro above).
The command-line parser allows passing in the bootstrap servers, Schema Registry, topic name, and Spark master (parseArg above).
In order for the UDF above to work, there needs to be a deserializer that takes the DataFrame of bytes to one containing deserialized Avro:
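A sketch of that deserializer: it extends Confluent's AbstractKafkaAvroDeserializer, whose inherited deserialize(bytes) resolves the schema id embedded in each record through the registry client (treat the exact class shape as an assumption):

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.generic.GenericRecord

class AvroDeserializer extends AbstractKafkaAvroDeserializer {
  def this(client: SchemaRegistryClient) = {
    this()
    // Reuse the externally-created registry client rather than calling configure()
    this.schemaRegistry = client
  }

  // Returns the record rendered as a JSON string (GenericRecord.toString emits JSON)
  override def deserialize(bytes: Array[Byte]): String = {
    val value = super.deserialize(bytes)
    value match {
      case str: String => str
      case _           => value.asInstanceOf[GenericRecord].toString
    }
  }
}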
Put each of these blocks together, and it works in IntelliJ after adding -b localhost:9092 -s http://localhost:8081 -t myTopic to Run Configurations > Program Arguments.

q9yhzks0 #5
This library will do the job for you. It connects to the Confluent Schema Registry through Spark Structured Streaming.
For Confluent, it copes with the schema id that is sent along with the payload.
In the README you will find a code snippet of how to do it.
DISCLOSURE: I work for ABSA and I developed this library.
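For reference, a sketch in the spirit of the README snippet, written against the ABRiS 5.x-style API (treat the builder chain as an assumption; older versions exposed a from_confluent_avro function taking a config map instead):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

val spark = SparkSession.builder().appName("abris-example").getOrCreate()

// Raw Kafka stream with a binary "value" column
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load()

// Resolve the reader schema from the registry, using the topic-name subject strategy
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("mytopic")
  .usingSchemaRegistry("http://localhost:8081")

// ABRiS's from_avro understands the Confluent wire format (magic byte + schema id)
val deserialized = df.select(from_avro(col("value"), abrisConfig).as("data"))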
ecbunoof #6
Here is an example of my code integrating Spark Structured Streaming with Kafka and Schema Registry (code in Scala).
When reading from the Kafka topic, we have this kind of schema:
key: binary | value: binary | topic: string | partition: integer | offset: long | timestamp: timestamp | timestampType: integer |
As we can see, key and value are binary, so we need to cast the key as a string and, in this case, the value is Avro-formatted, so we can parse it by calling the from_avro function. In addition to the Spark and Kafka dependencies, we need these dependencies:
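A minimal sketch of this approach, assuming Spark 2.4+'s built-in from_avro from the spark-avro module plus Confluent's registry client. Since Spark's from_avro does not understand Confluent's wire format, the 5-byte prefix (magic byte + 4-byte schema id) is stripped first, and every record is assumed to have been written with the latest registered schema:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.from_avro // Spark 2.4.x; in Spark 3 it moved to org.apache.spark.sql.avro.functions
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("from-avro-example").getOrCreate()

// Fetch the latest value schema (as a JSON string) once, on the driver
val registry = new CachedSchemaRegistryClient("http://localhost:8081", 128)
val valueSchema = registry.getLatestSchemaMetadata("mytopic-value").getSchema

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .load()
  .select(
    col("key").cast("string"),
    // Strip the 5-byte Confluent prefix (substring is 1-based in Spark SQL), then decode
    from_avro(expr("substring(value, 6, length(value) - 5)"), valueSchema).as("value")
  )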
nx7onnlm #7
Databricks now provides this functionality, but you have to pay for it :-(
See https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html for more info.
A good free alternative is ABRiS; see https://github.com/absaoss/abris. The only downside we can see is that you need to provide a file with your Avro schema at runtime, so the framework can enforce this schema on your DataFrame before it publishes it to the Kafka topic.
一个好的免费选择是阿布里斯。请参见:https://github.com/absaoss/abris 我们可以看到的唯一缺点是,您需要在运行时提供avro模式的文件,以便框架可以在将其发布到kafka主题之前在您的Dataframe上实施此模式。