docker容器中的kafka连接:未添加连接器

vhmi4jdf  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(405)

我正在尝试运行一个简单的kafka connect容器。我确实尝试了confluent connect教程,但设置略有不同(没有docker机器,没有模式注册表)。
目前,我正在使用一个docker compose设置,其中包含zookeeper和kafka。

version: '3.1'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    ports:
      - 2181
    environment:
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_SYNC_LIMIT=2
  kafka:
    image: confluentinc/cp-kafka
    depends_on:
      - zookeeper
    ports:
      - 9092
      - 9094:9094
    environment:
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      # setup :9092 for access inside the docker network, 9094 for outside (ie host)
      - KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
      - KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
      - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
      - KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
      - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
      - KAFKA_NUM_PARTITIONS=10

这对不同的用途很好,所以我不认为这是个问题。
现在,我开始一个Kafka连接容器,连接罚款Kafka。我使用以下改编自“连接”教程的命令:

docker run -d \
  --name=kafka-connect-test \
  --net=kafka-connect_default \
  --expose 28083 \
  -p 28083:28083 \
  -e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 \
  -e CONNECT_REST_PORT=28083 \
  -e CONNECT_GROUP_ID="quickstart-test" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG \
  -e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars \
  -v /tmp/quickstart/file:/tmp/quickstart \
  -v /tmp/quickstart/jars:/etc/kafka-connect/jars \
  confluentinc/cp-kafka-connect:latest

最显著的区别是我用的是 StringConverter ,因为我想用 kafkacat 插入测试数据。
容器启动良好,正在运行,并且可以在我尝试的所有公开端点上访问。因为我没有添加任何连接器,所以我查询了可用的连接器: localhost:28083/connector-plugins :

[
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "5.4.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "5.4.0-ccs"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

所以现在,我只要创建一个文件接收器,将主题中的数据写入文件就足够了。我发到 localhost:28083/connectors ```
{ "name": "file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}
}

接受 `201 - Created` .
但是,当使用 `GET` ,我得到一个空数组作为响应。尝试一下,我也可以改变 `connector.class` 至 `FileStreamSinkConnector` 或者只是 `FileStreamSink` 仍然会得到 `201` (未添加接头)。
我做错什么了?
为什么我会在明显出了问题的时候得到“成功”的回应?
rdrgkggo

rdrgkggo1#

问题在于:

-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" \

内部转换器只是内部的,自从ApacheKafka发行版2.0以来,就被弃用了。如果在创建连接器后检查kafka connect worker日志,您将看到:

ERROR Found configuration for connector 'connector-file-sink' in wrong format: class java.lang.String (org.apache.kafka.connect.storage.KafkaConfigBackingStore)

这是因为kafka connect将kafka本身用作状态存储,当您创建连接器时,它将其存储在kafka主题上( CONNECT_CONFIG_STORAGE_TOPIC ). 这默认为json,看起来kafka connect不喜欢被更改(实际上,没有理由更改它)。
如果您像以前一样运行相同的docker命令,但是没有这两个命令 CONNECT_INTERNAL_ 转换器线,你会发现一切正常。
下面是正在创建的连接器(我正在使用 PUT 而不是 POST 因为它是幂等的并且更容易重新运行):

curl -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:28083/connectors/file-sink/config \
    -d '{
     "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
     "tasks.max": 1,
     "file": "/test.sink.txt",
     "topics": "test-topic"
}'
HTTP/1.1 201 Created
Date: Wed, 11 Mar 2020 09:16:04 GMT
Location: http://localhost:28083/connectors/file-sink
Content-Type: application/json
Content-Length: 211
Server: Jetty(9.4.20.v20190813)

{"name":"file-sink","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","file":"/test.sink.txt","topics":"test-topic","name":"file-sink"},"tasks":[],"type":"sink"}%

现在检查它是否正在运行(使用一些bash的东西来重新格式化它):

curl -s "http://localhost:28083/connectors?expand=info&expand=status" | \
       jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
       column -s : -t| sed 's/\"//g'| sort
sink  |  file-sink  |  RUNNING  |  RUNNING  | org.apache.kafka.connect.file.FileStreamSinkConnector

向主题发送一些数据:

➜ kafkacat -b localhost:9094 -t test-topic -P -K:
1:foo
2:bar

观察kafka connect写入的文件中的数据:

➜ docker exec -t kafka-connect-test bash -c 'tail -f /test.sink.txt'
foo
bar

顺便说一句:
最显著的区别是我用的是 StringConverter ,因为我想使用kafkacat插入测试数据。
注意,您可以将每个连接器的转换器设置为配置的一部分;在worker(即全局)级别设置stringconverter可能不是一个好主意,因为您很少使用它,当然是用于值。
有关Kafka连接的更多信息,请访问:
从零到英雄与Kafka连接
Kafka连接深潜-转换器和序列化解释

相关问题