目前我正在使用EKS/Kubernetes集群,我安装了Apache Kafka作为服务。
在集群内部,我可以使用以下命令执行所有Kafka操作,例如创建、删除、消费或读取主题:
#List topics
kubectl exec -it <My pod> -- kafka-topics.sh --list --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Create topic
kubectl exec -it <My pod> -- kafka-topics.sh --create --topic test --partitions 3 --replication-factor 1 --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Delete topic
kubectl exec -it <My pod> -- kafka-topics.sh --delete --topic test --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
#Produce
kubectl exec -it <My pod> -- kafka-console-producer.sh --topic test --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092 < value.json
#Consume
kubectl exec -it <My pod> -- kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server kafka.<My namespace>.svc.cluster.local:9092
字符串
现在,我想用lambda或一些python代码来摄取数据,而不进入集群,
from confluent_kafka.admin import AdminClient, NewTopic
def list_kafka_topics(bootstrap_servers):
admin_client = AdminClient({'bootstrap.servers': bootstrap_servers})
topics = admin_client.list_topics().topics
topic_names = list(topics.keys())
return topic_names
bootstrap_servers = 'BOOTSTRAPS'
topics = list_kafka_topics(bootstrap_servers)
print("Lista de Topics en Kafka:")
for topic in topics:
print(topic)
型
总是出现的错误是:
Failed to resolve 'kafka:9092': No such host is known. (after 2703ms in state CONNECT, 1 identical error(s) suppressed)
型
有人可以帮助我,以便摄取数据到一个Kafka集群已经安装在EKS?或事情的新策略总是与摄取数据的目的。
1条答案
按热度按时间mf98qq941#
如果我理解正确的话,你的Kafka服务是在pod中运行的。
要从集群外部访问Kubernetes Pod(即服务),您通常会使用Kubernetes
Ingress
或LoadBalancer
将服务暴露给外部世界。这允许外部流量到达您的服务。然后在Lambda中使用ingress端点。