I am using Strimzi and want to learn how to use the Kafka Bridge. To understand how it works, I created a Kafka cluster with the following YAML file:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.6.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: "2.6"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 100Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 100Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
This is the YAML for the Kafka Bridge:
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaBridge
metadata:
  name: my-bridge
spec:
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  http:
    port: 8080
These are the services:
ist@ist-1207:~$ kubectl get svc -A
NAMESPACE     NAME                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE
default       kubernetes                       ClusterIP   10.96.0.1        <none>        443/TCP                      3d1h
kube-system   kube-dns                         ClusterIP   10.96.0.10       <none>        53/UDP,53/TCP,9153/TCP       3d1h
strimzi       my-bridge-bridge-service         ClusterIP   10.100.153.164   <none>        8080/TCP                     88m
strimzi       my-cluster-kafka-bootstrap       ClusterIP   10.97.160.117    <none>        9091/TCP,9092/TCP,9093/TCP   109m
strimzi       my-cluster-kafka-brokers         ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   109m
strimzi       my-cluster-zookeeper-client      ClusterIP   10.107.59.225    <none>        2181/TCP                     110m
strimzi       my-cluster-zookeeper-nodes       ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   110m
strimzi       my-connect-cluster-connect-api   ClusterIP   10.110.27.15     <none>        8083/TCP                     25m
Now I exec into the bridge pod with the following command:
$ kubectl exec -it my-bridge-bridge-684df9fc64-bktc4 -n strimzi bash
After that, I created a consumer:
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group -H 'content-type: application/vnd.kafka.v2+json' -d '{
"name": "your-consumer",
"format": "json",
"auto.offset.reset": "earliest",
"enable.auto.commit": false
}'
{"instance_id":"your-consumer","base_uri":"http://localhost:8080/consumers/my-group/instances/your-consumer"}
Then I subscribed it to a topic:
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group/instances/your-consumer/subscription -H 'content-type: application/vnd.kafka.v2+json' -d '{
"topics": [
"your-topic"
]
}'
Then I polled for records:
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
> -H 'accept: application/vnd.kafka.json.v2+json'
[]
Then I produced some records and polled again:
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST \
> http://localhost:8080/topics/your-topic \
> -H 'content-type: application/vnd.kafka.json.v2+json' \
> -d '{
> "records": [
> {
> "key": "key-1",
> "value": "kajal verma"
> },
> {
> "key": "key-2",
> "value": "Aman verma"
> }
> ]
> }'
{"offsets":[{"partition":0,"offset":2},{"partition":0,"offset":3}]}
[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
> -H 'accept: application/vnd.kafka.json.v2+json'
[]
Again, I did not get any records, and I don't know why.
These are the logs of the bridge pod:
ist@ist-1207:~$ kubectl logs -f my-bridge-bridge-684df9fc64-lstx2 -n strimzi
Kafka Bridge configuration:
# Bridge configuration
bridge.id=my-bridge
# Kafka common properties
kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
kafka.security.protocol=PLAINTEXT
# Apache Kafka Producer
# Apache Kafka Consumer
# HTTP configuration
http.enabled=true
http.host=0.0.0.0
http.port=8080
http.cors.enabled=false
http.cors.allowedOrigins=
http.cors.allowedMethods=
[2020-11-24 11:42:01,374] INFO <Application :64> [main ] Strimzi Kafka Bridge 0.19.0 is starting
[2020-11-24 11:42:02,939] WARN <onMetaSchema:337> [oop-thread-1] Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[2020-11-24 11:42:03,402] INFO <HttpBridge :180> [oop-thread-1] Starting HTTP-Kafka bridge verticle...
[2020-11-24 11:42:03,459] INFO <ClientConfig:354> [oop-thread-1] AdminClientConfig values:
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
client.dns.lookup = use_all_dns_ips
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
[2020-11-24 11:42:04,336] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:42:04,336] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:42:04,338] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218124265
Nov 24, 2020 11:42:05 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2656 ms, time limit is 2000 ms
Nov 24, 2020 11:42:06 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3869 ms, time limit is 2000 ms
[2020-11-24 11:42:07,589] INFO <HttpBridge :102> [oop-thread-1] HTTP-Kafka Bridge started and listening on port 8080
[2020-11-24 11:42:07,604] INFO <HttpBridge :103> [oop-thread-1] HTTP-Kafka Bridge bootstrap servers my-cluster-kafka-bootstrap:9092
[2020-11-24 11:42:07,609] INFO <Application :219> [oop-thread-0] HTTP verticle instance deployed [6bc2ded6-162c-4444-b8ac-f51c0573e389]
[2020-11-24 11:44:15,295] WARN <etworkClient:757> [dminclient-1] [AdminClient clientId=adminclient-1] Connection to node -1 (my-cluster-kafka-bootstrap/10.97.102.188:9092) could not be established. Broker may not be available.
[2020-11-24 11:44:15,301] INFO <adataManager:235> [dminclient-1] [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1606218154377, tries=1, nextAllowedTryMs=1606218255400) timed out at 1606218255300 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2020-11-24 11:46:09,397] INFO <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Request: from 127.0.0.1:34080, method = POST, path = /consumers/my-group
[2020-11-24 11:46:09,508] INFO <nsumerConfig:354> [oop-thread-1] ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = your-consumer
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = my-group
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
[2020-11-24 11:46:09,759] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:46:09,759] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:46:09,760] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218369758
[2020-11-24 11:46:09,775] INFO <idgeEndpoint:140> [oop-thread-1] Created consumer your-consumer in group my-group
[2020-11-24 11:46:09,781] INFO <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Response: statusCode = 200, message = OK
[2020-11-24 11:46:33,188] INFO <subscribe :85> [oop-thread-1] [343058276] SUBSCRIBE Request: from 127.0.0.1:34370, method = POST, path = /consumers/my-group/instances/your-consumer/subscription
[2020-11-24 11:46:33,199] INFO <idgeEndpoint:191> [oop-thread-1] Subscribe to topics [SinkTopicSubscription(topic=your-topic,partition=null,offset=null)]
[2020-11-24 11:46:33,224] INFO <subscribe :85> [oop-thread-1] [343058276] SUBSCRIBE Response: statusCode = 200, message = OK
[2020-11-24 11:46:33,231] INFO <afkaConsumer:965> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Subscribed to topic(s): your-topic
[2020-11-24 11:46:58,713] INFO <poll :85> [oop-thread-1] [690326198] POLL Request: from 127.0.0.1:34814, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:46:58,756] INFO <poll :85> [oop-thread-1] [690326198] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:47:25,358] INFO <send :85> [oop-thread-1] [859539310] SEND Request: from 127.0.0.1:35210, method = POST, path = /topics/your-topic
[2020-11-24 11:47:25,474] INFO <oducerConfig:354> [oop-thread-1] ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-11-24 11:47:25,611] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,631] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,631] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445608
[2020-11-24 11:47:25,649] INFO <oducerConfig:354> [oop-thread-1] ProducerConfig values:
acks = 0
batch.size = 16384
bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = false
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.3
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
[2020-11-24 11:47:25,670] INFO <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,671] INFO <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,672] INFO <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445665
[2020-11-24 11:47:25,673] INFO <Metadata :279> [| producer-1] [Producer clientId=producer-1] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,685] INFO <Metadata :279> [| producer-2] [Producer clientId=producer-2] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,714] INFO <send :85> [oop-thread-1] [859539310] SEND Response: statusCode = 200, message = OK
[2020-11-24 11:47:25,938] INFO <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:25,944] INFO <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:41,679] INFO <poll :85> [oop-thread-1] [484189988] POLL Request: from 127.0.0.1:35416, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:47:41,682] WARN <etworkClient:1073> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Error while fetching metadata with correlation id 2 : {your-topic=LEADER_NOT_AVAILABLE}
[2020-11-24 11:47:41,684] INFO <Metadata :279> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:41,688] INFO <poll :85> [oop-thread-1] [484189988] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:53:01,882] INFO <poll :85> [oop-thread-1] [1610292326] POLL Request: from 127.0.0.1:39262, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:53:01,883] INFO <poll :85> [oop-thread-1] [1610292326] POLL Response: statusCode = 200, message = OK
[2020-11-24 11:53:01,946] INFO <tCoordinator:815> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.strimzi.svc:9092 (id: 2147483645 rack: null)
[2020-11-24 11:53:01,979] INFO <tCoordinator:553> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] (Re-)joining group
3 answers
ecbunoof1#
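You are trying to use the kafka-topics tool from the bridge pod, which does not include the Kafka binaries, so there is no kafka-topics tool in it. The command you are trying to run would work on one of the Kafka pods you deployed, where the tool is available.
Anyway, I don't understand the relationship between what you are trying to achieve and the use of the bridge. The bridge just provides an HTTP interface to Kafka on port 8080, so from a pod inside the Kubernetes cluster you can issue GET/POST requests against the bridge endpoints to interact with Kafka over HTTP.
The full bridge documentation is available at: https://strimzi.io/docs/bridge/latest/
If you want to expose the bridge outside the Kubernetes cluster, you can read more about it here: https://strimzi.io/blog/2019/11/05/exposing-http-bridge/
Finally, here is a good introduction to the bridge: https://strimzi.io/blog/2019/07/19/http-bridge-intro/
jv2fixgn2#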
The Kafka Bridge service is an HTTP service and has no Kafka CLI tools (and even if it did, port 8080 would not respond to that command).
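If the Kafka CLI is what you are after, here is a minimal sketch of running it from a broker pod instead of the bridge pod. The pod name my-cluster-kafka-0, the container name kafka, and the script path /opt/kafka/bin/kafka-topics.sh are assumptions based on a typical Strimzi deployment, so check them against your own cluster. The namespace and bootstrap address are taken from the question, and the function names are made up for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: every default below can be overridden from the environment.
NAMESPACE="${NAMESPACE:-strimzi}"                                # from the question
KAFKA_POD="${KAFKA_POD:-my-cluster-kafka-0}"                     # assumed broker pod name
BOOTSTRAP="${BOOTSTRAP:-my-cluster-kafka-bootstrap:9092}"        # plain listener from the question
KAFKA_TOPICS="${KAFKA_TOPICS:-/opt/kafka/bin/kafka-topics.sh}"   # assumed path inside the broker image

# List all topics by running kafka-topics.sh inside a broker pod.
list_topics_from_broker() {
  kubectl exec -n "$NAMESPACE" "$KAFKA_POD" -c kafka -- \
    "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" --list
}

# Describe one topic (partitions, leaders, replicas) from a broker pod.
describe_topic_from_broker() {
  local topic="$1"
  kubectl exec -n "$NAMESPACE" "$KAFKA_POD" -c kafka -- \
    "$KAFKA_TOPICS" --bootstrap-server "$BOOTSTRAP" --describe --topic "$topic"
}
```

After sourcing the file, list_topics_from_broker prints the topic names, and describe_topic_from_broker your-topic shows which broker currently leads each partition of your-topic.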
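If you want to list topics, use a GET /topics request.
Assuming you have an ingress or a port forward to the bridge service, you don't need to exec into the container to do this.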
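As a rough sketch of that approach, the helpers below forward a local port to the bridge service and then list topics over HTTP. The service name, namespace, and port come from the kubectl get svc output in the question. The function names and the environment-variable overrides are hypothetical and only here for illustration.

```shell
#!/usr/bin/env bash
# Sketch only: defaults mirror the question and can be overridden from the environment.
NAMESPACE="${NAMESPACE:-strimzi}"
BRIDGE_SERVICE="${BRIDGE_SERVICE:-my-bridge-bridge-service}"
LOCAL_PORT="${LOCAL_PORT:-8080}"
BRIDGE_URL="${BRIDGE_URL:-http://localhost:${LOCAL_PORT}}"

# Forward a local port to port 8080 of the bridge service.
# This call blocks until interrupted, so run it in its own terminal or in the background.
forward_bridge() {
  kubectl port-forward -n "$NAMESPACE" "svc/${BRIDGE_SERVICE}" "${LOCAL_PORT}:8080"
}

# Ask the bridge for the list of topics over HTTP.
list_topics_via_bridge() {
  curl -sS -X GET "${BRIDGE_URL}/topics"
}
```

Run forward_bridge in one terminal (or in the background), then call list_topics_via_bridge from another. The bridge should answer with a JSON array of topic names, without any need to exec into the bridge pod.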
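igetnqfo3#
I'm not completely sure about the Strimzi version, but to me it just looks like the script is not in $PATH, or it is invoked a bit differently. For example, this comes from one of my clusters (I use Confluent's Kafka Helm chart):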