如何进入Kafka大桥

jgovgodb  于 2021-06-04  发布在  Kafka
关注(0)|答案(3)|浏览(515)

我正在使用strimzi,我想学习如何使用kafka桥,为了理解它的工作原理,我使用下面的yml文件创建了一个kafka集群

apiVersion: kafka.strimzi.io/v1beta1

kind: Kafka

metadata:

  name: my-cluster

spec:

  kafka:

    version: 2.6.0

    replicas: 3

    listeners:

      - name: plain

        port: 9092

        type: internal

        tls: false

      - name: tls

        port: 9093

        type: internal

        tls: true

    config:

      offsets.topic.replication.factor: 3

      transaction.state.log.replication.factor: 3

      transaction.state.log.min.isr: 2

      log.message.format.version: "2.6"

    storage:

      type: jbod

      volumes:

      - id: 0

        type: persistent-claim

        size: 100Gi

        deleteClaim: false

  zookeeper:

    replicas: 3

    storage:

      type: persistent-claim

      size: 100Gi

      deleteClaim: false

  entityOperator:

    topicOperator: {}

    userOperator: {}

这是Kafka桥的yml语法

apiVersion: kafka.strimzi.io/v1alpha1

kind: KafkaBridge

metadata:

  name: my-bridge

spec:

  replicas: 1

  bootstrapServers: my-cluster-kafka-bootstrap:9092

  http:

    port: 8080

现在是服务

ist@ist-1207:~$ kubectl   get svc -A

NAMESPACE     NAME                             TYPE        CLUSTER-IP       EXTERNAL-IP   PORT(S)                      AGE

default       kubernetes                       ClusterIP   10.96.0.1        <none>        443/TCP                      3d1h

kube-system   kube-dns                         ClusterIP   10.96.0.10       <none>        53/UDP,53/TCP,9153/TCP       3d1h

strimzi       my-bridge-bridge-service         ClusterIP   10.100.153.164   <none>        8080/TCP                     88m

strimzi       my-cluster-kafka-bootstrap       ClusterIP   10.97.160.117    <none>        9091/TCP,9092/TCP,9093/TCP   109m

strimzi       my-cluster-kafka-brokers         ClusterIP   None             <none>        9091/TCP,9092/TCP,9093/TCP   109m

strimzi       my-cluster-zookeeper-client      ClusterIP   10.107.59.225    <none>        2181/TCP                     110m

strimzi       my-cluster-zookeeper-nodes       ClusterIP   None             <none>        2181/TCP,2888/TCP,3888/TCP   110m

strimzi       my-connect-cluster-connect-api   ClusterIP   10.110.27.15     <none>        8083/TCP                     25m

现在我使用以下命令执行它的pod:

$ kubectl  exec -it my-bridge-bridge-684df9fc64-bktc4 -n strimzi bash

之后呢

[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST http://localhost:8080/consumers/my-group   -H 'content-type: application/vnd.kafka.v2+json'   -d '{
    "name": "your-consumer",
    "format": "json",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": false
}'
{"instance_id":"your-consumer","base_uri":"http://localhost:8080/consumers/my-group/instances/your-consumer"}

然后

[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X POST http://localhost:8080/consumers/my-group/instances/your-consumer/subscription   -H 'content-type: application/vnd.kafka.v2+json'   -d '{
    "topics": [
        "your-topic"
    ]
}'

然后

[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
>   -H 'accept: application/vnd.kafka.json.v2+json'
[]

然后我制作了

[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$ curl -X POST \
>   http://localhost:8080/topics/your-topic \
>   -H 'content-type: application/vnd.kafka.json.v2+json' \
>   -d '{
>     "records": [
>         {
>             "key": "key-1",
>             "value": "kajal verma"
>         },
>         {
>             "key": "key-2",
>             "value": "Aman verma"
>         }
>     ]
> }'
{"offsets":[{"partition":0,"offset":2},{"partition":0,"offset":3}]}

[strimzi@my-bridge-bridge-684df9fc64-bktc4 strimzi]$   curl -X GET http://localhost:8080/consumers/my-group/instances/your-consumer/records \
>   -H 'accept: application/vnd.kafka.json.v2+json'
[]

再说一次,我没有拿到唱片,不知道为什么。
这些是吊舱桥的原木

ist@ist-1207:~$ kubectl logs -f my-bridge-bridge-684df9fc64-lstx2 -n strimzi
Kafka Bridge configuration:

# Bridge configuration

bridge.id=my-bridge

# Kafka common properties

kafka.bootstrap.servers=my-cluster-kafka-bootstrap:9092
kafka.security.protocol=PLAINTEXT

# Apache Kafka Producer

# Apache Kafka Consumer

# HTTP configuration

http.enabled=true
http.host=0.0.0.0
http.port=8080
http.cors.enabled=false
http.cors.allowedOrigins=
http.cors.allowedMethods=

[2020-11-24 11:42:01,374] INFO  <Application :64> [main        ] Strimzi Kafka Bridge 0.19.0 is starting
[2020-11-24 11:42:02,939] WARN  <onMetaSchema:337> [oop-thread-1] Unknown keyword example - you should define your own Meta Schema. If the keyword is irrelevant for validation, just use a NonValidationKeyword
[2020-11-24 11:42:03,402] INFO  <HttpBridge  :180> [oop-thread-1] Starting HTTP-Kafka bridge verticle...
[2020-11-24 11:42:03,459] INFO  <ClientConfig:354> [oop-thread-1] AdminClientConfig values: 
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    client.dns.lookup = use_all_dns_ips
    client.id = 
    connections.max.idle.ms = 300000
    default.api.timeout.ms = 60000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS

[2020-11-24 11:42:04,336] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:42:04,336] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:42:04,338] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218124265
Nov 24, 2020 11:42:05 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 2656 ms, time limit is 2000 ms
Nov 24, 2020 11:42:06 AM io.vertx.core.impl.BlockedThreadChecker
WARNING: Thread Thread[vert.x-eventloop-thread-1,5,main]=Thread[vert.x-eventloop-thread-1,5,main] has been blocked for 3869 ms, time limit is 2000 ms
[2020-11-24 11:42:07,589] INFO  <HttpBridge  :102> [oop-thread-1] HTTP-Kafka Bridge started and listening on port 8080
[2020-11-24 11:42:07,604] INFO  <HttpBridge  :103> [oop-thread-1] HTTP-Kafka Bridge bootstrap servers my-cluster-kafka-bootstrap:9092
[2020-11-24 11:42:07,609] INFO  <Application :219> [oop-thread-0] HTTP verticle instance deployed [6bc2ded6-162c-4444-b8ac-f51c0573e389]
[2020-11-24 11:44:15,295] WARN  <etworkClient:757> [dminclient-1] [AdminClient clientId=adminclient-1] Connection to node -1 (my-cluster-kafka-bootstrap/10.97.102.188:9092) could not be established. Broker may not be available.
[2020-11-24 11:44:15,301] INFO  <adataManager:235> [dminclient-1] [AdminClient clientId=adminclient-1] Metadata update failed
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1606218154377, tries=1, nextAllowedTryMs=1606218255400) timed out at 1606218255300 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.
[2020-11-24 11:46:09,397] INFO  <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Request: from 127.0.0.1:34080, method = POST, path = /consumers/my-group
[2020-11-24 11:46:09,508] INFO  <nsumerConfig:354> [oop-thread-1] ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    check.crcs = true
    client.dns.lookup = use_all_dns_ips
    client.id = your-consumer
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = false
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = my-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    internal.throw.on.fetch.stable.offset.unsupported = false
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer

[2020-11-24 11:46:09,759] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:46:09,759] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:46:09,760] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218369758
[2020-11-24 11:46:09,775] INFO  <idgeEndpoint:140> [oop-thread-1] Created consumer your-consumer in group my-group
[2020-11-24 11:46:09,781] INFO  <eateConsumer:85> [oop-thread-1] [9562016] CREATE_CONSUMER Response:  statusCode = 200, message = OK
[2020-11-24 11:46:33,188] INFO  <subscribe   :85> [oop-thread-1] [343058276] SUBSCRIBE Request: from 127.0.0.1:34370, method = POST, path = /consumers/my-group/instances/your-consumer/subscription
[2020-11-24 11:46:33,199] INFO  <idgeEndpoint:191> [oop-thread-1] Subscribe to topics [SinkTopicSubscription(topic=your-topic,partition=null,offset=null)]
[2020-11-24 11:46:33,224] INFO  <subscribe   :85> [oop-thread-1] [343058276] SUBSCRIBE Response:  statusCode = 200, message = OK
[2020-11-24 11:46:33,231] INFO  <afkaConsumer:965> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Subscribed to topic(s): your-topic
[2020-11-24 11:46:58,713] INFO  <poll        :85> [oop-thread-1] [690326198] POLL Request: from 127.0.0.1:34814, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:46:58,756] INFO  <poll        :85> [oop-thread-1] [690326198] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:47:25,358] INFO  <send        :85> [oop-thread-1] [859539310] SEND Request: from 127.0.0.1:35210, method = POST, path = /topics/your-topic
[2020-11-24 11:47:25,474] INFO  <oducerConfig:354> [oop-thread-1] ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = false
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-11-24 11:47:25,611] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,631] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,631] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445608
[2020-11-24 11:47:25,649] INFO  <oducerConfig:354> [oop-thread-1] ProducerConfig values: 
    acks = 0
    batch.size = 16384
    bootstrap.servers = [my-cluster-kafka-bootstrap:9092]
    buffer.memory = 33554432
    client.dns.lookup = use_all_dns_ips
    client.id = producer-2
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    internal.auto.downgrade.txn.commit = false
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 2147483647
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
    ssl.endpoint.identification.algorithm = https
    ssl.engine.factory.class = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.3
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer

[2020-11-24 11:47:25,670] INFO  <ppInfoParser:117> [oop-thread-1] Kafka version: 2.6.0
[2020-11-24 11:47:25,671] INFO  <ppInfoParser:118> [oop-thread-1] Kafka commitId: 62abe01bee039651
[2020-11-24 11:47:25,672] INFO  <ppInfoParser:119> [oop-thread-1] Kafka startTimeMs: 1606218445665
[2020-11-24 11:47:25,673] INFO  <Metadata    :279> [| producer-1] [Producer clientId=producer-1] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,685] INFO  <Metadata    :279> [| producer-2] [Producer clientId=producer-2] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:25,714] INFO  <send        :85> [oop-thread-1] [859539310] SEND Response:  statusCode = 200, message = OK
[2020-11-24 11:47:25,938] INFO  <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-2] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:25,944] INFO  <afkaProducer:1189> [ker-thread-7] [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
[2020-11-24 11:47:41,679] INFO  <poll        :85> [oop-thread-1] [484189988] POLL Request: from 127.0.0.1:35416, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:47:41,682] WARN  <etworkClient:1073> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Error while fetching metadata with correlation id 2 : {your-topic=LEADER_NOT_AVAILABLE}
[2020-11-24 11:47:41,684] INFO  <Metadata    :279> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Cluster ID: MJQu4vwtR4iZ7eIiDp7zLg
[2020-11-24 11:47:41,688] INFO  <poll        :85> [oop-thread-1] [484189988] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:53:01,882] INFO  <poll        :85> [oop-thread-1] [1610292326] POLL Request: from 127.0.0.1:39262, method = GET, path = /consumers/my-group/instances/your-consumer/records
[2020-11-24 11:53:01,883] INFO  <poll        :85> [oop-thread-1] [1610292326] POLL Response:  statusCode = 200, message = OK
[2020-11-24 11:53:01,946] INFO  <tCoordinator:815> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] Discovered group coordinator my-cluster-kafka-2.my-cluster-kafka-brokers.strimzi.svc:9092 (id: 2147483645 rack: null)
[2020-11-24 11:53:01,979] INFO  <tCoordinator:553> [mer-thread-0] [Consumer clientId=your-consumer, groupId=my-group] (Re-)joining group
ecbunoof

ecbunoof1#

你在试着使用 kafka-topics 工具从桥荚不包括Kafka二进制文件,所以没有 kafka-topics 工具在里面。您尝试执行的命令可以在您部署的工具可用的kafka pods之一上工作。不管怎样,我不明白你想要达到的目标和桥的使用之间的关系。网桥只是在端口8080上提供了一个到kafka的http接口,因此在kubernetes集群的pod中,您可以对网桥端点执行get/post请求,以便通过http与kafka进行交互。整个桥梁文件可在以下位置获得:https://strimzi.io/docs/bridge/latest/ 如果您想在kubernetes集群之外公开网桥,那么您可以在这里阅读更多关于它的内容:https://strimzi.io/blog/2019/11/05/exposing-http-bridge/ 最后,这里有一个很好的桥梁介绍:https://strimzi.io/blog/2019/07/19/http-bridge-intro/

jv2fixgn

jv2fixgn2#

kafka网桥服务是一个http服务,没有kafka cli工具(如果有,端口8080将不会响应该命令)。
如果要列出主题,请使用get/topics请求
假设您有一个到网桥服务的入口或端口,您不需要执行到容器中就可以做到这一点

igetnqfo

igetnqfo3#

对strimzi版本并不完全确定,但对我来说,它只是看起来脚本不在$path中,或者它的调用有点不同。例如,这来自我的一个集群(我使用confluent的Kafka Helm 图):

root@confluent-kafka-connect-6bf9c944f-fzbgc:/# which kafka-topics
/usr/bin/kafka-topics
root@confluent-kafka-connect-6bf9c944f-fzbgc:/# kafka-topics --help
This tool helps to create, delete, describe, or change a topic.
Option                                   Description         
------                                   -----------                            
--alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
(...)

相关问题