我正在尝试使用docker实现kafka到mongodb和mysql的连接。
我想要的是下图:
Kafka连接mongodb:
我看过docker编写的官方mongodb存储库。它有两个问题:
这对我来说太复杂了。因为它运行了mongodb的多个容器,并且使用了大量消耗大量资源的图像。
有一些问题没有解决,最终导致kafka到mongodb的连接出现故障。在这里你可以看到我的问题。
我在docker-compose.yml中使用debezium实现了以下连接:
version: '3.2'
services:
kafka:
image: wurstmeister/kafka:latest
ports:
- target: 9094
published: 9094
protocol: tcp
mode: host
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_LOG_DIRS: /kafka/logs
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- kafka:/kafka
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
volumes:
- zookeeper:/opt/zookeeper-3.4.13
mongo:
image: mongo
container_name: mongo
ports:
- 27017:27017
connect:
image: debezium/connect
container_name: connect
ports:
- 8083:8083
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
volumes:
kafka:
zookeeper:
正如@cricket\u007所说,我不应该使用 debezium
为了我的目的。所以我用了 confluentinc/kafka-connect-datagen
形象。我在docker-compose.yml文件中添加了以下内容,而不是 debezium
:
connect:
image: confluentinc/kafka-connect-datagen
build:
context: .
dockerfile: Dockerfile
hostname: connect
container_name: connect
depends_on:
- zookeeper
ports:
- 8083:8083
environment:
CONNECT_BOOTSTRAP_SERVERS: 'kafka:9092'
CONNECT_REST_ADVERTISED_HOST_NAME: connect
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: compose-connect-group
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
CONNECT_LOG4J_ROOT_LOGLEVEL: "INFO"
CONNECT_PLUGIN_PATH: /usr/share/confluent-hub-components
CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
# Assumes image is based on confluentinc/kafka-connect-datagen:latest which is pulling 5.2.2 Connect image
CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-5.2.2.jar
CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
command: "bash -c 'if [ ! -d /usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen ]; then echo \"WARNING: Did not find directory for kafka-connect-datagen (did you remember to run: docker-compose up -d --build ?)\"; fi ; /etc/confluent/docker/run'"
volumes:
- ../build/confluent/kafka-connect-mongodb:/usr/share/confluent-hub-components/kafka-connect-mongodb
dockerfile文件:
FROM confluentinc/cp-kafka-connect
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen
问题:
这个 Kafka-connect-datagen
图像生成假数据,正如存储库中提到的,它不适合生产。我想要的只是把Kafka和mongodb连接起来,既不能少也不能多。明确地说,我如何从Kafka发送数据 curl
把它们保存在mongodb收藏中?
我面对 CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL is required.
错误。正如@cricket\u007所说 schema-registry
是可选的。那我怎么才能摆脱那个形象呢?
在最后一步中,我尝试运行存储库的docker compose文件,如readme.md中所述,不幸的是,我遇到了另一个错误:
警告:无法访问上配置的kafka系统http://localhost:8083注意:此脚本需要curl。
每当我没有对配置进行任何更改时,就会遇到另一个错误:
Kafka Connectors:
{"error_code":409,"message":"Cannot complete request momentarily due to stale configuration (typically caused by a concurrent config change)"}
请帮我找到问题的答案。
我的输出:
Building the MongoDB Kafka Connector
> Task :shadowJar
FatJar: /home/mostafa/Documents/Docker/kafka-mongo/build/libs/kafka-mongo-0.3-SNAPSHOT-all.jar (2.108904 MB)
Deprecated Gradle features were used in this build, making it incompatible with Gradle 6.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/5.2/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 4h 26m 25s
7 actionable tasks: 7 executed
Unzipping the confluent archive plugin....
Archive: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT.zip
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSinkConnector.properties
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/etc/MongoSourceConnector.properties
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/lib/kafka-mongo-0.3-SNAPSHOT-all.jar
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/manifest.json
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-leaf.png
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/assets/mongodb-logo.png
creating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/README.md
inflating: ./build/confluent/mongodb-kafka-connect-mongodb-0.3-SNAPSHOT/doc/LICENSE.txt
Starting docker .
Creating volume "docker_rs2" with default driver
Creating volume "docker_rs3" with default driver
Building connect
Step 1/3 : FROM confluentinc/cp-kafka-connect:5.2.2
---> 32bb41f78617
Step 2/3 : ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
---> Using cache
---> 9e4fd4f10a38
Step 3/3 : RUN confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
---> Using cache
---> 5f879008bb73
Successfully built 5f879008bb73
Successfully tagged confluentinc/kafka-connect-datagen:latest
Recreating mongo1 ...
Recreating mongo1 ... done
Creating mongo3 ... done
Starting broker ... done
Creating mongo2 ... done
Starting schema-registry ... done
Starting connect ... done
Creating rest-proxy ... done
Creating ksql-server ... done
Creating docker_kafka-topics-ui_1 ... done
Creating control-center ... done
Creating ksql-cli ... done
Waiting for the systems to be ready.............
WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.
SHUTTING DOWN
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 68 100 68 0 0 23 0 0:00:02 0:00:02 --:--:-- 23
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 61 100 61 0 0 4066 0 --:--:-- --:--:-- --:--:-- 4066
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 63 100 63 0 0 9000 0 --:--:-- --:--:-- --:--:-- 9000
MongoDB shell version v4.0.12
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("80ebb904-f81a-4230-b63b-4e62f65fbeb7") }
MongoDB server version: 4.0.12
{
"ok" : 1,
"operationTime" : Timestamp(1567235833, 1),
"$clusterTime" : {
"clusterTime" : Timestamp(1567235833, 1),
"signature" : {
"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
"keyId" : NumberLong(0)
}
}
}
Stopping ksql-cli ... done
Stopping control-center ... done
Stopping docker_kafka-topics-ui_1 ... done
Stopping ksql-server ... done
Stopping rest-proxy ... done
Stopping mongo1 ... done
Stopping mongo2 ... done
Stopping mongo3 ... done
Stopping connect ... done
Stopping broker ... done
Stopping zookeeper ... done
Removing ksql-cli ...
Removing control-center ... done
Removing docker_kafka-topics-ui_1 ... done
Removing ksql-server ... done
Removing rest-proxy ... done
Removing mongo1 ... done
Removing mongo2 ... done
Removing mongo3 ... done
Removing connect ... done
Removing schema-registry ... done
Removing broker ... done
Removing zookeeper ... done
Removing network docker_default
Removing network docker_localnet
WARNING: Could not reach configured kafka system on http://localhost:8082
Note: This script requires curl.
2条答案
按热度按时间6mw9ycah1#
我创建了以下docker compose文件(查看github中的所有文件):
执行后
docker-compose up
您必须配置mongodb群集:确保已安装插件:
正如您在上面看到的,mongodb连接器插件可以使用。假设您有一个名为
mydb
以及一个名为products
我创建了名为sink-connector.json的json文件:现在使用connect restful api创建连接器:
您可以查看连接器的状态:
现在让我们创建一个Kafka主题。首先,我们必须连接到Kafka容器:
然后创建主题:
现在生产产品进入主题:
您可以在图像中查看结果:
希望这对你有帮助。
5lwkijsr2#
debezium从mongo读取数据。如果你想要一个接收器连接器,你需要使用你找到的官方连接器,但是在github上也有其他的连接器。
kafka connect使用restapi,因此您还需要创建一个包含所有连接和主题细节的json负载。你找到的那份回购协议里有一些指南
它运行了多个mongodb容器,还使用了许多消耗大量资源的图像。
您不需要ksql、controlcenter、rest代理、topicui等,只需要kafka、zookeeper、connect、mongo和schema注册表(可选)。所以只需删除compose文件中的其他容器。您也可能不需要多个mongo容器,但是您需要重新配置环境变量,以便只调整到一个示例
如何使用curl从kafka发送数据并将其保存在mongodb集合中?
如果你真的想用
curl
,则需要启动rest代理容器。那会让你度过难关Could not reach configured kafka system on http://localhost:8082
错误消息。