How can I use Kafka as a service for tests on a CI server?

3df52oht  posted on 2021-06-04 in Kafka

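Has anyone successfully set up Kafka as a service in a CI environment? I am trying to use the lensesio/fast-data-dev Docker image. I have also tried the Confluent images, see below.
I have the following CI job that runs the tests. ADV_HOST is set so that the Kafka container can be reached as a GitLab CI service on the same Docker network. The job hangs when it tries to contact the Kafka container.
GitLab CI job using lensesio/fast-data-dev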

tests:
  stage: test
  variables:
    ADV_HOST: kafka
    DISABLE: azure-documentdb,blockchain,bloomberg,cassandra,coap,druid,elastic,elastic5,ftp,hazelcast,hbase,influxdb,jms,kudu,mongodb,mqtt,redis,rethink,voltdb,yahoo,hdfs,jdbc,elasticsearch,s3,twitter,hive-1.1
    CONNECT_HEAP: 512m
    MINIO_BUCKET: images
    SAMPLEDATA: 0
    REST_PORT: 8082
    FORWARDLOGS: 0
    RUNTESTS: 0
    DISABLE_JMX: 1
    WEB_PORT: 0

  ##
  # Services
  # - Kafka
  # - Mosquitto (MQTT)
  # - Minio (S3)
  ##
  services:
  - name: lensesio/fast-data-dev:2.5.1-L0
    alias: kafka
  - name: dcs3spp/minio:version-1.0.2
    alias: minio
  - name: eclipse-mosquitto:1.6.9
    alias: mqtt

  script:
    - >
      dotnet test --no-restore
      --logger:trx
      --settings:Tests/coverlet.runsettings
      --collect:"XPlat Code Coverage"
      WebApp.sln

The CI job hangs trying to contact the Kafka container

info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Admin service trying to create Kafka Topic...
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Topic::eventbus, ReplicationCount::1, PartitionCount::3
info: WebApp.Kafka.Admin.KafkaAdminService[0]
      Bootstrap Servers::kafka:9092

Confluent Kafka Docker images

tests:
  stage: test
  variables:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  ##
  # Services
  # - Kafka
  # - Mosquitto (MQTT)
  # - Minio (S3)
  ##
  services:
  - name: confluentinc/cp-zookeeper:5.1.0
    alias: zookeeper
  - name: confluentinc/cp-kafka:5.1.0
    alias: kafka
  - name: dcs3spp/minio:version-1.0.2
    alias: minio
  - name: eclipse-mosquitto:1.6.9
    alias: mqtt

  script:
    - >
      dotnet test --no-restore
      --logger:trx
      --settings:Tests/coverlet.runsettings
      --collect:"XPlat Code Coverage"
      WebApp.sln

Confluent image error log

2020-11-15T12:15:44.558992574Z [2020-11-15 12:15:44,536] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.558996094Z [2020-11-15 12:15:44,536] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.558999139Z [2020-11-15 12:15:44,542] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559002735Z [2020-11-15 12:15:44,542] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559006873Z [2020-11-15 12:15:44,542] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559009945Z [2020-11-15 12:15:44,542] INFO Server environment:os.version=4.19.78-coreos (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559012987Z [2020-11-15 12:15:44,542] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559015990Z [2020-11-15 12:15:44,542] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.559019219Z [2020-11-15 12:15:44,544] INFO Server environment:user.dir=/ (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.646950748Z [2020-11-15 12:15:44,642] INFO tickTime set to 2000 (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.646965707Z [2020-11-15 12:15:44,642] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.646969289Z [2020-11-15 12:15:44,642] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
2020-11-15T12:15:44.748601072Z [2020-11-15 12:15:44,725] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)
2020-11-15T12:15:44.780680437Z [2020-11-15 12:15:44,754] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

*********

Pulling docker image mcr.microsoft.com/dotnet/core/sdk:3.1-alpine ...
Using docker image sha256:b0c526e8732fdcf06a1cc277f04523b4d3f10a6554d2b9df855e683524ee7ddf for mcr.microsoft.com/dotnet/core/sdk:3.1-alpine with digest mcr.microsoft.com/dotnet/core/sdk@sha256:3982ac41d8777b78ad7a2efe4c9674338975ebf9a25eeceb943348e45edf91b1 ...
Preparing environment
00:01
ERROR: Job failed (system failure): prepare environment: Error response from daemon: Cannot link to a non running container: /runner-z3wu8uu--project-20752619-concurrent-0-5f56b4323d8253b2-confluentinc__cp-kafka-1 AS /runner-z3wu8uu--project-20752619-concurrent-0-5f56b4323d8253b2-predefined-0/kafka (docker.go:817:0s).

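Is the error caused by the Kafka container, on startup, trying to connect to the address I set in the KAFKA_ADVERTISED_LISTENERS variable? For example, is it unable to connect to kafka:29092 because the container has not started yet?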

vngu2lb8  #1

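After reading this aspnetcore issue, I found that the problem was in my IHostedService implementation, which makes a request to Kafka.
The StartAsync method was executing the task itself and running until the request completed. By design, this method is meant to be fire-and-forget, i.e. start the task and then continue. I updated my KafkaAdmin service to be a BackgroundService, overriding the ExecuteAsync method as shown below. After that, the tests no longer blocked.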

using System;
using System.Threading;
using System.Threading.Tasks;

using Confluent.Kafka;
using Confluent.Kafka.Admin;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

using KafkaAdmin.Kafka.Config;

namespace KafkaAdmin.Kafka
{
    public delegate IAdminClient KafkaAdminFactory(KafkaConfig config);

    /// <summary>Background service that asks Kafka to create a topic</summary>
    public class KafkaAdminService : BackgroundService
    {
        private KafkaAdminFactory _Factory { get; set; }
        private ILogger<KafkaAdminService> _Logger { get; set; }
        private KafkaConfig _Config { get; set; }

        /// <summary>
        /// Retrieve KafkaConfig from appsettings
        /// </summary>
        /// <param name="config">Config POCO from appsettings file</param>
        /// <param name="clientFactory"><see cref="KafkaAdminFactory"/></param>
        /// <param name="logger">Logger instance</param>
        public KafkaAdminService(
            IOptions<KafkaConfig> config,
            KafkaAdminFactory clientFactory,
            ILogger<KafkaAdminService> logger)
        {
            if (clientFactory == null)
                throw new ArgumentNullException(nameof(clientFactory));

            if (config == null)
                throw new ArgumentNullException(nameof(config));

            _Config = config.Value ?? throw new ArgumentNullException(nameof(config));
            _Factory = clientFactory ?? throw new ArgumentNullException(nameof(clientFactory));
            _Logger = logger ?? throw new ArgumentNullException(nameof(logger));
        }

        /// <summary>
        /// Create a Kafka topic if it does not already exist
        /// </summary>
        /// <param name="stoppingToken">Cancellation token supplied by BackgroundService</param>
        /// <exception cref="CreateTopicsException">
        /// Thrown for exceptions encountered except duplicate topic
        /// </exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            using (var client = _Factory(_Config))
            {
                try
                {
                    _Logger.LogInformation("Admin service trying to create Kafka Topic...");
                    _Logger.LogInformation($"Topic::{_Config.Topic.Name}, ReplicationCount::{_Config.Topic.ReplicationCount}, PartitionCount::{_Config.Topic.PartitionCount}");
                    _Logger.LogInformation($"Bootstrap Servers::{_Config.Consumer.BootstrapServers}");

                    await client.CreateTopicsAsync(new TopicSpecification[] {
                        new TopicSpecification {
                            Name = _Config.Topic.Name,
                            NumPartitions = _Config.Topic.PartitionCount,
                            ReplicationFactor = _Config.Topic.ReplicationCount
                        }
                    }, null);

                    _Logger.LogInformation($"Admin service successfully created topic {_Config.Topic.Name}");
                }
                catch (CreateTopicsException e)
                {
                    if (e.Results[0].Error.Code != ErrorCode.TopicAlreadyExists)
                    {
                        _Logger.LogError($"An error occurred creating topic {_Config.Topic.Name}: {e.Results[0].Error.Reason}");
                        throw; // rethrow without resetting the stack trace
                    }
                    else
                    {
                        _Logger.LogInformation($"Topic {_Config.Topic.Name} already exists");
                    }
                }
            }

            _Logger.LogInformation("Kafka Consumer thread started");

            await Task.CompletedTask;
        }

        /// <summary>
        /// Call base class dispose
        /// </summary>
        public override void Dispose()
        {
            base.Dispose();
        }
    }
}

I still don't understand why the live web app starts successfully. Why is this only a problem with TestServer?
I used the Confluent cp-kafka Docker image. An example docker compose file is:

---
version: "3.8"

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.0.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    networks:
      - camnet
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
      ZOOKEEPER_LOG4J_ROOT_LOGLEVEL: WARN

  kafka:
    image: confluentinc/cp-kafka:6.0.0
    hostname: kafka
    container_name: kafka
    depends_on:
      - zookeeper
    networks:
      - camnet
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_NUM_PARTITIONS: 3
      KAFKA_HEAP_OPTS: -Xmx512M -Xms512M
      KAFKA_LOG4J_ROOT_LOGLEVEL: WARN
      KAFKA_LOG4J_LOGGERS: "org.apache.zookeeper=WARN,org.apache.kafka=WARN,kafka=WARN,kafka.cluster=WARN,kafka.controller=WARN,kafka.coordinator=WARN,kafka.log=WARN,kafka.server=WARN,kafka.zookeeper=WARN,state.change.logger=WARN"

  netclient-test:
    build:
      context: ../
      dockerfile: docker/Dockerfile
    container_name: netclient-test
    image: dcs3spp/netclient
    networks:
      - camnet
    depends_on:
      - kafka
      - netclient-run
    entrypoint: []
    command:
      - bash
      - -c
      - |-
        echo 'Giving Kafka a bit of time to start up…'
        while ! nc -z kafka 9092;
        do
          sleep 1;
        done;

        echo 'Giving netclient-run a bit of time to start up…'
        while ! nc -z netclient-run 80;
        do
          sleep 1;
        done;

        echo .NET Client test container ready. Running test that uses WebApplicationFactory TestServer to start WebApp with KafkaAdmin background service
        echo This runs successfully in a local development environment on MacOS and Ubuntu Linux 16.04.
        echo This fails when running on a GitLab CI Server. It can be seen that the test server bootstraps the WebApp.....
        echo The KafkaAdmin background service blocks when requesting topic creation from the kafka service
        dotnet test --runtime linux-musl-x64 -c Release --no-restore  --nologo tests/KafkaAdmin.Kafka.IntegrationTests/

  netclient-run:
    build:
      context: ../
      dockerfile: docker/Dockerfile
    container_name: netclient-run
    image: dcs3spp/netclient
    networks:
      - camnet
    depends_on:
      - kafka
    entrypoint: []
    command:
      - bash
      - -c
      - |-
        echo 'Giving Kafka a bit of time to start up…'
        while ! nc -z kafka 9092;
        do
          sleep 1;
        done;
        echo .NET Run Web App Ready. Starting WebApp that contains KafkaAdmin background service.
        dotnet run --runtime linux-musl-x64 -c Release --no-restore --project src/KafkaAdmin.WebApp/

networks:
  camnet:

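I have also created a repository that demonstrates the problem and the fix. The main branch contains the source code that shows the problem. The feat/fix branch contains the source code with the fix.
Hope this helps anyone else who runs into a similar problem!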
