我正在我的本地机器上使用C#学习Kafka。我有一个生产者/消费者/流,可以处理字符串,现在我正在尝试使用模式注册表来允许复杂类型。
我正在尝试使用以下控制台应用程序注册一个类:
static async Task Main(string[] args)
{
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:29092"
};
var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);
var schema = @"{
""type"": ""TotallyCoolCustomClass"",
""properties"": {
""FavouriteQuote"": {""type"": ""string""},
""FavouriteNumber"": {""type"": ""integer""}
}
}";
var subject = "SimpleTest";
var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
}
每当我运行schema registry应用程序时,我都会收到一个Http异常:
System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'
docker在我的Kafka示例下报告了这一点:
WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)
我的消息显然很小,没有打破默认的100 Mb-所以我不知道为什么它认为它是。谁能告诉我为什么我不能注册一个模式?
我已经设置了Kafka,Zookeeper和一个Schema Registry:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:latest
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
ports:
- 22181:2181
kafka:
image: confluentinc/cp-kafka:latest
depends_on:
- zookeeper
ports:
- 29092:29092
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
schema-registry:
image: confluentinc/cp-schema-registry:latest
depends_on:
- kafka
ports:
- 28081:28081
environment:
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
# SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081
然后用这个生成它(尽管我认为这对于这个问题来说很多余,因为模式没有注册,所以发布永远不会工作):
public class KafkaPProducerHostedService : IHostedService
{
private readonly ILogger<KafkaPProducerHostedService> _logger;
private readonly IProducer<string, TotallyCoolCustomClass> _producer;
public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
{
_logger = logger;
var config = new ProducerConfig();
config.BootstrapServers = "localhost:29092";
var schemaRegistryConfig = new SchemaRegistryConfig
{
Url = "http://localhost:28081/"
};
//video on serializers
_producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
.SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
.Build();
}
public async Task StartAsync(CancellationToken cancellationToken)
{
int i = 0;
var rand = new Random();
while (true) {
Thread.Sleep(2000);
var customClass = new TotallyCoolCustomClass(rand);
var key = $"UpdatedKey-{i}";
await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
{
Key = key,
Value = customClass
},
cancellationToken);
Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
i++;
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_producer?.Dispose();
return Task.CompletedTask;
}
}
1条答案
按热度按时间92dk7w1h1#
您已经配置了模式注册表客户端,以便在Kafka代理上发送数据,而Kafka代理不是HTTP服务器
使用端口28081
但是您注解了
SCHEMA_REGISTRY_LISTENERS
,因此端口Map不正确,应该是默认的8081架构未注册,因此发布将永远不会工作
生产者自动注册模式。没有必要手动执行这些操作,除非您确实需要在某个地方使用模式ID