合流kafka:python用户中avro序列化与模式处理的混淆

2w3kk1z5  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(363)

我试图理解合流kafka上的avro序列化以及模式注册表的用法。直到最后一切都很顺利,但阿夫罗的最终期望让我很困惑。根据我的阅读和理解,avro序列化为我们提供了灵活性,当模式发生变化时,我们可以简单地管理它,而不影响较老的生产者/消费者。
在此之后,我开发了一个python生成器,它将检查schema registry中是否存在模式,如果没有,则创建它并开始生成如下所示的json消息。当我需要更改模式时,我只需在producer中更新它,这将生成具有新模式的消息。
我的旧模式:

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

生产商-1的样本数据:

{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}

我的新架构:

data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'

生产商-2的样本数据:

{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}

案例1:当我有两个具有以上两个模式的生产者同时运行时,我可以使用下面的代码成功地使用消息。在这之前一切都很好。

while True:
    try:
        msg = c.poll(10)

    except SerializerError as e:
        xxxxx 
        break
    print msg.value()

案例2:当我深入了解json字段时,事情会变得混乱和混乱。
首先,假设我有一个生产者使用上面的“我的旧模式”运行,一个消费者成功地消费了这些消息。

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

当我用上面提到的'mynewschema'运行第二个producer时,我的旧用户中断了,因为没有字段passport\u expiry\u date和passport\u make\u date,这是真的。
问题:
有时我想,这是意料之中的,因为是我(开发人员)在使用消息中没有的字段名。但阿夫罗能帮上什么忙呢?缺少的字段不应该由avro处理吗?我在java中看到了一些例子,在这些例子中,这种情况得到了正确的处理,但在python中没有找到任何例子。例如,下面github提供了处理此场景的完美示例。当字段不存在时,使用者只打印“无”。
https://github.com/learningjournal/apachekafkatutorials
案例3:当我运行像old producer和old consumer这样的组合,然后在另一个终端中运行新producer和new consumer这样的组合时,producer/consumer混合在一起,事情就破裂了,没有json字段。
老消费者==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]

新消费者==>

print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]

问题:
我再次认为,这是意料之中的。但是,avro让我认为正确的消费者应该用正确的模式获得正确的信息。如果我使用msg.value()并且总是使用编程解析使用者端的字段,而不使用任何avro角色,那么使用avro的好处在哪里呢?用消息发送模式/存储在sr中有什么好处?
最后,有没有办法检查附加到消息的模式?我知道,在avro中,schema id附加在消息中,在读写消息时,schema registry会进一步使用该消息。但我从来没有看到它的信息。
非常感谢。

pepwfjgg

pepwfjgg1#

不清楚您在注册表上使用的是什么兼容性设置,但我将反向假设,这意味着您需要添加一个带有默认值的字段。
听起来你有条Python KeyError 因为那些钥匙不存在。
而不是 msg.value()["non-existing-key"] ,你可以试试
选择一:把它当作 dict() ```
msg.value().get("non-existing-key", "Default value")

选项2:逐个检查可能不存在的所有密钥

some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"

否则,必须将较新的模式“投影”到较旧的数据上,这就是java代码使用 `SpecificRecord` 子类。这样,较旧的数据将使用较新的模式进行解析,该模式的默认值为较新的字段。
如果你用过 `GenericRecord` 相反,在java中,您会遇到类似的问题。我不确定在python中是否有一个等价于java的 `SpecificRecord` . 
顺便说一下,我不认为绳子 `"None"` 可以申请 `logicalType=timestamp` 

相关问题