Testing a custom Kafka Connect SMT with Testcontainers

Asked by 34gzjxbg on 2023-10-15 in Apache

I'm trying to use Testcontainers to integration-test a custom Kafka Connect SMT (Single Message Transform) that depends on an external library (e.g. Apache Commons).
The test should:

  1. Start (a) Kafka, (b) ZooKeeper, (c) a PostgreSQL database and (d) a Kafka Connect container using the Testcontainers library (see the topology sketch after this list).
  2. Configure the Connect instance with a JDBC Source Connector against the PostgreSQL database.
  3. Have the integration test add a row to a test database table.
  4. The JDBC Source Connector should detect the DB insert and create a Kafka message...
  5. ...transform it using the custom SMT, then publish it to a Kafka topic.
  6. The test reads the transformed message from the topic and asserts that the fields are correct.
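For orientation, a minimal sketch of that container topology with Testcontainers might look like the following. The image tags, network aliases and env vars here are assumptions, and a real Connect worker needs more CONNECT_* settings (converters, internal topics, advertised host):

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

// Sketch only: not the exact configuration used in the repo.
class ContainerTopologySketch {
    static final Network network = Network.newNetwork();

    static final KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
                    .withNetwork(network)
                    .withNetworkAliases("kafka");

    static final PostgreSQLContainer<?> postgres =
            new PostgreSQLContainer<>("postgres:11")
                    .withNetwork(network)
                    .withNetworkAliases("postgres");

    // The Kafka Connect worker; the SMT jar is mounted under its plugin path later.
    static final GenericContainer<?> kafkaConnect =
            new GenericContainer<>("confluentinc/cp-kafka-connect:7.5.0")
                    .withNetwork(network)
                    .withExposedPorts(8083)
                    .withEnv("CONNECT_BOOTSTRAP_SERVERS", "kafka:9092")
                    .withEnv("CONNECT_PLUGIN_PATH", "/usr/share/java")
                    .dependsOn(kafka, postgres);
}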
My problem is creating a jar file containing the custom SMT.
I found a blog post, Testing Kafka Connectors, where the author programmatically creates a jar file (containing a custom SMT) as part of the integration test, like so:
private static void createConnectorJar() throws IOException {
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
        JarOutputStream target = new JarOutputStream(new FileOutputStream("target/kcetcd-connector/etcd-connector.jar"), manifest);
        add(new File("target/classes"), target);
        target.close();
    }
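The add helper is referenced above but not shown in the post; a plausible sketch, assuming it recursively copies the compiled classes into the jar (requires java.nio.file.Files and java.util.jar.JarEntry imports), is:

// Sketch of the add(...) helper: recursively writes every file under
// `source` into the jar, with entry names relative to target/classes.
private static void add(File source, JarOutputStream target) throws IOException {
    if (source.isDirectory()) {
        File[] children = source.listFiles();
        if (children != null) {
            for (File nested : children) {
                add(nested, target);
            }
        }
        return;
    }
    // Jar entry names must use forward slashes and be relative to the class root.
    String entryName = source.getPath()
            .replace('\\', '/')
            .replaceFirst("^target/classes/?", "");
    target.putNextEntry(new JarEntry(entryName));
    Files.copy(source.toPath(), target);
    target.closeEntry();
}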

The jar is then wired into the Docker Testcontainers image as part of the integration test, using the withFileSystemBind method to mount the target/kcetcd-connector folder containing the jar file:

public static DebeziumContainer connectContainer = new DebeziumContainer("debezium/connect-base:1.9.5.Final")
            .withFileSystemBind("target/kcetcd-connector", "/kafka/connect/kcetcd-connector")
            .withNetwork(network)
            .withKafka(kafkaContainer)
            .dependsOn(kafkaContainer);

However, this solution only works for simple SMTs that do not rely on external libraries (e.g. pulled in via Gradle). My test Kafka Connect container did not successfully pick up the classes in the jar file.
Does anyone have a solution for testing more complex SMTs with Testcontainers? I suspect the integration test essentially needs to create a shadow (uber) jar that is bound to a docker volume.

UPDATE (6 October 2023)

I have uploaded a working example to the following GitHub repo:

The code is structured as follows:

src
  main
    java
      smt
        test
          RandomField.java          Custom SMT which adds a field containing random characters (sketched after this tree)

  test
    java
      smt
        test
          RandomFieldTest.java      Unit test to test RandomField
          SmtIntegrationTest.java   Integration test using TestContainers
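For context, the general shape of a custom SMT like RandomField is sketched below. This is an illustration of the Transformation contract, not the exact code in the repo; the real class also provides $Key/$Value subclasses, a proper ConfigDef and schema-aware (Struct) handling:

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

// Sketch only: schemaless values only, no validation.
public class RandomFieldSketch<R extends ConnectRecord<R>> implements Transformation<R> {

    private String fieldName;
    private int size;
    private boolean useLetters;
    private boolean useNumbers;

    @Override
    public void configure(Map<String, ?> configs) {
        fieldName  = String.valueOf(configs.get("random.field.name"));
        size       = Integer.parseInt(String.valueOf(configs.get("random.field.size")));
        useLetters = Boolean.parseBoolean(String.valueOf(configs.get("random.use.letters")));
        useNumbers = Boolean.parseBoolean(String.valueOf(configs.get("random.use.numbers")));
    }

    @Override
    @SuppressWarnings("unchecked")
    public R apply(R record) {
        // Copy the value map and add the random field, generated by
        // Apache Commons Lang (the external dependency in question).
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        value.put(fieldName, RandomStringUtils.random(size, useLetters, useNumbers));
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}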

The SmtIntegrationTest integration test class attempts to do what I described above (i.e. test everything end-to-end via real Docker containers: Kafka, ZooKeeper, Schema Registry, Postgres and Kafka Connect).
If you run all the tests via ./gradlew test...

> Task :test
smt.test.SmtIntegrationTest

  Test db_insert_creates_kakfa_message_with_smt() FAILED
> :test > Executing test smt.test.SmtIntegrationTest
  java.lang.AssertionError: 1 expectation failed.
  Expected status code <201> but was <400>.
      at smt.test.SmtIntegrationTest.setupConnector(SmtIntegrationTest.java:352)

  Test db_insert_creates_kakfa_message() PASSED (1.3s)

smt.test.RandomFieldTest

  Test topLevelStructRequired() PASSED
  Test schemalessInsertRandomField() PASSED
  Test copySchemaAndInsertUuidField() PASSED

FAILURE: Executed 5 tests in 22.9s (1 failed)

...everything passes except the db_insert_creates_kakfa_message_with_smt test method in the SmtIntegrationTest integration test class.
The passing integration test method (db_insert_creates_kakfa_message) does not use the custom SMT, which proves everything else works: inserting a row into the database table is picked up correctly and results in a Kafka message being created and published to a Kafka topic.
The only difference between the two tests is that the failing test method sets some transform properties (e.g. transforms=randomfield) when setting up the JdbcSourceConnector connector, while the passing method does not. This is done via the setupConnector method, shown below (note the if statement, which toggles the transform properties):

private void setupConnector(String databaseTable, String randomFieldName, int size, boolean useLetters, boolean useNumbers) throws IOException {
        Map<String, Object> configMap = new HashMap<>();
        configMap.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
        configMap.put("tasks.max", "1");
        configMap.put("connection.url",
            format("jdbc:postgresql://%s:5432/%s?loggerLevel=OFF", POSTGRES_NETWORK_ALIAS, DB_NAME));
        configMap.put("connection.user", DB_USERNAME);
        configMap.put("connection.password", DB_PASSWORD);
        configMap.put("table.whitelist", databaseTable);
        configMap.put("mode", "timestamp+incrementing");
        configMap.put("validate.non.null", "false");
        configMap.put("topic.prefix", TOPIC_PREFIX + ".");
        configMap.put("timestamp.column.name", "updated");
        configMap.put("incrementing.column.name", "id");

        // Configure SMT if randomFieldName is set
        if (randomFieldName != null) {
            configMap.put("transforms", "randomfield");
            configMap.put("transforms.randomfield.type", "smt.test.RandomField$Value");
            configMap.put("transforms.randomfield.random.field.name", randomFieldName);
            configMap.put("transforms.randomfield.random.field.size", String.valueOf(size));
            configMap.put("transforms.randomfield.random.use.letters", String.valueOf(useLetters));
            configMap.put("transforms.randomfield.random.use.numbers", String.valueOf(useNumbers));
        }

        String payload = MAPPER.writeValueAsString(ImmutableMap.of(
            "name", CONNECTOR_NAME, "config", configMap));
        given()
            .log().headers()
            .contentType(ContentType.JSON)
            .accept(ContentType.JSON)
            .body(payload)
            .when()
            .post(getKafkaConnectUrl() + "/connectors")
            .andReturn()
            .then()
            .log().all()
            .statusCode(HttpStatus.SC_CREATED);
    }

Inspecting the console output of the failing test shows:

{
    "error_code": 400,
    "message": "Connector configuration is invalid and contains the following 2 error(s):
        Invalid value smt.test.RandomField$Value for configuration transforms.randomfield.type: Class smt.test.RandomField$Value could not be found.
        Invalid value null for configuration transforms.randomfield.type: Not a Transformation
        You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}

The Class smt.test.RandomField$Value could not be found error indicates that our custom transform class is not being picked up.
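A quick way to see which plugins the worker actually registered is the Connect REST API; on recent Connect versions (Kafka 3.2+), passing connectorsOnly=false also lists transformations. A rest-assured sketch, reusing getKafkaConnectUrl() from the test:

// Sketch: list every plugin the Connect worker registered; on Kafka 3.2+
// the connectorsOnly=false flag includes transformations as well.
given()
    .accept(ContentType.JSON)
.when()
    .get(getKafkaConnectUrl() + "/connector-plugins?connectorsOnly=false")
.then()
    .log().body()
    .statusCode(200);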
While SmtIntegrationTest is running, you can see all the running Docker containers with docker ps:

$ docker ps 

CONTAINER ID   IMAGE                                   COMMAND                  CREATED          STATUS                             PORTS                                         NAMES
670697d6f149   confluentinc/cp-kafka-connect:7.5.0     "bash -c 'confluent-…"   12 seconds ago   Up 11 seconds (health: starting)   9092/tcp, 0.0.0.0:58037->8083/tcp             connect-1fadc921-2a97-4b78-b325-de10c127c0ef
48982e55c008   confluentinc/cp-kafka:7.5.0             "/etc/confluent/dock…"   12 seconds ago   Up 11 seconds                      0.0.0.0:58027->9092/tcp                       blissful_shockley
62e194054ec4   confluentinc/cp-schema-registry:7.5.0   "/etc/confluent/dock…"   12 seconds ago   Up 12 seconds                      0.0.0.0:58031->8081/tcp                       blissful_dijkstra
bc54519a449d   postgres:11                             "docker-entrypoint.s…"   12 seconds ago   Up 12 seconds                      0.0.0.0:58032->5432/tcp                       nifty_mestorf
b668c6def0a2   confluentinc/cp-zookeeper:7.5.0         "/etc/confluent/dock…"   12 seconds ago   Up 11 seconds                      2888/tcp, 3888/tcp, 0.0.0.0:58035->2181/tcp   pensive_euler
8f2b26a2057a   testcontainers/ryuk:0.5.1               "/bin/ryuk"              13 seconds ago   Up 12 seconds                      0.0.0.0:58028->8080/tcp                       testcontainers-ryuk-53dd0444-3f1f-499b-a555-c6d9eac11077

The integration test creates a jar file containing the RandomField transform class and binds it to the /usr/share/java/kafka-smt-plugins folder in the kafka-connect container.
We can double-check the jar is there by running ls via docker exec:

$ docker exec $(docker ps | grep kafka-connect | cut -d " " -f1) ls -al /usr/share/java/kafka-smt-plugins 

total 16
drwxr-xr-x 2 root    root    4096 Oct  6 12:20 .
drwxr-xr-x 1 appuser appuser 4096 Oct  6 12:20 ..
-rw-r--r-- 1 root    root    5873 Oct  6 12:20 kafka-smt.jar

Next, if we inspect the kafka-connect container logs, we see the following (timestamps removed for readability):

$ docker logs -f $(docker ps | grep kafka-connect | cut -d " " -f1)

...
INFO Loading plugin from: /usr/share/java/kafka-smt-plugins (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-smt-plugins/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Loading plugin from: /usr/share/java/confluent-control-center (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/confluent-control-center/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
...

The first two lines show Kafka Connect scanning our custom folder /usr/share/java/kafka-smt-plugins/. The next four show it scanning one of the built-in Kafka Connect folders, /usr/share/java/confluent-control-center.
Looking at these logs, our custom folder does not appear to register any plugins (given the absence of Added plugin log lines), whereas confluent-control-center registers several, e.g. org.apache.kafka.connect.tools.VerifiableSinkConnector.
My hunch is that our custom SMT jar (created via the createConnectorPlugin method) does not contain the dependent libraries such as Apache Commons (which provides the random() method), but the logs show no error for this scenario.
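One way to test that hunch is to list the jar's entries and check whether the Commons classes were actually packaged; a minimal sketch (the jar path here is an assumption):

import java.io.IOException;
import java.util.jar.JarFile;

// Sketch: print the jar's .class entries. If the dependencies were packaged,
// org/apache/commons/... entries should appear alongside smt/test/....
public class JarContentsCheck {
    public static void main(String[] args) throws IOException {
        try (JarFile jar = new JarFile("build/libs/kafka-smt.jar")) {
            jar.stream()
               .map(entry -> entry.getName())
               .filter(name -> name.endsWith(".class"))
               .forEach(System.out::println);
        }
    }
}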

Answer from t9aqgxwy:

Posting an answer here in the hope it helps others, and thanks @OneCricketeer for your comments/help.
I have updated the test GitHub repo:

...if you run a gradle build, both the unit tests and the integration tests now pass:

$ ./gradlew test

> Task :test

smt.test.SmtIntegrationTest

  Test db_insert_creates_kakfa_message_with_smt() PASSED (2s)
  Test db_insert_creates_kakfa_message() PASSED (6s)

smt.test.RandomFieldTest

  Test topLevelStructRequired() PASSED
  Test schemalessInsertRandomField() PASSED
  Test copySchemaAndInsertUuidField() PASSED

SUCCESS: Executed 5 tests in 1m 20s

@OneCricketeer mentioned:

  • So, you can still use JarOutputStream to replicate the output of the Gradle shadow jar plugin (i.e. an uber jar), which includes your external dependencies, and copy that to a file mount.

However, I was unable to create the uber jar file using JarOutputStream. The solution I used was to run ./gradlew shadowJar as an external process from Java (using the Runtime.exec(String command) method), i.e.:

public static File createConnectorPluginWithGradle() {
        // Check if file exists and delete if it does
        File existingJarFile = findFile("build/libs", "-all.jar");
        if (existingJarFile != null && existingJarFile.exists()) {
            existingJarFile.delete();
        }
        boolean isWindows = System.getProperty("os.name").toLowerCase().startsWith("windows");
        String gradleCmd = isWindows ? "gradlew.bat" : "./gradlew";
        String output = execCmd(gradleCmd + " shadowJar");
        if (output == null || !output.contains("BUILD SUCCESSFUL")) {
            throw new RuntimeException("Gradle build unsuccessful\n\n" + output);
        }
        File jarFile = findFile("build/libs", "-all.jar");
        if (!jarFile.exists()) {
            throw new RuntimeException("Shadow jar does not exist: " + jarFile.getAbsolutePath());
        }
        return jarFile;
    }
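findFile is referenced above but not shown; it is assumed to be a small helper along these lines:

// Assumed shape of the findFile helper: returns the first file in `dir`
// whose name ends with `suffix`, or null if nothing matches.
public static File findFile(String dir, String suffix) {
    File[] matches = new File(dir).listFiles((d, name) -> name.endsWith(suffix));
    return (matches == null || matches.length == 0) ? null : matches[0];
}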

createConnectorPluginWithGradle deletes the jar file if it already exists and checks whether we are running on Windows or a Linux-like environment. It then executes gradlew.bat shadowJar (Windows) or ./gradlew shadowJar (otherwise) via the execCmd method:

public static String execCmd(String cmd) {
        try {
            Scanner s = new Scanner(Runtime.getRuntime().exec(cmd).getInputStream()).useDelimiter("\\A");
            return s.hasNext() ? s.next() : "";
        } catch (IOException ioEx) {
            throw new RuntimeException("Error executing: " + cmd, ioEx);
        }
    }
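One caveat with the Scanner-over-stdout approach: if Gradle writes its failure output to stderr, the BUILD SUCCESSFUL check may only see an empty string. A variant using ProcessBuilder that merges the streams and waits for the process to exit might look like this (execCmdMerged is a hypothetical name):

// Sketch: merge stderr into stdout and wait for the process to finish,
// so Gradle's failure output is not silently dropped.
public static String execCmdMerged(String... cmd) {
    try {
        Process process = new ProcessBuilder(cmd)
                .redirectErrorStream(true)
                .start();
        String output = new String(process.getInputStream().readAllBytes());
        process.waitFor();
        return output;
    } catch (IOException | InterruptedException ex) {
        throw new RuntimeException("Error executing: " + String.join(" ", cmd), ex);
    }
}

It would be invoked as execCmdMerged(gradleCmd, "shadowJar"), since ProcessBuilder takes the command and its arguments as separate strings.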

The jar is returned as a File object, which is then bound into the Kafka Connect docker container:

private static final String PLUGIN_JAR_FILE = "kafka-smt.jar";

    ...

    kafkaConnect.withFileSystemBind(jarFile.getAbsolutePath(),
            "/usr/share/java/kafka-smt-plugins/" + PLUGIN_JAR_FILE);

Going back to the original problem: when Kafka Connect scanned the /usr/share/java/kafka-smt-plugins/ folder, there were no Added plugin ... log lines (indicating it could not load any SMT classes).
Now, if we inspect the Kafka Connect logs while SmtIntegrationTest is running, we see (timestamps/extra log lines removed for readability):

$ docker logs -f $(docker ps | grep kafka-connect | cut -d " " -f1)

...

INFO Loading plugin from: /usr/share/java/kafka-smt-plugins (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-smt-plugins/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'smt.test.RandomField$Key' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'smt.test.RandomField$Value' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
...
INFO Loading plugin from: /usr/share/java/confluent-telemetry (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)

The appearance of the Added plugin 'smt.test.RandomField$Value' log line shows that our SMT classes have now been loaded successfully, and our integration test method...

@Test
    public void db_insert_creates_kakfa_message_with_smt() throws Exception {
        String topicName = TOPIC_PREFIX + "." + DB_TABLE_EMPLOYEE;
        setupConnector(DB_TABLE_EMPLOYEE, "my_random_field", 23, true, false);

        databaseInsert(DB_TABLE_EMPLOYEE, "id, name, manager", 1002, "John", "David");

        awaitForTopicCreation(topicName);
        List<ConsumerRecord> records = getRecordsFromKafkaTopic(topicName, 1);
        assertThat(records).hasSize(1);
        Map<String, Object> record = MAPPER.readValue((String) records.get(0).value(), Map.class);
        System.out.println(MAPPER.writeValueAsString(record));
        Map<String, Object> payload = (Map) record.get("payload");
        assertThat(payload).containsOnlyKeys("id", "name", "updated", "manager", "my_random_field")
            .containsEntry("id", 1002)
            .containsEntry("name", "John")
            .containsEntry("manager", "David");
        assertThat((String) payload.get("my_random_field")).matches("[a-zA-Z]{23}");
    }

...now passes successfully. Note that the System.out.println shows:

{
  "payload" : {
    "id" : 1002,
    "name" : "John",
    "manager" : "David",
    "updated" : 1696930054712,
    "my_random_field" : "NpAmwjtwezVetpcNWKgzpbN"
  },
  "schema" : {
    "name" : "employee",
    "type" : "struct",
    "fields" : [ {
      "type" : "int32",
      "optional" : false,
      "field" : "id"
    }, {
      ...
    }, {
      "type" : "string",
      "optional" : false,
      "field" : "my_random_field"
    } ]
  }
}

As you can see, my_random_field is present in both the payload and schema sections of the Kafka record, confirming that our RandomField transform is working.
