我正在尝试使用TestContainers来集成测试一个Kafka Connect自定义SMT(Simple Message Transformer),它需要使用一个外部库(例如:Apache Commons)。
测试应:
1.启动(a)Kafka,(B)ZooKeeper,(c)PostgreSQL数据库,(d)Kafka使用TestContainers library连接容器。
- Connect示例应针对PostgreSQL数据库配置JDCB Source Connector。
1.集成测试向测试数据库表中添加一行。 - JDBC源连接器应该检测DB插入并创建Kafka消息.
1..并使用自定义SMT对其进行转换,然后将其发布到Kafka主题。
1.该测试从topic读取转换后的消息,并Assert字段是正确的。
我的问题是创建一个包含自定义SMT的文件.
我发现一篇博客文章Testing Kafka Connectors,作者以图解的方式创建了一个JavaScript文件(包含一个自定义SMT),作为集成测试的一部分,如下所示:
private static void createConnectorJar() throws IOException {
Manifest manifest = new Manifest();
manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION, "1.0");
JarOutputStream target = new JarOutputStream(new FileOutputStream("target/kcetcd-connector/etcd-connector.jar"), manifest);
add(new File("target/classes"), target);
target.close();
}
然后将其连接到Docker TestContainer镜像,作为集成测试的一部分,使用withFileSystemBind
方法连接到classes/kcetcd-connector
文件夹中的jar文件:
public static DebeziumContainer connectContainer = new DebeziumContainer("debezium/connect-base:1.9.5.Final")
.withFileSystemBind("target/kcetcd-connector", "/kafka/connect/kcetcd-connector")
.withNetwork(network)
.withKafka(kafkaContainer)
.dependsOn(kafkaContainer);
然而,这个解决方案只适用于简单的SMT,它不回复外部库(例如,使用Gradle引入)。我的测试Kafka Connect容器没有成功地拾取JAR文件中的类。
有人有使用测试容器测试更复杂的SMT的解决方案吗?我认为集成测试本质上需要创建一个绑定到docker卷的影子卷。
更新(2023年10月6日)
我已经上传了一个工作示例到以下GitHub repo:
代码结构如下:
src
main
java
smt
test
RandomField.java Custom SMT which adds a field containing random characters
test
java
smt
test
RandomFieldTest.java Unit test to test RandomField
SmtIntegrationTest.java Integration test using TestContainers
SmtIntegrationTest
集成测试类试图完成我在上面提到的任务(即:通过实际的Docker容器(如Kafka、ZooKeeper、SchemaRegistry、Postgres和KafkaConnect)端到端测试所有内容。
如果您通过./gradlew test
执行所有测试.
> Task :test
smt.test.SmtIntegrationTest
Test db_insert_creates_kakfa_message_with_smt() FAILED
> :test > Executing test smt.test.SmtIntegrationTest
java.lang.AssertionError: 1 expectation failed.
Expected status code <201> but was <400>.
at smt.test.SmtIntegrationTest.setupConnector(SmtIntegrationTest.java:352)
Test db_insert_creates_kakfa_message() PASSED (1.3s)
smt.test.RandomFieldTest
Test topLevelStructRequired() PASSED
Test schemalessInsertRandomField() PASSED
Test copySchemaAndInsertUuidField() PASSED
FAILURE: Executed 5 tests in 22.9s (1 failed)
.除了集成测试类SmtIntegrationTest
的db_insert_creates_kakfa_message_with_smt
测试方法之外,所有测试都通过了。
通过的集成测试方法(db_insert_creates_kakfa_message
)没有使用自定义SMT,这证明了其他一切都可以工作,即将行插入到数据库表中会被正确拾取,并导致创建Kafka消息并将其发布到Kakfa主题。
这两个测试之间的唯一区别是,在设置JdbcSourceConnector
连接器时,* 失败测试方法 * 设置了一些Transformer属性(例如,transforms=randomfield
),而 *passing方法 * 则没有。这是通过setupConnector
方法完成的,如下所示(注意if
语句,它切换Transformer属性):
private void setupConnector(String databaseTable, String randomFieldName, int size, boolean useLetters, boolean useNumbers) throws IOException {
Map<String, Object> configMap = new HashMap<>();
configMap.put("connector.class", "io.confluent.connect.jdbc.JdbcSourceConnector");
configMap.put("tasks.max", "1");
configMap.put("connection.url",
format("jdbc:postgresql://%s:5432/%s?loggerLevel=OFF", POSTGRES_NETWORK_ALIAS, DB_NAME));
configMap.put("connection.user", DB_USERNAME);
configMap.put("connection.password", DB_PASSWORD);
configMap.put("table.whitelist", databaseTable);
configMap.put("mode", "timestamp+incrementing");
configMap.put("validate.non.null", "false");
configMap.put("topic.prefix", TOPIC_PREFIX + ".");
configMap.put("timestamp.column.name", "updated");
configMap.put("incrementing.column.name", "id");
// Configure SMT if randomFieldName is set
if (randomFieldName != null) {
configMap.put("transforms", "randomfield");
configMap.put("transforms.randomfield.type", "smt.test.RandomField$Value");
configMap.put("transforms.randomfield.random.field.name", randomFieldName);
configMap.put("transforms.randomfield.random.field.size", String.valueOf(size));
configMap.put("transforms.randomfield.random.use.letters", String.valueOf(useLetters));
configMap.put("transforms.randomfield.random.use.numbers", String.valueOf(useNumbers));
}
String payload = MAPPER.writeValueAsString(ImmutableMap.of(
"name", CONNECTOR_NAME, "config", configMap));
given()
.log().headers()
.contentType(ContentType.JSON)
.accept(ContentType.JSON)
.body(payload)
.when()
.post(getKafkaConnectUrl() + "/connectors")
.andReturn()
.then()
.log().all()
.statusCode(HttpStatus.SC_CREATED);
}
检查中断测试的控制台输出显示:-
{
"error_code": 400,
"message": "Connector configuration is invalid and contains the following 2 error(s):
Invalid value smt.test.RandomField$Value for configuration transforms.randomfield.type: Class smt.test.RandomField$Value could not be found.
Invalid value null for configuration transforms.randomfield.type: Not a Transformation
You can also find the above list of errors at the endpoint `/connector-plugins/{connectorType}/config/validate`"
}
Class smt.test.RandomField$Value could not be found
表示我们的自定义Transformer转换器文件没有被拾取。
当SmtIntegrationTest
测试运行时,您可以通过运行docker ps
来查看所有正在运行的Docker容器:
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
670697d6f149 confluentinc/cp-kafka-connect:7.5.0 "bash -c 'confluent-…" 12 seconds ago Up 11 seconds (health: starting) 9092/tcp, 0.0.0.0:58037->8083/tcp connect-1fadc921-2a97-4b78-b325-de10c127c0ef
48982e55c008 confluentinc/cp-kafka:7.5.0 "/etc/confluent/dock…" 12 seconds ago Up 11 seconds 0.0.0.0:58027->9092/tcp blissful_shockley
62e194054ec4 confluentinc/cp-schema-registry:7.5.0 "/etc/confluent/dock…" 12 seconds ago Up 12 seconds 0.0.0.0:58031->8081/tcp blissful_dijkstra
bc54519a449d postgres:11 "docker-entrypoint.s…" 12 seconds ago Up 12 seconds 0.0.0.0:58032->5432/tcp nifty_mestorf
b668c6def0a2 confluentinc/cp-zookeeper:7.5.0 "/etc/confluent/dock…" 12 seconds ago Up 11 seconds 2888/tcp, 3888/tcp, 0.0.0.0:58035->2181/tcp pensive_euler
8f2b26a2057a testcontainers/ryuk:0.5.1 "/bin/ryuk" 13 seconds ago Up 12 seconds 0.0.0.0:58028->8080/tcp testcontainers-ryuk-53dd0444-3f1f-499b-a555-c6d9eac11077
集成测试创建一个包含RandomField
Transformer类的JAR文件,并将其绑定到 kafka-connect 容器中的/usr/share/java/kafka-smt-plugins
文件夹。
我们可以通过docker execute命令运行ls
命令来仔细检查这个错误是否存在:
$ docker exec $(docker ps | grep kafka-connect | cut -d " " -f1) ls -al /usr/share/java/kafka-smt-plugins
total 16
drwxr-xr-x 2 root root 4096 Oct 6 12:20 .
drwxr-xr-x 1 appuser appuser 4096 Oct 6 12:20 ..
-rw-r--r-- 1 root root 5873 Oct 6 12:20 kafka-smt.jar
接下来,如果我们检查 kafka-connect 容器日志,您将看到以下内容(为了可读性,删除了时间戳):
$ docker logs -f $(docker ps | grep kafka-connect | cut -d " " -f1)
...
INFO Loading plugin from: /usr/share/java/kafka-smt-plugins (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/kafka-smt-plugins/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Loading plugin from: /usr/share/java/confluent-control-center (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Registered loader: PluginClassLoader{pluginLocation=file:/usr/share/java/confluent-control-center/} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'org.apache.kafka.connect.tools.VerifiableSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
INFO Added plugin 'org.apache.kafka.connect.tools.MockSinkConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
...
前两行显示Kafak Connect正在扫描我们的自定义文件夹/usr/share/java/kafka-smt-plugins/
。接下来的4个显示了扫描内置的Kafka Connect文件夹/usr/share/java/confluent-control-center
之一。
检查这些日志,我们的自定义文件夹似乎没有注册任何插件(由于缺乏Added plugin
日志),而confluent-control-center
注册了几个插件,例如。org.apache.kafka.connect.tools.VerifiableSinkConnector
。
我的直觉是我们的自定义SMT jar(通过createConnectorPlugin
方法创建)不包含依赖库,如Apache Commons(包含random()
方法),但日志不会显示此场景的错误。
1条答案
按热度按时间t9aqgxwy1#
在这里张贴一个答案,希望这对其他人有用,并感谢 @OneCricketeer 您的评论/帮助。
我更新了测试GitHub repo:
.如果你做一个gradle构建,单元测试和集成测试都可以工作:
@OneCricketeer提到:
JarOutputStream
复制Gradle shadow jar插件的输出(即一个uberjar),它包含了你的外部依赖项,并将它们复制到一个文件挂载中。*但是,我无法使用
JarOutputStream
创建Uber的JavaScript文件。我使用的解决方案是使用Java并将./gradlew shadowJar
作为外部进程运行(使用JavaRuntime.exec(String command)
方法),即它会删除该JAR文件(如果已经存在),并检查是否在Windows与Linux类型的环境中运行。然后它通过
execCmd
方法执行gradlew.bat shadowJar
(Windows)或./gradlew shadowJar
(其他环境):该文件以
File
对象的形式返回,然后绑定到Kafka Connect docker容器:现在,如果你回到最初的问题,当Kafka Connect解析
/usr/share/java/kafka-smt-plugins/
文件夹时,没有Added plugin ...
日志(这表明它无法成功加载任何SMT类)。现在,如果我们在
SmtIntegrationTest
运行时检查Kafka Connect日志,我们将看到(* 时间戳/额外日志已删除,以提高可读性 *):Added plugin 'smt.test.RandomField$Value
日志的出现表明我们的SMT类已经成功加载,我们的集成测试方法...现在成功通过了注意,
System.out.println
显示:正如您所看到的,
my_random_field
存在于Kafka记录的payload
和schema
部分中,这证实了我们的RandomField
正在工作。