我对Kafka还很陌生,我还在学习一些知识。我有一个带Kafka Connect的Ecs-Ec2集群,结构如下:
├── bitbucket-pipelines.yml
├── docker-compose.yml
├── Dockerfile
├── env.sh
├── jmx-kafka-connect-config.yml
├── password.properties
├── rest-jaas.properties
├── task-definition.json
├── worker.properties
在我的Dockerfile中,我运行COPY
来替换我处理的properties
文件,并运行CMD connect-standalone /etc/kafka-connect/worker.properties
来启动。
我有两个问题。
1.在某些教程中,设置有很大的不同。它们由环境变量传递。我的工作方式是正确的,还是我需要改正?
1.我正在运行connect-standalone
,但我已经读到,为了扩展,我需要使用connect-distributed
。我怎样才能调整这一点,使至少有3个工人在同一主题上工作?这将是一种让H.A.
docker-compose.yml
version: "3"
services:
kafka-connect:
build:
context: .
ports:
- "8083:8083"
- "8085:8085"
Dockerfile
FROM confluentinc/cp-server-connect:7.4.0
RUN confluent-hub install debezium/debezium-connector-mysql:2.1.4 --no-prompt
RUN confluent-hub install confluentinc/kafka-connect-elasticsearch:14.0.8 --no-prompt
USER root
RUN microdnf -y install vim
COPY jmx-kafka-connect-config.yml /etc/kafka-connect/jmx-kafka-connect-config.yml
COPY jmx_prometheus_javaagent-0.18.0.jar /etc/kafka-connect/jars/jmx_prometheus_javaagent.jar
COPY worker.properties /etc/kafka-connect/worker.properties
COPY password.properties /etc/kafka-connect/password.properties
COPY rest-jaas.properties /etc/kafka-connect/rest-jaas.properties
ENV KAFKA_JMX_PORT=1976
ENV KAFKA_OPTS="-Djava.security.auth.login.config=/etc/kafka-connect/rest-jaas.properties -javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml"
EXPOSE 8083
EXPOSE 8085
RUN sed -i 's|log4j.rootLogger=INFO|log4j.rootLogger=ERROR|g' /etc/kafka/connect-log4j.properties
CMD connect-standalone /etc/kafka-connect/worker.properties
env.sh- * 通过Bitbucket-pipeline运行 *
#!/bin/sh
touch worker.properties
echo bootstrap.servers=$KAFKA_BOOTSTRAP_SERVERS >> worker.properties
echo confluent.license=$CONFLUENT_LICENSE >> worker.properties
echo connector.client.config.override.policy=All >> worker.properties
echo consumer.sasl.mechanism=PLAIN >> worker.properties
echo consumer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo consumer.security.protocol=SASL_SSL >> worker.properties
echo producer.sasl.mechanism=PLAIN >> worker.properties
echo producer.sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo producer.security.protocol=SASL_SSL >> worker.properties
echo sasl.mechanism=PLAIN >> worker.properties
echo sasl.jaas.config="org.apache.kafka.common.security.plain.PlainLoginModule required username="\"$KAFKA_SASL_USERNAME\"" password="\"$KAFKA_SASL_PASSWORD\"";" >> worker.properties
echo security.protocol=SASL_SSL >> worker.properties
echo group.id=sv-connect >> worker.properties
echo config.storage.topic=sv-connect-configs >> worker.properties
echo offset.storage.topic=sv-connect-offsets >> worker.properties
echo status.storage.topic=sv-connect-status >> worker.properties
echo key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.key.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo internal.value.converter=org.apache.kafka.connect.json.JsonConverter >> worker.properties
echo offset.storage.file.filename=/tmp/offset.txt >> worker.properties
echo rest.extension.classes=org.apache.kafka.connect.rest.basic.auth.extension.BasicAuthSecurityRestExtension >> worker.properties
echo log4j.root.loglevel=WARN >> worker.properties
echo log4j.loggers=org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR >> worker.properties
echo config.storage.replication.factor=3 >> worker.properties
echo offset.storage.replication.factor=3 >> worker.properties
echo status.storage.replication.factor=3 >> worker.properties
echo plugin.path=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components >> worker.properties
echo kafka.jmx.port=1976 >> worker.properties
echo kafka.opts=-javaagent:/etc/kafka-connect/jars/jmx_prometheus_javaagent.jar=8085:/etc/kafka-connect/jmx-kafka-connect-config.yml >> worker.properties
touch password.properties
echo "$KAFKA_SASL_USERNAME: $KAFKA_SASL_PASSWORD" >> password.properties
1条答案
按热度按时间k5hmc34c1#
它们通过环境变量传递
你可以使用
kafka-connect-base
镜像,顺便说一下,因为你使用的连接器不需要许可证,但是是的,他们使用Jinja模板来定义配置。真的,没有理由复制自己的值,特别是如果它们都是静态值(不依赖于其他环境变量,例如可以在运行时确定的AWS区域)。
我在运行独立连接
图像不运行此。它的默认值是
connect-distributed.sh
,所以不要覆盖ENNTYPOINT / CMD,这样就可以了。只需缩放ECS复制副本。注意:您还可以使用EKS并在KafkaConnect k8s资源中运行Strimzi,它提供了更多有用的功能,包括内置的JMX监视和警报功能。
三个人在同一个主题上工作。
这是由
tasks.max
控制的,而不是工人。也就是说,ElasticSink设置为3将并行启动三个Kafka Consumer线程,并从一个消费者组中的至少3个主题分区中读取。如果主题中的分区较少,那么就会有空闲任务。