I am working on a project that uses a Kafka producer and consumer: every two hours the producer fetches articles (on specific topics) from news_API, and the consumer then saves them to MongoDB.
So I created three classes: one for the KafkaAdminClient, one for the KafkaProducer, and one for the KafkaConsumer.
My Kafka broker runs in a Docker container. The main application is a Flask app, and that is where I start all the threads, including the Kafka ones.
I have tried changing lots of small things, but the whole setup seems very unstable and I don't know why. At first, data would reach the consumer and end up in MongoDB at seemingly random times. Then, old topics were never removed on the consumer side, and the database kept filling up with both new and old values.
Now that I have put the consumer in a group and added the KafkaAdminClient class, I am not getting any messages in the consumer at all. The only Kafka-related output I see is:
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic health is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic business is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic war is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic motorsport is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic sources is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic science is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic technology is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic education is not available during auto-create initialization
articleretrieval-flask_api-1  | WARNING:kafka.cluster:Topic space is not available during auto-create initialization
articleretrieval-flask_api-1  | INFO:kafka.consumer.subscription_state:Updated partition assignment: []
articleretrieval-flask_api-1  | INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:29092 <connected> [IPv4 ('172.19.0.4', 29092)]>: Closing connection.
kafkaConsumerThread.py:
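(The original contents of this file are not reproduced here; below is a minimal sketch of what such a consumer thread could look like with kafka-python, assuming a group_id, disabled auto-commit, and one MongoDB collection per topic. The class, parameter, and group names are assumptions, not the original code.)

import json
from kafka import KafkaConsumer

class KafkaConsumerThread:
    def __init__(self, topics, db, logging):
        self.topics = topics
        self.db = db
        self.logging = logging

    def start(self):
        consumer = KafkaConsumer(
            *self.topics,                      # subscribe to every news topic
            bootstrap_servers=['kafka:29092'],
            group_id='article_consumers',      # assumed group name
            auto_offset_reset='earliest',
            enable_auto_commit=False,          # auto-commit disabled, as discussed in the answer below
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
        )
        for message in consumer:
            # one MongoDB collection per topic; note there is no manual commit here (see the answer)
            self.db[message.topic].insert_one(message.value)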
kafkaAdminClient.py:
from kafka.admin import KafkaAdminClient, NewTopic

class KafkaAdminThread:
    def __init__(self, topics):
        self.topics = topics

    def start(self):
        admin_client = KafkaAdminClient(
            bootstrap_servers=['kafka:29092'],
            client_id='my_client'
        )
        topic_list = []
        for topic in self.topics:
            topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
        admin_client.create_topics(new_topics=topic_list, validate_only=False)
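Note that if this runs again while the topics already exist on the broker, create_topics will report errors for those topics. A possible variant (a sketch, assuming kafka-python's KafkaAdminClient.list_topics is available) creates only the topics that are missing:

        existing = set(admin_client.list_topics())   # topic names already present on the broker
        topic_list = [
            NewTopic(name=topic, num_partitions=1, replication_factor=1)
            for topic in self.topics
            if topic not in existing                  # skip topics that already exist
        ]
        if topic_list:
            admin_client.create_topics(new_topics=topic_list, validate_only=False)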
app.py:
if __name__ == "__main__":
    # Creating a new connection with mongo
    # threading.Thread(target=lambda: app.run(port=8080, host="0.0.0.0", debug=True, use_reloader=False)).start()
    executor = ThreadPoolExecutor(max_workers=4)
    producerThread = KafkaProducerThread(TOPICS, logging)
    adminThread = KafkaAdminThread(TOPICS)
    executor.submit(adminThread.start)
    flaskThread = threading.Thread(target=lambda: app.run(port=8080, host="0.0.0.0", debug=True, use_reloader=False))
    executor.submit(flaskThread.start())
    time.sleep(15)
    executor.submit(producerThread.start)
    consumerThread = KafkaConsumerThread(TOPICS, db, logging)
    executor.submit(consumerThread.start)
docker-compose.yml:
zookeeper:
  image: wurstmeister/zookeeper
  ports:
    - "2181:2181"
kafka:
  container_name: kafka_broker_1
  image: wurstmeister/kafka
  links:
    - zookeeper
  ports:
    - "9092:9092"
    - "29092:29092"
  depends_on:
    - zookeeper
  environment:
    KAFKA_ADVERTISED_HOSTNAME: kafka
    KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
    KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
    KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
flask_api:
  build:
    context: .  # Important: this sets the root directory for the build.
    dockerfile: Dockerfile
  links:
    - kafka
  environment:
    - FLASK-KAFKA_BOOTSTRAP-SERVERS=kafka:29092
    - SERVER_PORT=8080
  ports:
    - "8080:8080"
  depends_on:
    - kafka
1 Answer
"Old topics in the consumer don't get deleted"
Nothing in the code you've shown deletes topics. The only way they would get removed is if the Kafka container restarts, since you haven't mounted a volume for Kafka or Zookeeper to persist them.
"and the database keeps getting filled with new and old values"
I assume your producer isn't keeping track of which sources it has already read? If so, you will end up with duplicates in the topics. I suggest using kafka-console-consumer to debug whether the producer is actually working the way you want. Likewise, you have disabled auto-commit on the consumer and I don't see any manual commit code, so whenever the consumer restarts it will reprocess any existing data in the topics. The group / AdminClient setup shouldn't affect this, but setting a group will let you actually maintain offset tracking.
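For example, you could tail a topic from inside the broker container with something like docker exec -it kafka_broker_1 kafka-console-consumer.sh --bootstrap-server kafka:29092 --topic health --from-beginning. On the commit side, with kafka-python a manual commit after the database write might look like this (a sketch; the variable names are assumptions):

for message in consumer:                           # consumer created with enable_auto_commit=False
    db[message.topic].insert_one(message.value)    # write the article to MongoDB first
    consumer.commit()                              # then mark the offset as processed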
Regarding stability, I've used Flask and Kafka without threads before and it worked fine. At least, for a producer... My suggestion would be to create a completely separate container for the consumer that is responsible for writing to the database; you don't need the overhead of the Flask framework for that. Alternatively, using a Kafka Connect Mongo sink instead is recommended.
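For instance, a separate compose service dedicated to the consumer might look roughly like this (Dockerfile.consumer is a hypothetical file, not one from the project):

news_consumer:
  build:
    context: .
    dockerfile: Dockerfile.consumer   # hypothetical image that runs only the Kafka-to-MongoDB consumer
  environment:
    - KAFKA_BOOTSTRAP_SERVERS=kafka:29092
  depends_on:
    - kafka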
As an aside, the wurstmeister container can create the topics itself via an environment variable.
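With that image, adding something like the following to the kafka service's environment block pre-creates the topics at broker startup (the format is topic:partitions:replicas; the topic list below is taken from the warnings above), which would make the KafkaAdminClient thread unnecessary:

    KAFKA_CREATE_TOPICS: "health:1:1,business:1:1,war:1:1,motorsport:1:1,sources:1:1,science:1:1,technology:1:1,education:1:1,space:1:1"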