每当我重新启动系统时,它会显示“utf-8”编解码器无法将kafka中的字节0x98解码为spark

hgncfbus  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(317)

这是我的以下代码,用于从kafka获取数据以触发流式处理首先它正在运行,但当我重新启动系统时,它再次显示以下错误:
unicodedecodeerror:“utf-8”编解码器无法解码位置5中的字节0x98:无效的起始字节

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.streaming.kafka import KafkaUtils
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from confluent_kafka.avro.serializer.message_serializer import MessageSerializer

schema_registry_client = CachedSchemaRegistryClient(url='http://0.0.0.0:8081')
serializer = MessageSerializer(schema_registry_client)
sc = SparkContext()
sc.setLogLevel("ERROR")
ssc = StreamingContext(sc, 2)

# def decoder(s):

# decoded_message = serializer.decode_message(s)

# return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["demo.Appointment_Attendance.Patient"], {
                                "metadata.broker.list": "localhost:9092"}, 
                                 valueDecoder=serializer.decode_message)
keyDecoder=lambda x: x, valueDecoder=lambda x: x)
lines = kvs.map(lambda x: x[1])
lines.pprint()

ssc.start()
ssc.awaitTermination()
wsewodh2

wsewodh21#

0x98是西里尔字母и 在utf-8中,实际上这个字母由字节0xd0表示,0x98在字符集windows-1251中被描述为未定义。你有权使用'и'? 你能把它改成小写字母吗и'? 另一种选择是替换и' 对一些唯一的标记,如==cyr\u i==进行反向替换。

i5desfxk

i5desfxk2#

正在移动要回答的评论。。。
首先尝试使用原始字节
topic=ssc[“demo.appointment\u attention.patient”]kvs=kafkautils.createdirectstream(主题,{“metadata.broker.list”:localhost:9092"})
如果可行,那么在稍后的阶段尝试通过map/mapvalues反序列化

kvs.mapValues(lambda v: serializer.decode_message(v))

一旦成功了,试着重新使用valuedecoder

相关问题