我试图理解合流kafka上的avro序列化以及模式注册表的用法。直到最后一切都很顺利,但阿夫罗的最终期望让我很困惑。根据我的阅读和理解,avro序列化为我们提供了灵活性,当模式发生变化时,我们可以简单地管理它,而不影响较老的生产者/消费者。
在此之后,我开发了一个python生成器,它将检查schema registry中是否存在模式,如果没有,则创建它并开始生成如下所示的json消息。当我需要更改模式时,我只需在producer中更新它,这将生成具有新模式的消息。
我的旧模式:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'
生产商-1的样本数据:
{u'mobile': 9819841242, u'lname': u'Rogers', u'passport_expiry_date': u'2026-05-21', u'passport_make_date': u'2016-05-21', u'fname': u'tom', u'ipaddress': u'208.103.236.60', u'email': u'tom_Rogers@TEST.co.nz', u'principal': u'tom@EXAMPLE.COM'}
我的新架构:
data = '{"schema":"{\\"type\\":\\"record\\",\\"name\\":\\"value\\",\\"namespace\\":\\"my.test\\",\\"fields\\":[{\\"name\\":\\"fname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"lname\\",\\"type\\":\\"string\\"},{\\"name\\":\\"email\\",\\"type\\":\\"string\\"},{\\"name\\":\\"principal\\",\\"type\\":\\"string\\"},{\\"name\\":\\"ipaddress\\",\\"type\\":\\"string\\"},{\\"name\\":\\"mobile\\",\\"type\\":\\"long\\"},{\\"name\\":\\"new_passport_make_date\\",\\"type\\":[\\"string\\",\\"null\\"],\\"logicalType\\":\\"timestamp\\",\\"default\\":\\"None\\"},{\\"name\\":\\"new_passport_expiry_date\\",\\"type\\":\\"string\\",\\"logicalType\\":\\"date\\"}]}"}'
生产商-2的样本数据:
{u'mobile': 9800647004, u'new_passport_make_date': u'2011-05-22', u'lname': u'Reed', u'fname': u'Paul', u'new_passport_expiry_date': u'2021-05-22', u'ipaddress': u'134.124.7.28', u'email': u'Paul_Reed@nbc.com', u'principal': u'Paul@EXAMPLE.COM'}
案例1:当我有两个具有以上两个模式的生产者同时运行时,我可以使用下面的代码成功地使用消息。在这之前一切都很好。
while True:
try:
msg = c.poll(10)
except SerializerError as e:
xxxxx
break
print msg.value()
案例2:当我深入了解json字段时,事情会变得混乱和混乱。
首先,假设我有一个生产者使用上面的“我的旧模式”运行,一个消费者成功地消费了这些消息。
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
当我用上面提到的'mynewschema'运行第二个producer时,我的旧用户中断了,因为没有字段passport\u expiry\u date和passport\u make\u date,这是真的。
问题:
有时我想,这是意料之中的,因为是我(开发人员)在使用消息中没有的字段名。但阿夫罗能帮上什么忙呢?缺少的字段不应该由avro处理吗?我在java中看到了一些例子,在这些例子中,这种情况得到了正确的处理,但在python中没有找到任何例子。例如,下面github提供了处理此场景的完美示例。当字段不存在时,使用者只打印“无”。
https://github.com/learningjournal/apachekafkatutorials
案例3:当我运行像old producer和old consumer这样的组合,然后在另一个终端中运行新producer和new consumer这样的组合时,producer/consumer混合在一起,事情就破裂了,没有json字段。
老消费者==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["passport_make_date"], msg.value()["passport_expiry_date"]
新消费者==>
print msg.value()["fname"] , msg.value()["lname"] , msg.value()["new_passport_make_date"], msg.value()["new_passport_expiry_date"]
问题:
我再次认为,这是意料之中的。但是,avro让我认为正确的消费者应该用正确的模式获得正确的信息。如果我使用msg.value()并且总是使用编程解析使用者端的字段,而不使用任何avro角色,那么使用avro的好处在哪里呢?用消息发送模式/存储在sr中有什么好处?
最后,有没有办法检查附加到消息的模式?我知道,在avro中,schema id附加在消息中,在读写消息时,schema registry会进一步使用该消息。但我从来没有看到它的信息。
非常感谢。
1条答案
按热度按时间pepwfjgg1#
不清楚您在注册表上使用的是什么兼容性设置,但我将反向假设,这意味着您需要添加一个带有默认值的字段。
听起来你有条Python
KeyError
因为那些钥匙不存在。而不是
msg.value()["non-existing-key"]
,你可以试试选择一:把它当作
dict()
```msg.value().get("non-existing-key", "Default value")
some_var = None # What you want to parse
val = msg.value()
if "non-existing-key" not in val:
some_var = "Default Value"