在python中运行多个任务

brccelvz  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(322)

我有多台服务器,全世界有10000个客户机在使用。每个客户机都可以启动一个在服务器上运行大约需要5分钟的任务。如果服务器已被完全占用,则任务需要排队。
这里的问题是什么样的库体系结构可以支持这个问题?具体目标如下:
同时并行监视和运行多个任务
监视资源,并且只有在资源足够时才从队列中获取任务

oiopk7p5

oiopk7p51#

嗯,这可能是celery 和花卉监测使用的情况。
celery 是一个简单、灵活、可靠的分布式系统,可以处理大量的消息,同时为操作提供维护此类系统所需的工具。它是一个专注于实时处理的任务队列,同时还支持任务调度。
celery 可以满足这两个要求。虽然这可能是一个多一点的工作,但它是有可能的规模和减少cpu,内存使用时,系统闲置或您需要消耗更少的内存。
我将指出有关此主题的一些链接:
celery -最小化内存消耗
https://www.vinta.com.br/blog/2018/dealing-resource-consuming-tasks-celery/
https://medium.com/@alaminopu.me/solving-rabbitmq-high-cpu-memory-usages-problem-with-celery -d4172ba1c6b3
如果您希望将它与apachekafka集成,请检查这个问题

a14dhokn

a14dhokn2#

您必须设置一个消息代理,如rabbitmq或redis。如果队列的数据应该是持久的,我建议您使用rabbitmq。要发送和接收任务,您可以使用celery ,celery 允许您将任务发送到队列,并在celery 工人中运行这些任务。用于队列监视。在这个时代,一个非常好的实践是用docker实现所有的体系结构。我向您展示docker-compose.yml的一个示例,它设置了3个容器:兔子、花和celery 。你唯一要做的就是运行命令 docker-compose up -d :

version: '3.3'
services:
  rabbit:
    image: rabbitmq:3-management-alpine
    restart: always
    environment:
      RABBITMQ_ERLANG_COOKIE: cookie
      RABBITMQ_DEFAULT_USER: Admin 
      RABBITMQ_DEFAULT_PASS: 1234
    ports:
      - 5672:5672
      - 15672:15672

  celery:
    build: 
      context: .
    volumes: 
      - ./src:/app
    command: celery -A appendapp worker --loglevel=debug
    environment: 
      RABBITMQ_HOST: rabbit

  flower:
    image: mher/flower
    restart: always
    ports:
      - 5555:5555
    command:
      - "--broker=amqp://Admin:1234@rabbit//"

./src/appendapp.py的代码从json文件中读取一个列表,然后添加一个项,最后将更新后的列表保存到文件中。代码如下所示:

from celery import Celery
import os, json, time

# Where the downloaded files will be stored

BASEDIR="./hola.json"

RABBIT_HOST = os.getenv("RABBITMQ_HOST") or "localhost"

# Create the app and set the broker location (RabbitMQ)

app = Celery('appendapp',
             backend='rpc://',
             broker=f'pyamqp://Admin:1234@{RABBIT_HOST}//')

@app.task
def add_item(item):
    #time.sleep(2.5)
    file = open(BASEDIR)
    data = json.load(file)
    data.append(item)
    file.close()
    wfile = open(BASEDIR, "w")
    json.dump(data, wfile)
    wfile.close()
    return f"Se agrego {item}"

相关问题