我正在使用confluent kafka python客户端创建protobuf使用者,在配置中,它指定 value.deserializer
消息类型:
protobuf_deserializer = ProtobufDeserializer(newTopic_pb2.Foo)
string_deserializer = StringDeserializer('utf_8')
consumer_conf = {'bootstrap.servers': config['bootstrap_servers'],
'key.deserializer': string_deserializer,
'value.deserializer': protobuf_deserializer,
'group.id': config['group'],
'auto.offset.reset': "earliest"}
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe('topic1')
newtopic.proto看起来像:
syntax = "proto3";
message Foo {
string f1 = 1;
}
现在如果我想在proto中添加另一种消息类型,
syntax = "proto3";
message Foo {
string f1 = 1;
}
message Bar {
int32 id = 1;
}
我必须一起创造一个新的消费者吗?我是否可以使用相同的使用者示例来处理消息?
暂无答案!
目前还没有任何答案,快来回答吧!