我在我的python项目中使用confluent_Kafka==2.2.0。我想为一个主题设置多个模式。我正在阅读AvroSerializer的文档,它说它有配置选项来设置RecordNameStrategy(请参阅此处的文档https://docs.confluent.io/platform/current/clients/confluent-Kafka-python/html/index.html#avroserializer)。此外,在lib的发布版本中,它说它开始支持非默认模式策略(请参阅此处的https://github.com/confluentinc/confluent-kafka-python/releases/tag/v1.4.0)。然而,由于两个原因,我一直坚持这样做:
1.我有几个带有模式的.avsc文件。我如何向AvroSerializer提供几个模式?它只接收一个模式参数。
- confluent_Kafka.schema_registry.record_subject_name_strategy收到两个参数:ctx和record_name,我应该在那里传递什么?我的代码看起来像下面:
import asyncio
from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import SerializingProducer, DeserializingConsumer
from confluent_kafka.serialization import StringSerializer, StringDeserializer
from confluent_kafka.schema_registry import record_subject_name_strategy, SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer, AvroDeserializer
from config import Config
# Utility functions
def create_admin(config: dict):
return AdminClient(config)
async def create_new_topic(admin: AdminClient, topic_name: str):
if topic_name in admin.list_topics().topics:
print("Already exist!")
return
futures = admin.create_topics([
NewTopic(topic_name, num_partitions=1, replication_factor=1),
])
await asyncio.wrap_future(futures[topic_name])
print("Topic created!")
# Main function
async def main():
admin = create_admin(Config.KAFKA)
await create_new_topic(admin, "test_multischema")
schema_registry_client = SchemaRegistryClient(Config.SCHEMA_REGISTRY)
# Reading schema strings
with open("event1.avsc", "r") as f:
schema_1_str = f.read()
with open("event2.avsc", "r") as f:
schema_2_str = f.read()
# Here is the problem
producer_config = Config.KAFKA | {
"key.serializer": StringSerializer(),
"value.serializer": AvroSerializer(
schema_registry_client,
# This serializer should be able to deserialize messages of both schema_1 and schema_2.
# How should I write config to pass both to one producer?
schema_1_str,
conf={
# What args should I pass?
"subject.name.strategy": record_subject_name_strategy()
}
)
}
producer = SerializingProducer(producer_config)
# After I will produce msgs and then consume them
字符串
也许有人可以提供如何配置SerializingProducer和SerializingConsumer的代码片段,以便他们能够使用这两个模式从主题中读取?
试图在文档中找到解决方案,但它要么声明它还不支持(旧的),要么只声明策略,并没有提供代码中实际应该如何显示的示例(如这里的How to change SubjectNameStrategy and use different schemas in confluent-kafka-python's AvroProducer?)。
1条答案
按热度按时间ni65a41a1#
如何向AvroSerializer提供多个架构?
你不能。你需要用不同的模式构造序列化器的不同示例,手动调用每个示例的serialize,然后直接生成字节数组给Kafka。
没有提供代码中实际应该是什么样子的示例(如这里...
谢谢你找到我的另一个答案。文档指向here。但是这个文件不包括策略参数,不。如果你想的话,可以打开一个github issue / PR。
一个可调用对象只是一个函数,比如lambda
字符串
或external function,如所有内置的。
第一个月
一个都没有。你不打电话给
从这两个模式的主题中读取?
做同样的事情,但是反过来。constructed bytes然后手动调用constructalize。你可以将Avro消耗到Python dict中。constructalizer不需要schema。