Kafka生产者和消费者无法在带有Docker的python上正常工作

guicsvcw  于 2023-01-20  发布在  Apache
关注(0)|答案(1)|浏览(157)

我正在做一个项目,使用Kafka的生产者和消费者,以便每两个小时从news_API获取文章(有特定主题),然后与消费者一起将它们保存在mongodb中。
所以我创建了三个类一个用于KafkaAdminClient一个用于KafkaProducer一个用于KafkaConsumer。
我的Kafka的服务器运行在一个Docker容器上。主要的应用程序是一个 flask 应用程序,我在那里启动所有的线程和Kafka的线程。
我一直试图改变很多小东西,但它似乎很不稳定,我不知道为什么。首先,数据在消费者上,并最终在随机时间进入mongodb。然后,消费者中的旧主题不会被删除,数据库不断填充新旧值。
现在我在消费者中放置了一个组,并添加了kafkaAdminClient类,我在消费者中根本没有得到消息。

articleretrieval-flask_api-1          | WARNING:kafka.cluster:Topic health is not available during auto-create initialization articleretrieval-flask_api-1          | WARNING:kafka.cluster:Topic business is not available during auto-create initialization articleretrieval-flask_api-1          | WARNING:kafka.cluster:Topic war is not available during auto-create initialization articleretrieval-flask_api-1         
 | WARNING:kafka.cluster:Topic motorsport is not available during auto-create initialization articleretrieval-flask_api-1          
| WARNING:kafka.cluster:Topic sources is not available during auto-create initialization articleretrieval-flask_api-1         
 | WARNING:kafka.cluster:Topic science is not available during auto-create initialization articleretrieval-flask_api-1         
 | WARNING:kafka.cluster:Topic technology is not available during auto-create initialization articleretrieval-flask_api-1         
 | WARNING:kafka.cluster:Topic education is not available during auto-create initialization articleretrieval-flask_api-1          
| WARNING:kafka.cluster:Topic space is not available during auto-create initialization articleretrieval-flask_api-1          
| INFO:kafka.consumer.subscription_state:Updated partition assignment: [] articleretrieval-flask_api-1    
| INFO:kafka.conn:<BrokerConnection node_id=bootstrap-0 host=kafka:29092 <connected> [IPv4 ('172.19.0.4', 29092)]>: Closing connection.

kafkaConsumerThread.py:
x一个一个一个一个x一个一个二个x
kafkaAdminClient.py:

class KafkaAdminThread:
    def __init__(self,topics):
        self.topics = topics

    def start(self):
        admin_client = KafkaAdminClient(
            bootstrap_servers=['kafka:29092'], 
                client_id='my_client'
        )
        topic_list = []
        for topic in self.topics:
            topic_list.append(NewTopic(name=topic, num_partitions=1, replication_factor=1))
        admin_client.create_topics(new_topics=topic_list, validate_only=False)

app.py:

if __name__ == "__main__":
    # Creating a new connection with mongo
    # threading.Thread(target=lambda: app.run(port=8080, host="0.0.0.0",debug=True,use_reloader=False)).start()
    executor = ThreadPoolExecutor(max_workers=4)
    producerThread = KafkaProducerThread(TOPICS,logging)
    adminThread = KafkaAdminThread(TOPICS)
    executor.submit(adminThread.start)
    flaskThread = threading.Thread(target=lambda: app.run(port=8080, host="0.0.0.0", debug=True, use_reloader=False))
    executor.submit(flaskThread.start())
    time.sleep(15)
    executor.submit(producerThread.start)
    consumerThread = KafkaConsumerThread(TOPICS, db,logging)
    executor.submit(consumerThread.start)

docker-compose.yml:

zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"

  kafka:
    container_name: kafka_broker_1
    image: wurstmeister/kafka
    links:
      - zookeeper
    ports:
      - "9092:9092"
      - "29092:29092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_ADVERTISED_HOSTNAME: kafka
      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:29092,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: INSIDE://0.0.0.0:29092,OUTSIDE://0.0.0.0:9092
      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock%

  flask_api:
    build:
      context: . #Very important it refers where the root will be for the build.
      dockerfile: Dockerfile
    links:
        - kafka
    environment:
      - FLASK-KAFKA_BOOTSTRAP-SERVERS=kafka:29092
      - SERVER_PORT=8080
    ports:
      - "8080:8080"
    depends_on:
      - kafka
qacovj5a

qacovj5a1#

消费者中的旧主题不会被删除
在你显示的代码中没有删除主题。它们被删除的唯一方法是Kafka容器重新启动,因为你没有为Kafka或Zookeeper挂载一个卷来持久化它们。
并且数据库不断地被新的和旧的值填充。
我假设你的生产者没有记录到目前为止它读过的源代码?如果是这样,你将在主题中得到重复的结果。我建议使用kafka-console-consumer来调试生产者是否真的按照你想要的方式工作。
同样,您禁用了消费者自动提交,我没有看到手动提交代码,所以当消费者重新启动时,它将重新处理主题中的任何现有数据。Group / AdminClient设置应该不会影响这一点,但设置一个组将允许您实际维护偏移跟踪。
在稳定性方面,我以前使用过没有线程的Flask和Kafka,它工作得很好。至少,一个生产者......我的建议是为负责写入数据库的消费者创建一个完全独立的容器。您不需要Flask框架的开销。或者,推荐使用Kafka Connect Mongo sink来代替。
顺便说一句,wurstmeister容器支持通过环境变量自己创建主题。

相关问题