程序员最近都爱上了这个网站  程序员们快来瞅瞅吧!  it98k网:it98k.com

本站消息

站长简介/公众号

  出租广告位,需要合作请联系站长

+关注
已关注

分类  

暂无分类

标签  

暂无标签

日期归档  

暂无数据

Celery and docker : 列出所有 celery 服务中的所有任务

发布于2023-02-03 21:30     阅读(1015)     评论(0)     点赞(24)     收藏(5)


我有 web、celery_upload、celery_model 作为 3 个服务(最后两个是在 docker-compose 中从 web 创建的,所有 3 个都有不同的启动命令)。

我有以下设置

# celery_app.py

celery_app = Celery(__name__, broker=REDIS_URL, backend=REDIS_URL)

imports = (
    "proj.upload.tasks",
)

task_queues = (
    Queue(name="upload", exchange=Exchange(name="upload", type="direct"), routing_key="upload"),
    Queue(name="model", exchange=Exchange(name="model", type="direct"), routing_key="model"),
)

task_routes = ([
    ("proj.upload.tasks.job_one", {"queue": "upload", "routing_key": "upload"}),
    ("proj.upload.tasks.job_two", {'queue': "model", "routing_key": "model"}),
],)

celery_app.conf.update(
    # task_create_missing_queues=True,
    result_extended=True,
    worker_prefetch_multiplier=1,
    imports=imports,
    task_queues=task_queues,
    task_routes=task_routes,
)

# tasks.py both long running tasks

# case-1 的 run.py

result_one = job_one.apply_async(args=(n_clicks,), queue="upload", routing_key="upload")
result_two = job_two.apply_async(args=(n_clicks,), queue="model", routing_key="model")

# 案例 2 的 run.py

result_one = job_one.apply_async(args=(n_clicks,), queue="upload", routing_key="upload")
result_two = job_two.apply_async(args=(n_clicks,), queue="upload", routing_key="upload")
result = job_model.apply_async(args=(n_clicks,), queue="model", routing_key="model")

# docker-compose

celery_upload:
 command: ["celery", "--app=proj.celery_app", "worker", "--queues=upload"]

celery_model:
 command: ["celery", "--app=proj.celery_app", "worker", "--queues=model"]

案例 1: 2 个任务 job_one、job_two 当前在同一个文件中/upload/task.py,但分别在 2 个单独的服务 celery_upload、celery_model 上运行。

问题: 所有服务启动并运行。但是在 docker-compose up 的时候,我在两个 services 下都观察到了。在芹菜标志附近,在“任务”关键字下,两个任务都列在两个服务中。

问题:

  • 这种行为或我编写代码的方式是否可以?
  • 这个保证或我如何检查任务 job_one、job_two 是否分别在单独的服务(celery_upload、celery_model)上运行?

案例 2: 现在假设我有 2 个不同的文件,每个文件中的任务很少。

  • 上传/tasks.py
    • job_one
    • 工作二
  • 模型/tasks.py
    • 工作模型

因为我已经创建了 2 个单独的服务(celery_upload、celery_model)来从两个文件运行任务,如上所示。

问题:

  • 当我将使用 imports=imports [这里我将同时使用"proj.upload.tasks""proj.model.tasks"]这是否也与案例 1 的行为相同,案例 1 将列出两个文件中的所有任务。(我还没有尝试过,但感觉就像案例 1)?
  • 是否可以编写基于服务列出任务的代码,即 celery_upload 服务将只列出和执行/upload/task.py模型的任务。这将确保相关服务正在监听相关队列并执行其相关任务,或者这种行为可以吗?

所有服务中所有任务的列表让我产生了疑问和困惑。请帮忙。

Update-1 为简单起见,我将所有任务都包含在tasks.py文件中celery_app

当我的服务启动时,我看到以下输出

 celery_model |  
 celery_model |  -------------- model@abc.com v5.2.7 (dawn-chorus)
 celery_model | --- ***** ----- 
 celery_model | -- ******* ---- Linux-5.15.0-57-generic-x86_64-with-glibc2.31 2023-01-08 22:42:17
 celery_model | - *** --- * --- 
 celery_model | - ** ---------- [config]
 celery_model | - ** ---------- .> app:          proj.tasks:0x7f076bdc3110
 celery_model | - ** ---------- .> transport:   redis://redis:6379/0
 celery_model | - ** ---------- .> results:     redis://redis:6379/0
 celery_model | - *** --- * --- .> concurrency: 1 (prefork)
 celery_model | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
 celery_model | --- ***** ----- 
 celery_model |  -------------- [queues]
 celery_model |                 .> model_queue      exchange=model_queue(direct) key=model_queue
 celery_model |                 
 celery_model | 
 celery_model | [tasks]
 celery_model |   . job_one
 celery_model |   . job_two
 celery_model |   . job_model
 celery_upload |  
 celery_upload |  -------------- upload@abc.com v5.2.7 (dawn-chorus)
 celery_upload | --- ***** ----- 
 celery_upload | -- ******* ---- Linux-5.15.0-57-generic-x86_64-with-glibc2.31 2023-01-08 22:42:17
 celery_upload | - *** --- * --- 
 celery_upload | - ** ---------- [config]
 celery_upload | - ** ---------- .> app:          proj.tasks:0x7f8e00f26550
 celery_upload | - ** ---------- .> transport:   redis://redis:6379/0
 celery_upload | - ** ---------- .> results:     redis://redis:6379/0
 celery_upload | - *** --- * --- .> concurrency: 1 (prefork)
 celery_upload | -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
 celery_upload | --- ***** ----- 
 celery_upload |  -------------- [queues]
 celery_upload |                 .> upload_queue     exchange=upload_queue(direct) key=upload_queue
 celery_upload |                 
 celery_upload | 
 celery_upload | [tasks]
 celery_upload |   . job_one
 celery_upload |   . job_two
 celery_upload |   . job_model
 celery_upload | 

正如我们所看到的,所有任务都列在两个服务中。这是正常的还是我遗漏了什么?这造成了混乱。

请建议。


解决方案


暂无回答



所属网站分类: 技术文章 > 问答

作者:黑洞官方问答小能手

链接:https://www.pythonheidong.com/blog/article/1895310/790bee191d918adfbdd0/

来源:python黑洞网

任何形式的转载都请注明出处,如有侵权 一经发现 必将追究其法律责任

24 0
收藏该文
已收藏

评论内容:(最多支持255个字符)