【第 4 章:Celery 是什么?它解决了什么问题】

到这一章为止,你已经真正理解了 RabbitMQ

  • 它能路由
  • 能存消息
  • 能投递

那接下来一个自然的问题是:
“那我为什么还需要 Celery?直接用 RabbitMQ + pika 不行吗?”

这一章,就是来回答这个“架构分水岭问题”的。


1️⃣ 本章要解决什么问题(Why)

假设你已经用 RabbitMQ + pika 跑起来了消息通信,很快你会遇到一串工程级痛点

  1. 任务是什么?
  • 只是一个字符串?JSON?函数调用?
  • 参数、版本、序列化怎么统一?
  1. 怎么并发执行?
  • 一个进程拉几条消息?
  • 多进程还是多线程?
  • CPU 密集 vs IO 密集怎么区分?
  1. 失败怎么办?
  • 失败要不要重试?
  • 重试间隔?次数?
  • 哪些错误该重试,哪些不该?
  1. 结果去哪?
  • 谁来记录任务成功/失败?
  • 怎么查询任务状态?
  • 前端如何感知完成?
  1. 任务怎么路由?
  • 不同任务进不同队列?
  • 不同 worker 处理不同任务?
  • 动态扩缩容怎么做?

如果你全靠 pika 自己写,这意味着:

你在“手写一个简化版 Celery”。

Celery 存在的意义就是:

把“分布式任务系统”的通用复杂度,做成一套成熟框架。


2️⃣ 核心概念与角色(What)

先给 Celery 一个准确定位

Celery 是一个分布式异步任务队列/任务调度框架
它让你把“函数调用”升级为“可分布式执行的任务”。

Celery 里最重要的 4 个角色(先记住名字)

角色 本质
Task 被执行的“任务定义”(函数 + 元数据)
Producer 提交任务的一方(通常是 Web 服务)
Broker 消息中间件(RabbitMQ / Redis 等)
Worker 实际执行任务的进程

你会发现:Celery 的角色 完全覆盖了 MQ 模型,但语义更高层。


3️⃣ 工作原理(How)

我们从“你写一行 delay()”开始,拆解 Celery 到底做了什么。


3.1 一行代码背后的真实流程

1
send_email.delay(user_id=42)

背后发生的是一整套流程:

sequenceDiagram
    participant App as Web / Producer
    participant C as Celery Client
    participant B as Broker (RabbitMQ)
    participant W as Worker

    App->>C: 调用 task.delay()
    C->>C: 序列化任务名 + 参数
    C->>B: 发送消息
    B-->>W: 投递到队列
    W->>W: 反序列化 + 执行任务函数

📌 关键点

  • 你“调用函数”的感觉,其实是 发消息
  • Worker 才是真正调用函数的地方

3.2 Celery 帮你隐藏了哪些复杂度?

如果不用 Celery,你需要自己处理:

  • 任务序列化 / 反序列化
  • 消息格式与版本
  • Worker 并发模型
  • 心跳、断线重连
  • ACK / 重试 / 失败处理
  • 任务超时、限速
  • 任务路由
  • 结果存储

Celery 的核心价值一句话总结:

Celery 把“分布式执行函数”这件事,封装成了一个你能放心使用的基础设施。


4️⃣ 与 RabbitMQ / Celery 的关系(边界非常重要)

4.1 RabbitMQ vs Celery 的分工边界

维度 RabbitMQ Celery
消息存储
消息路由 ✅(Exchange/Binding) ⚠️(通过配置使用)
任务语义
重试策略
并发执行
失败处理
结果管理

📌 关键认知

RabbitMQ 是“消息基础设施”,
Celery 是“任务执行与调度系统”。

4.2 Celery 如何”使用”RabbitMQ?

  • Celery 把 任务 当作 消息
  • 把 RabbitMQ 当作 Broker
  • 利用:
    • Exchange → 决定任务路由
    • Queue → 存任务
    • Consumer → Worker 拉取任务

但:

RabbitMQ 并不知道什么是“任务”
Celery 也不负责“存消息”


5️⃣ 最小可运行示例(Practice)

这一章给你一个真正的 Celery 最小示例,目标是:

跑起来一次任务,让你“感觉到 Celery 在干活”。


5.1 环境准备

1
pip install celery

RabbitMQ 已启动(复用前面章节)。


5.2 定义 Celery 应用与任务

1
2
3
4
5
6
7
8
9
10
11
12
13
# tasks.py
from celery import Celery
import time

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
)

@app.task
def add(x, y):
time.sleep(2)
return x + y

5.3 启动 Worker(非常关键的一步)

1
celery -A tasks worker --loglevel=info

你会看到:

  • worker 启动
  • 连接 RabbitMQ
  • 声明 exchange / queue
  • 等待任务

5.4 提交任务(Producer)

1
2
3
4
5
# call.py
from tasks import add

r = add.delay(3, 5)
print("task id:", r.id)

你会观察到:

  • call.py 立刻返回
  • worker 进程中执行 add
  • 2 秒后完成

📌 这就是 “函数 → 分布式异步任务” 的最小闭环


6️⃣ 📒 本章笔记总结

  • Celery 是 分布式异步任务执行框架
  • 它解决的是:
    • 任务定义
    • 任务投递
    • 并发执行
    • 重试/失败处理
    • 路由与结果管理
  • RabbitMQ vs Celery 分工:
    • RabbitMQ:消息存储 + 路由
    • Celery:任务语义 + 执行调度
  • task.delay() ≠ 本地函数调用
    • 它是一次 消息发送
  • Worker 才是真正执行任务的地方
  • 如果你用 pika 手写任务系统,本质是在“重复造 Celery 的轮子”


【第 5 章:Celery 架构全景解析(Producer / Broker / Worker / Result Backend)】

到这一章,我们要完成一个关键跃迁
从“会用 Celery” → “看懂 Celery 的整体架构与设计取舍”

这一章结束后,你脑子里应该能自动浮现一张图:
👉 一个任务,从 Web 出发,如何穿过 RabbitMQ,被 Worker 执行,并最终(可选)返回结果。


1️⃣ 本章要解决什么问题(Why)

在第 4 章,你已经知道:

1
add.delay(1, 2)

这行代码会:

  • 不阻塞当前进程
  • 把任务交给别的进程/机器执行

但工程上,真正重要的问题是:

  1. 任务是谁发起的?
  2. 消息到底存在哪?
  3. Worker 如何拉任务?
  4. 任务结果放哪里?
  5. 如果 Worker 挂了/重启,系统还能不能继续?

如果你搞不清这些边界:

  • 你会乱配 broker / backend
  • 不知道该不该开 result backend
  • 不敢改并发、prefetch、ack
  • 出问题时完全不知道从哪查

👉 所以这一章的目标是:
把 Celery 的“角色分工 + 数据流 + 责任边界”一次性讲清楚。


2️⃣ 核心概念与角色(What)

Celery 的整体架构由 4 个核心角色组成(必须全部记住):

角色 名称 本质职责
Producer 任务生产者 提交任务(通常是 Web/API)
Broker 消息中间件 存任务、投递任务
Worker 执行者 拉任务并执行
Result Backend 结果存储 保存任务状态/结果(可选)

这 4 个角色彼此解耦,各司其职,是 Celery 架构稳定性的核心。


3️⃣ 工作原理(How)

我们从“一个任务的完整生命周期”来拆。


3.1 全景架构图(一定要能背出来)

flowchart LR
    P[Producer
Web / API] -->|send task| B[(Broker
RabbitMQ)] B -->|deliver task| W[Worker] W -->|execute task| W W -->|store result| R[(Result Backend)]

注意:Producer 从不直接和 Worker 通信


3.2 Producer(任务生产者)

它是谁?

  • Web 服务
  • 管理后台
  • 定时任务(beat)
  • CLI 脚本

它做什么?

  • 把“函数调用”转换成“任务消息”
  • 序列化:
    • task 名
    • 参数
    • 路由信息
    • 重试策略等元数据
  • 发送给 Broker
1
add.delay(3, 5)

📌 关键认知

Producer 的生命周期极短:
“发出去就不管了”(是否关心结果取决于你)。


3.3 Broker(RabbitMQ)

Broker 只做 3 件事:

  1. 接收任务消息
  2. 按 Exchange / Queue / Binding 路由
  3. 把消息交给某个 Worker

Broker 不知道:

  • 任务是什么函数
  • 是否执行成功
  • 是否需要重试

📌 重要结论

Broker ≠ 调度器 ≠ 执行器
它只是一个可靠的消息中转站


3.4 Worker(任务执行者)

Worker 是 Celery 的“重心”

它的职责包括:

  • 从 Broker 拉任务
  • 反序列化任务
  • 调用真正的 Python 函数
  • 管理并发(进程/线程/协程)
  • 捕获异常
  • 决定:
    • ack
    • retry
    • fail
sequenceDiagram
    participant W as Worker
    participant B as Broker

    W->>B: 请求任务
    B-->>W: 投递任务
    W->>W: 执行函数
    W->>B: ack / reject

📌 非常重要

Worker 是有状态的进程
而 Producer 和 Broker 都是“相对无状态”的。


3.5 Result Backend(结果存储)

这是最容易被误解的一块。

Result Backend 是干什么的?

  • 存任务状态:
    • PENDING
    • STARTED
    • SUCCESS
    • FAILURE
  • 存返回值或异常信息

常见实现:

  • Redis
  • 数据库
  • RPC backend(少见)
flowchart LR
    W[Worker] -->|update state/result| R[(Result Backend)]
    P[Producer] -->|query task result| R

📌 关键点(必记)

  • Result Backend ≠ Broker
  • Broker 存“任务消息”
  • Backend 存“任务结果”

4️⃣ 与 RabbitMQ / Celery 的关系(边界再次强调)

4.1 RabbitMQ 在架构中的位置

  • RabbitMQ = Broker
  • 它只负责:
    • 任务消息的可靠投递
  • 它不参与:
    • 执行
    • 状态管理
    • 重试决策

4.2 Celery 在架构中的职责

Celery 横跨:

  • Producer SDK
  • Worker Runtime
  • 路由/重试/并发/超时策略
  • Result Backend 接口

📌 架构总结一句话

RabbitMQ 是“运输系统”,Celery 是“任务执行与调度系统”。


5️⃣ 最小可运行示例(Practice)

目标:把 4 个角色都跑起来并“看到它们协作”


5.1 tasks.py(定义 Celery + 任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
from celery import Celery
import time

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
backend="redis://localhost:6379/0", # Result Backend
)

@app.task(bind=True)
def slow_add(self, x, y):
time.sleep(3)
return x + y

5.2 启动 Worker

1
celery -A tasks worker --loglevel=info

你会看到:

  • 连接 broker
  • 注册任务
  • 启动并发池

5.3 提交任务并查询结果

1
2
3
4
5
6
from tasks import slow_add

r = slow_add.delay(10, 20)
print("task id:", r.id)

print("result:", r.get()) # 阻塞等待(只用于 demo)

📌 注意

在 Web 服务中 **不要随便用 get()**,这里只是演示 Result Backend 的作用。


6️⃣ 📒 本章笔记总结

  • Celery 架构四角色:
    • Producer:提交任务
    • Broker:存储并投递任务(RabbitMQ)
    • Worker:执行任务
    • Result Backend:存任务状态/结果(可选)
  • Producer 永远不直接找 Worker
  • Broker 永远不知道任务是否成功
  • Worker 决定 ack / retry / fail
  • Result Backend 用于:
    • 查询结果
    • 任务编排(chain/chord)
  • Broker ≠ Result Backend
  • Worker 是系统中最“重”的组件(并发、状态、资源消耗)

【第 6 章:RabbitMQ + Celery 如何协同工作(完整任务生命周期)】

到这一章,我们要做一件极其重要的事
把前 1~5 章的所有“零件”,组装成一条“能在你脑中自动播放的任务时间线”。

看完这一章,你应该能做到:

  • 看一条 Celery 日志,就知道任务走到哪一步
  • 出问题时,知道该查 RabbitMQ 还是 Celery Worker
  • 明白 “为什么 Celery 要这样设计”,而不是死记配置项

1️⃣ 本章要解决什么问题(Why)

前面我们已经分别理解了:

  • RabbitMQ:Exchange / Queue / Binding / routing
  • Celery:Producer / Broker / Worker / Result Backend

但工程中真正让人迷糊的是:

“它们是怎么在时间上、责任上配合的?”

常见困惑包括:

  • Celery 什么时候创建 Exchange / Queue?
  • routing_key 到底在哪一步生效?
  • Worker 是主动推还是被动拉?
  • ack 在什么时候发?
  • 任务失败后,消息发生了什么?

👉 本章目标:
从“调用 delay()”开始,到“任务成功/失败结束”为止,完整拆解一次任务的生命周期。


2️⃣ 核心概念与角色(What)

在这一章里,你只需要抓住 5 个关键实体

实体 属于 核心职责
Task Celery 任务定义(函数 + 元数据)
Producer Celery 序列化并发送任务
Broker RabbitMQ 存储 & 路由任务消息
Worker Celery 拉取、执行、确认任务
Result Backend Celery 保存任务状态/结果(可选)

📌 关键认知

RabbitMQ 只负责“消息”;
Celery 负责“任务语义 + 执行控制”。


3️⃣ 工作原理(How)

下面是完整任务生命周期的 8 个阶段
我会先给总览图,再逐步拆解。


3.1 完整任务生命周期(总览图)

sequenceDiagram
    participant P as Producer (Web)
    participant B as Broker (RabbitMQ)
    participant W as Worker (Celery)
    participant R as Result Backend

    P->>B: 1. publish task message
    B-->>W: 2. deliver via queue
    W->>W: 3. deserialize task
    W->>W: 4. execute function
    W->>B: 5. ack / reject
    W->>R: 6. store state/result

3.2 阶段 1:Producer 调用 delay()

1
send_email.delay(user_id=42)

Celery Producer 做了什么?

  1. 找到 task 名(如 tasks.send_email
  2. 序列化:
    • task 名
    • 参数
    • routing_key / queue
    • retry / countdown / ETA 等元数据
  3. 构造一条 消息
  4. 发送给 RabbitMQ(Exchange)

📌 注意

此时任务“还没开始执行”,只是“被投递”。


3.3 阶段 2:RabbitMQ 路由消息

RabbitMQ 做的事情非常“冷静”:

  1. Exchange 收到消息
  2. 用 routing_key 匹配 binding
  3. 把消息放进一个或多个 Queue
  4. 等待 Worker 来取
flowchart LR
    P --> E[Exchange]
    E --> Q[Queue]

📌 RabbitMQ 不知道

  • 这是 Celery 任务
  • 任务是否重要
  • 任务是否会失败

3.4 阶段 3:Worker 拉取任务(pull 模型)

Worker 启动后会:

  • 连接 Broker
  • 声明自己关心的 Queue
  • 主动从 Queue 拉任务
sequenceDiagram
    W->>B: basic.consume(queue)
    B-->>W: deliver message

📌 关键点(必记)

RabbitMQ 不“推任务”,Worker 是主动拉的
(这是背压、限速、稳定性的基础)


3.5 阶段 4:Worker 反序列化 & 执行任务

Worker 收到消息后:

  1. 反序列化消息 → 找到 task 函数
  2. 放入执行池(进程 / 线程 / 协程)
  3. 执行 Python 函数
1
message → task name → function → run

📌 工程视角

这一步才是真正“消耗 CPU / IO”的地方。


3.6 阶段 5:ACK / REJECT(非常关键)

执行完成后,Worker 决定:

  • 成功ACK
  • 失败但可重试REJECT + requeue
  • 失败不可重试REJECT(丢弃或死信)
flowchart LR
    W -->|success| ACK
    W -->|retry| REQUEUE
    W -->|fail| DROP

📌 核心结论

ACK 是 Worker 发的,不是 RabbitMQ 自动的
这也是 Celery 能保证“至少一次执行”的基础。


3.7 阶段 6:写入 Result Backend(可选)

如果你配置了 Result Backend:

  • Worker 会写:
    • 状态(STARTED / SUCCESS / FAILURE)
    • 返回值 / 异常信息

Producer 或其他任务可以:

1
2
AsyncResult(task_id).status
AsyncResult(task_id).get()

📌 注意

Result Backend 和 Broker 完全是两条线


3.8 阶段 7:任务生命周期结束

此时:

  • RabbitMQ 中:消息已被 ACK(消失)
  • Worker:释放资源
  • Backend:保留结果(直到过期)

整个任务正式结束。


4️⃣ 与 RabbitMQ / Celery 的关系(再次拉清边界)

4.1 RabbitMQ 的责任边界

  • 接收消息
  • 路由消息
  • 存储消息
  • 投递消息

不做

  • 执行
  • 重试决策
  • 状态管理

4.2 Celery 的责任边界

  • 定义任务
  • 控制并发
  • 执行函数
  • 重试策略
  • ACK / REJECT
  • 结果管理

📌 一句话

RabbitMQ = 运输系统
Celery = 任务执行与控制系统


5️⃣ 最小可运行示例(Practice)

目标:用日志亲眼看到“生命周期”


5.1 任务定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# tasks.py
from celery import Celery
import time

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
)

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={"max_retries": 3})
def unreliable_task(self, x):
time.sleep(2)
if x % 2 == 1:
raise ValueError("fail on odd")
return x * 2

5.2 启动 Worker

1
celery -A tasks worker --loglevel=info

5.3 发送任务

1
2
3
4
from tasks import unreliable_task

unreliable_task.delay(3)
unreliable_task.delay(4)

观察:

  • 奇数任务 → fail → retry → 最终失败
  • 偶数任务 → 成功 → ACK

📌 结合日志,你能清楚看到:

  • 消息被重新入队
  • 重试间隔生效
  • ACK 只在成功后发送

6️⃣ 📒 本章笔记总结(可直接记录)

  • Celery + RabbitMQ 协同的本质:消息 + 任务语义分离
  • 完整生命周期:
    1. Producer 序列化任务 → 发消息
    2. RabbitMQ 路由 → Queue
    3. Worker 主动拉任务
    4. Worker 执行函数
    5. Worker 决定 ACK / RETRY / FAIL
    6. (可选)写 Result Backend
  • RabbitMQ 不知道任务是否成功
  • ACK 由 Worker 决定
  • Worker 是系统中最关键、最“重”的组件
  • pull 模型是系统稳定性的关键(背压)


【第 7 章:Celery 任务执行流程深度拆解】

这一章我们要把 Celery Worker 从“黑盒”拆成“管线”。
你会真正理解:

  • 任务消息进 Worker 后经历了哪些内部阶段
  • concurrencyprefetchacks_late 这些配置为什么会改变可靠性/吞吐
  • 为什么新手最容易在“并发 + ACK + 重试”上踩坑

1️⃣ 本章要解决什么问题(Why)

当系统上线后,你很快会遇到这些“工程真实问题”:

  1. 为什么我设置了并发 20,吞吐却上不去?
  2. 为什么偶尔会出现任务重复执行?
  3. 为什么 Worker 重启后,有些任务丢了/有些任务又被重跑?
  4. prefetch 是什么?为什么它会导致队列看起来“空了但任务还在跑”?
  5. IO 任务该用协程还是进程?CPU 任务怎么配?

这些问题都指向同一件事:

你需要看懂 Worker 内部的执行流程与关键开关,否则只能靠“玄学调参”。


2️⃣ 核心概念与角色(What)

本章聚焦 Worker 内部的几个关键模块(不背源码名,也能理解):

概念 作用 你要记住什么
Consumer(消费循环) 从 Broker 拉消息 pull + prefetch 发生在这里
Prefetch(预取) 先拉一批消息到本地缓存 吞吐↑,公平性↓,内存占用↑
QoS(质量控制) 控制最多“在途未确认”消息数 prefetch_count 的本质
Pool(执行池) 实际跑任务的并发单元 进程/线程/协程,决定性能模型
ACK 时机 什么时候告诉 broker “我处理完了” 可靠性与重复执行的核心开关
Retry(重试) 失败后再投递 由 Worker/Celery 决策,不是 RabbitMQ

再补一个非常重要的术语:

  • In-flight / Unacked 消息
    消息已经投递给 Worker,但还没 ack(RabbitMQ 认为“还没处理完”)

3️⃣ 工作原理(How)

我们用“消息进入 Worker 后的内部管线”来拆解:
从 RabbitMQ 投递给 Worker 的那一刻起,发生什么?


3.1 Worker 内部任务管线(总览)

flowchart LR
    A[Broker 投递消息] --> B[Worker Consumer 接收]
    B --> C[本地预取缓冲区
prefetch buffer] C --> D[反序列化 & 定位 Task] D --> E[投递到执行池 Pool] E --> F[执行任务函数] F --> G{成功/失败?} G -->|成功| H[ACK] G -->|失败| I[Retry / Reject / Fail] H --> J[(可选) 写 Result Backend] I --> J

你只要抓住:

Consumer 拉消息 + Pool 执行 + ACK 确认
这三段是 Celery Worker 的核心。


3.2 Prefetch:为什么队列”空了但任务还在跑”?

prefetch 的本质:
Worker 为了提高吞吐,会提前从 RabbitMQ 拉一批消息放在本地缓存里。

ASCII 示意:

1
2
3
RabbitMQ Queue:  [m1 m2 m3 m4 m5 m6 ...]
Worker buffer: [m1 m2 m3 m4] <- 预取到本地(未必已开始执行)
Pool running: [m1 m2] <- 真正在跑的

因此 RabbitMQ 管理台上看到:

  • 队列消息变少甚至为 0
    但实际上:
  • Worker 本地还有一批“未开始执行”的消息

📌 工程结论

队列长度不是系统“还剩多少任务”的真实指标
还要看 in-flight/unacked 和 worker 并发执行情况。


3.3 Prefetch 数 = 并发数 × prefetch_multiplier

Celery 默认(常见情况):

  • worker_concurrency = N
  • worker_prefetch_multiplier = M
  • 那么理论预取上限:N * M

例:并发 8,multiplier 4 → 预取 32 个任务到本地。

📌 影响:

  • 吞吐更高(减少 broker 往返)
  • 但会导致:
    • 任务分配不公平(某个 worker 抢太多)
    • 长任务时拖尾严重(别的 worker 空闲但任务被抢走)

✅ 建议(经验值):

  • 长任务worker_prefetch_multiplier = 1
  • 短任务:可以 >1 追求吞吐

3.4 ACK 时机:可靠性与重复执行的核心

这里是最关键的可靠性分水岭。

方案 A:早 ACK(默认常见:acks_late=False

  • Worker 一拿到任务就 ack
  • 优点:队列看起来干净、吞吐高
  • 缺点:任务执行中 worker 崩了 → 任务可能丢失(broker 以为已处理完)

方案 B:晚 ACK(acks_late=True

  • Worker 执行完才 ack
  • 优点:worker 崩了 → RabbitMQ 会重新投递 → 不丢任务
  • 缺点:可能出现重复执行(至少一次语义),任务必须幂等
sequenceDiagram
    participant B as Broker
    participant W as Worker

    B-->>W: deliver msg
    alt acks_late=False
        W->>B: ACK immediately
        W->>W: execute task
    else acks_late=True
        W->>W: execute task
        W->>B: ACK after success
    end

📌 必背结论

acks_late=True → 至少一次(可能重复,但不易丢)
acks_late=False → 至多一次(不重复,但可能丢)


3.5 Retry:失败后到底发生了什么?

Celery 的 retry(概念上)通常是:

  • 任务抛异常
  • Celery 捕获后决定:
    • 重新发送一条“同任务的新消息”(可能带 countdown/eta)
    • 或 reject/requeue(视策略)

重点:

RabbitMQ 只负责“再次投递消息”,
是否重试、重试几次、间隔多少,是 Celery 的策略。


3.6 Pool(执行池):并发模型决定性能上限

Celery worker 的并发池常见有:

  • prefork(多进程,默认常见):适合 CPU 任务、隔离好
  • threads(多线程):适合部分 IO,但受 GIL 影响
  • gevent/eventlet(协程):高并发 IO(但要配合 monkey patch)

📌 工程建议(简单可靠版):

  • 不确定选什么:prefork(最稳)
  • 大量 IO(请求第三方、爬虫):考虑 gevent(但要懂它的限制)
  • CPU 密集:prefork + 把并发设为 CPU 核数附近

4️⃣ 与 RabbitMQ / Celery 的关系

这一章要把“责任切得更细”:

  • RabbitMQ:
    • 负责把消息投递给 worker
    • 维护 unacked 状态
    • worker 掉线时重新投递 unacked(取决于 ack 时机)
  • Celery Worker:
    • 决定什么时候 ack(早/晚)
    • 决定 prefetch(一次拉多少)
    • 决定并发池怎么跑(prefork/threads/gevent)
    • 决定失败是否 retry、怎么 retry

📌 最核心边界:

RabbitMQ 只知道“消息是否 ack”。
Celery 决定“任务何时算完成”。


5️⃣ 最小可运行示例(Practice)

目标:用一个 demo 让你“肉眼看到”

  • prefetch 的效果
  • acks_late 的差异(重启 worker 时)

5.1 tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from celery import Celery
import time
import os

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
backend="redis://localhost:6379/0",
)

@app.task(bind=True)
def sleep_task(self, seconds, tag):
print(f"[{os.getpid()}] start {tag}")
time.sleep(seconds)
print(f"[{os.getpid()}] done {tag}")
return tag

5.2 启动 worker(观察预取)

建议这样启动(长任务更公平):

1
celery -A tasks worker --loglevel=info --concurrency=2

并在配置里设置(你可以临时在 tasks.py 中加):

1
app.conf.worker_prefetch_multiplier = 1

5.3 发送一堆任务

1
2
3
4
from tasks import sleep_task

for i in range(10):
sleep_task.delay(5, f"t{i}")

观察现象:

  • multiplier 大时:某个 worker 会“抢走很多任务”,另一个可能闲
  • multiplier=1 时:任务分配更均匀

5.4 观察 ack 行为(概念实验)

  • acks_late=False:任务一到就 ack,worker 执行中崩溃可能丢
  • acks_late=True:执行完才 ack,worker 中途崩溃会被重新投递(可能重复)

(生产环境要配合幂等设计,后面第 9 章会系统讲。)


6️⃣ 📒 本章笔记总结(可直接记录)

  • Worker 内部管线:接收 → 预取缓冲 → 反序列化 → 投递执行池 → 执行 → ACK/Retry →(可选)写结果
  • Prefetch:
    • 预先拉取一批消息到 worker 本地
    • prefetch_count ≈ concurrency * prefetch_multiplier
    • 长任务建议 prefetch_multiplier=1
  • ACK 时机:
    • acks_late=False:早 ACK → 吞吐高但可能丢
    • acks_late=True:晚 ACK → 不易丢但可能重复(至少一次)
  • RabbitMQ 只关心消息是否 ACK;Celery 决定任务完成语义
  • 并发池选择:
    • 默认 prefork 最稳
    • IO 高并发才考虑 gevent(理解其限制)
  • 调参原则:
    • 先保证可靠性(ACK/幂等)
    • 再调公平性(prefetch)
    • 最后追吞吐(并发/池模型)



本站总访问量 加载中...
今日访客数 加载中...
本页总访客数 加载中...
发表了 36 篇文章 🔸 总计 111.4k 字