我有一个spark2.0应用程序,它使用spark-streaming-kafka-0-10_.11从kafka读取消息。
结构化流看起来非常酷,所以我想尝试迁移代码,但我不知道如何使用它。
在常规流中,我使用kafkautils创建dstrean,在我传递的参数中是值反序列化器。
在结构化流媒体中,doc说我应该使用dataframe函数反序列化,但我不知道这到底意味着什么。
我看了一些例子,比如这个例子,但是我在Kafka中的avro对象很复杂,不能像例子中的字符串那样简单地转换。。
到目前为止,我尝试了这种代码(我在这里看到的是另一个问题):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
我得到“数据类型不匹配:无法将binarytype转换为structtype(structfield(…)”
如何反序列化值?
4条答案
按热度按时间p1tboqfb1#
使用以下步骤:
定义Kafka信息。
定义一个consumer实用程序,它返回avroObject的数据集。
定义逻辑代码。
Kafka信息:
Kafka消费者:
更新
实用程序:
mmvthczy2#
如上所述,从spark2.1.0开始,批处理读取器支持avro,但sparksession.readstream()不支持。下面是我如何让它在scala中工作的其他React的基础上。为了简洁起见,我简化了这个模式。
uttx8gqw3#
所以实际上我公司有人帮我解决了这个问题,所以我会把它贴在这里给未来的读者。。
基本上,我错过了米古诺建议的解码部分:
现在,你可以阅读Kafka的信息,并像这样解码它们:
KafkaMessage
只是一个case类,它包含从kafka读取时得到的泛型对象(key,value,topic,partition,offset,timestamp)
AvroTo<YourObject>Decoder
是一个类,它将在给定模式注册表url的情况下解码对象。例如使用confluent的
KafkaAvroDeserializer
和架构注册表。从这些,打电话
.deserialize(topicName, bytes).asInstanceOf[GenericRecord]
获取avro对象。希望这对别人有帮助
kuarbcqp4#
我还不太熟悉spark的序列化是如何与新的/实验性的结构化流媒体结合使用的,但是下面的方法确实有效——尽管我不确定这是不是最好的方法(我觉得这种方法有点尴尬)。
我将尝试在一个自定义数据类型的示例中回答您的问题(这里是
Foo
案例类),而不是具体的avro,但我希望它会帮助你无论如何。其思想是使用kryo序列化来序列化/反序列化您的自定义类型,请参阅spark文档中的tuning:data serialization。注意:spark支持通过内置(隐式)编码器对case类进行开箱即用的序列化,您可以通过
import spark.implicits._
. 但是为了这个例子,让我们忽略这个功能。假设您定义了以下内容
Foo
case类作为自定义类型(tl;提示:为了防止遇到奇怪的投诉/错误,你应该把代码放到一个单独的Foo.scala
文件):现在您有了以下结构化流式代码来从kafka读取数据,其中输入主题包含kafka消息,其消息值是二进制编码的
String
,你的目标是创造Foo
基于这些消息值的示例(例如,类似于将二进制数据反序列化为avro类示例的方式):现在,我们正在将这些值反序列化到自定义的示例中
Foo
类型,首先需要为其定义一个隐式Encoder[Foo]
:回到你的avro问题,你需要做的是:
创建一个适当的
Encoder
为了你的需要。替换
Foo(new String(row.getAs[Array[Byte]]("value"))
使用代码将二进制编码的avro数据反序列化为avro pojo,即从消息值中取出二进制编码的avro数据的代码(row.getAs[Array[Byte]]("value")
)返回,比如说,一个avroGenericRecord
或者别的什么SpecificCustomAvroObject
你已经在别处定义了。如果其他人知道一个更简洁/更好的/。。。回答塔尔问题的方式,我洗耳恭听
另请参见:
如何在数据集中存储自定义对象?
尝试将Dataframe行Map到更新的行时发生编码器错误