Kafka 如何在Spark Scala中从avro消息中提取schema id

qojgxg4l  于 2023-10-15  发布在  Apache
关注(0)|答案(1)|浏览(103)

我有一个Spark Scala框架,其中列包含avro message的值(Array[Byte])。我知道0字节是魔术字节,位置1-4中的字节是模式ID。我如何提取这些字节(1-4)并添加新的列与模式id值在int?
我需要在spark Scala中使用一些spark函数/udf来extrace schema id值

yyhrrdl8

yyhrrdl81#

你可以这样做:

import org.apache.spark.sql.functions.udf

// extract the schema id (bytes 1-4) and convert it to integer
val extractSchemaId = udf((message: Array[Byte]) => {
  ByteBuffer.wrap(message.slice(1, 5)).getInt()
})

// assuming the "value" column holds the Avro message
df.withColumn("schema_id", extractSchemaId(df("value")))

相关问题