简单的C# Kafka Producer与Schema Regitry实现抛出无效的接收大小

mfpqipee  于 2023-06-28  发布在  Apache
关注(0)|答案(1)|浏览(115)

我正在我的本地机器上使用C#学习Kafka。我有一个生产者/消费者/流,可以处理字符串,现在我正在尝试使用模式注册表来允许复杂类型。
我正在尝试使用以下控制台应用程序注册一个类:

static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            Url = "http://localhost:29092"
        };

        var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig);

        var schema = @"{
            ""type"": ""TotallyCoolCustomClass"",
            ""properties"": {
                ""FavouriteQuote"": {""type"": ""string""},
                ""FavouriteNumber"": {""type"": ""integer""}
            }
        }";

        var subject = "SimpleTest";
        var schemaId = await schemaRegistry.RegisterSchemaAsync(subject, schema);
    }

每当我运行schema registry应用程序时,我都会收到一个Http异常:

System.Net.Http.HttpRequestException: '[http://localhost:29092/] HttpRequestException: An error occurred while sending the request.'

docker在我的Kafka示例下报告了这一点:

WARN [SocketServer listenerType=ZK_BROKER, nodeId=1] Unexpected error from /172.22.0.1 (channelId=172.22.0.3:29092-172.22.0.1:34802-28); closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1347375956 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:94)

我的消息显然很小,没有打破默认的100 Mb-所以我不知道为什么它认为它是。谁能告诉我为什么我不能注册一个模式?

我已经设置了Kafka,Zookeeper和一个Schema Registry:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
  
  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  schema-registry:
    image: confluentinc/cp-schema-registry:latest
    depends_on: 
      - kafka
    ports:
      - 28081:28081
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'PLAINTEXT://kafka:9092'
      # SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:28081

然后用这个生成它(尽管我认为这对于这个问题来说很多余,因为模式没有注册,所以发布永远不会工作):

public class KafkaPProducerHostedService : IHostedService
        {
            private readonly ILogger<KafkaPProducerHostedService> _logger;
            private readonly IProducer<string, TotallyCoolCustomClass> _producer;
            public KafkaPProducerHostedService(ILogger<KafkaPProducerHostedService> logger)
            {
                _logger = logger;
                var config = new ProducerConfig();
                config.BootstrapServers = "localhost:29092";

                var schemaRegistryConfig = new SchemaRegistryConfig
                {
                    Url = "http://localhost:28081/"
                };

                //video on serializers
                _producer = new ProducerBuilder<string, TotallyCoolCustomClass>(config)
                    .SetValueSerializer(new JsonSerializer<TotallyCoolCustomClass>(new CachedSchemaRegistryClient(schemaRegistryConfig)))
                    .Build();
            }

            public async Task StartAsync(CancellationToken cancellationToken)
            {
                int i = 0;

                var rand = new Random();

                while (true) {
                    Thread.Sleep(2000);
                    var customClass = new TotallyCoolCustomClass(rand);
                    var key = $"UpdatedKey-{i}";
                    await _producer.ProduceAsync("SimpleTest", new Message<string, TotallyCoolCustomClass>()
                    {
                        Key = key,
                        Value = customClass
                    },                    
                    cancellationToken);

                    Console.WriteLine($"Published: Key: {key} with favourite quote {customClass.FavouriteQuote} and favourite number {customClass.FavouriteNumber}");
                    i++;
                }
            }

            public Task StopAsync(CancellationToken cancellationToken)
            {
                _producer?.Dispose();
                return Task.CompletedTask;
            }
        }
92dk7w1h

92dk7w1h1#

您已经配置了模式注册表客户端,以便在Kafka代理上发送数据,而Kafka代理不是HTTP服务器
使用端口28081
但是您注解了SCHEMA_REGISTRY_LISTENERS,因此端口Map不正确,应该是默认的8081
架构未注册,因此发布将永远不会工作
生产者自动注册模式。没有必要手动执行这些操作,除非您确实需要在某个地方使用模式ID

相关问题