我使用avro模式,使用合流kafka客户机,从c#应用程序向kafka集群动态生成消息。数据类型在编译时是未知的,所以我使用 Avro.Generic
命名空间,如下所述:https://www.confluent.io/blog/decoupling-systems-with-apache-kafka-schema-registry-and-avro/.
但是,我有一个问题-如果架构有一个可以包含null值的字段,仍然需要将该字段添加到 GenericRecord
通过使用add方法,将null作为值。我的应用程序不知道可以为null的字段,我也不认为应该为null,因为这会违背模式中可为null的字段的目的。
avro架构:
{
"namespace": "Test",
"type": "record",
"doc": "Test bool type",
"name": "BoolType",
"version": "1",
"fields": [
{
"name": "Data",
"type": [ "null", "boolean" ],
"default": null
},
{
"name": "Source",
"type": "string"
}
]
}
c代码:
var valueRecord = new GenericRecord( valueAvroSchema );
valueRecord.Add( "Data", null );
valueRecord.Add( "Source", "Test app .NET" );
var messageToSend = new Message<GenericRecord, GenericRecord>
{
Key = keyRecord,
Value = valueRecord
};
await _producer.ProduceAsync( _topicName, messageToSend );
如果线路:
valueRecord.Add( "Data", null );
不存在,则 ProduceAsync
方法抛出 Confluent.Kafka.ProduceException
,如下面的屏幕截图所示。
是否有任何方法可以自动填充genericrecord中可以为null的字段?如果我必须用字段的默认值填充字段,同样的方法也适用。
有没有什么方法可以用标准的方式来实现这一点,或者我是否需要编写自己的代码来读取模式,如果应用程序还没有设置任何可为null的字段,则在发布之前在末尾添加这些字段?
谢谢您!
1条答案
按热度按时间o2rvlv0m1#
avro违约仅与消费者相关。制作人必须始终设置每个字段