我正在尝试运行一个简单的kafka connect容器。我确实尝试了confluent connect教程,但设置略有不同(没有docker机器,没有模式注册表)。
目前,我正在使用一个docker compose设置,其中包含zookeeper和kafka。
version: '3.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_SYNC_LIMIT=2
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092
- 9094:9094
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# setup :9092 for access inside the docker network, 9094 for outside (ie host)
- KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_NUM_PARTITIONS=10
这对不同的用途很好,所以我不认为这是个问题。
现在,我开始一个Kafka连接容器,连接罚款Kafka。我使用以下改编自“连接”教程的命令:
docker run -d \
--name=kafka-connect-test \
--net=kafka-connect_default \
--expose 28083 \
-p 28083:28083 \
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
-e CONNECT_REST_PORT=28083 \
-e CONNECT_GROUP_ID="quickstart-test" \
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config" \
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets" \
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status" \
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
-e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars \
-v /tmp/quickstart/file:/tmp/quickstart \
-v /tmp/quickstart/jars:/etc/kafka-connect/jars \
confluentinc/cp-kafka-connect:latest
最显著的区别是我用的是 StringConverter
,因为我想用 kafkacat
插入测试数据。
容器启动良好,正在运行,并且可以在我尝试的所有公开端点上访问。因为我没有添加任何连接器,所以我查询了可用的连接器: localhost:28083/connector-plugins
:
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
所以现在,我只要创建一个文件接收器,将主题中的数据写入文件就足够了。我发到 localhost:28083/connectors
```
{ "name": "file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}
}
接受 `201 - Created` .
但是,当使用 `GET` ,我得到一个空数组作为响应。尝试一下,我也可以改变 `connector.class` 至 `FileStreamSinkConnector` 或者只是 `FileStreamSink` 仍然会得到 `201` (未添加接头)。
我做错什么了?
为什么我会在明显出了问题的时候得到“成功”的回应?
1条答案
按热度按时间rdrgkggo1#
问题在于:
内部转换器只是内部的,自从ApacheKafka发行版2.0以来,就被弃用了。如果在创建连接器后检查kafka connect worker日志,您将看到:
这是因为kafka connect将kafka本身用作状态存储,当您创建连接器时,它将其存储在kafka主题上(
CONNECT_CONFIG_STORAGE_TOPIC
). 这默认为json,看起来kafka connect不喜欢被更改(实际上,没有理由更改它)。如果您像以前一样运行相同的docker命令,但是没有这两个命令
CONNECT_INTERNAL_
转换器线,你会发现一切正常。下面是正在创建的连接器(我正在使用
PUT
而不是POST
因为它是幂等的并且更容易重新运行):现在检查它是否正在运行(使用一些bash的东西来重新格式化它):
向主题发送一些数据:
观察kafka connect写入的文件中的数据:
顺便说一句:
最显著的区别是我用的是
StringConverter
,因为我想使用kafkacat插入测试数据。注意,您可以将每个连接器的转换器设置为配置的一部分;在worker(即全局)级别设置stringconverter可能不是一个好主意,因为您很少使用它,当然是用于值。
有关Kafka连接的更多信息,请访问:
从零到英雄与Kafka连接
Kafka连接深潜-转换器和序列化解释