【第 1 章:为什么需要异步任务与消息队列】
1️⃣ 本章要解决什么问题(Why)
你在做一个 Web 服务(比如下单、注册、生成报告、发邮件、图片处理)。典型的“天真写法”是:
用户请求来了 → 服务器把所有事情都做完 → 最后再返回响应
这在低并发时没问题,但一上量就会出现三个工程灾难:
- 响应时间爆炸
- 发邮件 2s、调用第三方 3s、生成 PDF 10s
- 用户等 15 秒,前端超时,体验崩了
- 吞吐下降 + 资源被拖死
- Web 线程/进程被长任务占着
- 新请求进不来,排队堆积,最后雪崩
- 故障扩散(最关键)
- 第三方接口抖一下,你的主链路就跟着挂
- “下单成功”这种核心路径被“发短信失败”拖垮,非常不划算
所以我们需要把系统拆成两类工作:
- 必须立刻完成的(同步):返回给用户的核心逻辑
- 可以稍后完成的(异步):耗时、可重试、可并行的任务
这就是“异步任务”和“消息队列”出现的根本原因:
把主链路变短,把非关键工作移出请求线程,用队列实现解耦、削峰、可靠交付。
2️⃣ 核心概念与角色(What)
先把名词讲清楚(这些会贯穿 RabbitMQ + Celery 全套体系):
✅ 异步任务(Async Task)
- “现在先别做,稍后做”
- 关键点:提交任务 与 执行任务 分离
✅ 消息队列(Message Queue, MQ)
- 一种“中间缓冲层”
- 生产者把消息放进去,消费者从里面取出来处理
- 关键价值:解耦、削峰、异步、可靠传递
✅ 角色关系说明(必须记住)
- Producer(生产者):产生任务的人(通常是你的 Web 服务)
- Broker(消息代理):队列系统(RabbitMQ 就是它)
- Consumer/Worker(消费者/工人):执行任务的人(Celery worker)
- Message/Task(消息/任务):传递的工作描述(“做什么 + 参数”)
用一个非常直白的类比:
1 | Web 服务 = 前台收银员(Producer) |
3️⃣ 工作原理(How)
3.1 从“同步直做”到“异步投递”
同步直做(问题模型):
1 | Client -> Web Server -> (发邮件/调用第三方/生成报表...) -> Response |
异步 + MQ(改造模型):
1 | Client -> Web Server -> 把任务丢进 MQ -> Response(立刻返回) |
3.2 Mermaid 流程图:任务的最小生命周期(概念版)
flowchart LR
A[Client 请求] --> B[Web 服务/Producer]
B -->|Send Task Message| C[(Message Broker / RabbitMQ)]
C -->|Consume| D[Worker/Consumer]
D --> E[执行任务逻辑]
E --> F[(结果存储 可选)]
这里你先记住一句话:
MQ 的核心是“存下来再说”,Worker 的核心是“取出来去做”。
3.3 为什么 MQ 能解决那些灾难?
- 解耦:Web 不关心任务怎么执行、执行多久、失败怎么办
- 削峰:流量瞬间 10 万请求,队列先堆住,Worker 慢慢消化
- 可靠性:任务可以持久化,Worker 宕机后能恢复继续处理
- 可扩展:任务多了就加 Worker,不必扩 Web
4️⃣ 与 RabbitMQ / Celery 的关系
这一章你只需要建立“分工边界”的直觉:
RabbitMQ(Broker 的一种实现)
- 负责:可靠地收消息、存消息、按规则投递消息
- 不负责:执行任务逻辑
Celery(分布式任务框架)
- 负责:把函数变成任务、把任务投递给 Broker、Worker 拉取执行、管理重试/并发/路由/结果
- 不负责:实现消息队列(它依赖 RabbitMQ/Redis 等 Broker)
所以 RabbitMQ + Celery 的经典组合就是:
1 | Celery Producer -> RabbitMQ(Broker) -> Celery Workers |
RabbitMQ 是“任务的物流系统”,Celery 是“任务的生产线管理系统”。
5️⃣ 最小可运行示例(Practice)
这一章的“最小示例”目标不是让你立刻跑 RabbitMQ(第 2 章开始我们会系统跑起来),
而是让你先形成异步任务拆分的代码形态:主流程只负责投递,耗时逻辑放到任务函数里。
5.1 目录结构(先记住这个形状)
1 | proj/ |
5.2 tasks.py:把“耗时逻辑”封装成任务函数
1 | # tasks.py |
5.3 app.py:主流程“只投递,不等待”(概念版)
1 | # app.py |
5.4 每一部分在系统中的作用(非常重要)
register():主链路(同步),必须快、必须稳send_welcome_email():异步任务,慢也没关系,失败可重试- “enqueue task”:未来用 RabbitMQ + Celery 实现真正投递
新手常见误区(提前点名)
- 把所有事情都异步化
- 核心一致性逻辑(比如扣库存、支付确认)乱异步会导致业务错乱
- 异步 = 不可靠
- 实际上设计得当,异步系统可以比同步更可靠(因为可重试、可恢复)
- 以为 MQ 只是性能工具
- 更重要的是“隔离故障 + 解耦依赖”,性能只是副产品
6️⃣ 📒 本章笔记总结
- 异步任务要解决的核心矛盾:主链路必须短、快、稳;非关键工作耗时且易失败
- 消息队列(MQ)的核心价值:
- 解耦:生产者不依赖消费者的执行细节
- 削峰:突发流量先堆队列,慢慢消费
- 可靠传递:消息可持久化,宕机可恢复
- 水平扩展:任务多就加 worker
- 关键角色:Producer / Broker / Worker / Message(Task)
- RabbitMQ vs Celery 分工:
- RabbitMQ:负责“收、存、投递消息”
- Celery:负责“把函数任务化、投递、消费执行、并发/重试/路由/结果管理”
- 工程原则:同步做必须立刻完成的,异步做可以延迟且可重试的
7️⃣ 📒 异步任务 & MQ 问题总结
一、Web 框架已经是 async 了,为什么还需要 MQ?
❓问题本质
FastAPI / asyncio 已经是异步并发了,为什么还会有“阻塞 / 等待 / 雪崩”的问题?
✅ 核心结论
- Web async ≠ 异步任务系统
- Web async 解决的是:IO 等待不阻塞线程
- MQ + Worker 解决的是:系统级解耦、资源隔离、稳定性
📌 关键区别
| 维度 | Web async | MQ + Worker |
|---|---|---|
| 执行位置 | Web 进程内 | 独立进程 / 机器 |
| CPU 密集 | ❌ | ✅ |
| 外部依赖抖动 | ❌ 容易拖垮 | ✅ 被隔离 |
| 限速 / 削峰 | ❌ | ✅ |
| 失败重试 | ❌ | ✅ |
二、异步任务丢进 MQ 就返回了,结果什么时候知道?
❓问题本质
没有同步返回结果,那任务“完成”的信号在哪里?
✅ 三种主流结果模型
1️⃣ Fire-and-Forget(最常见)
- 不关心结果
- 如:发邮件 / 发短信 / 同步数据
2️⃣ 轮询 / 查询结果
- 提交任务 → 返回 task_id
- 前端 / 调用方轮询 DB / Redis / Celery backend
3️⃣ 推送 / 回调(高级)
- WebSocket / SSE / 回调接口
- 少数强实时场景使用
📌 工程经验
90% 任务不需要立刻知道结果
异步系统天然是“最终一致性”
三、多长时间 / 多大并发的任务适合用 MQ?
❌ 错误判断方式
- “超过 1s 才用 MQ”
- “必须是 100s 才值得”
✅ 正确判断维度(是否用 MQ)
| 判断点 | 如果是 |
|---|---|
| 是否阻塞 Web 主链路 | 用 MQ |
| 是否调用外部系统 | 用 MQ |
| 是否可能失败重试 | 用 MQ |
| 是否可能突发流量 | 用 MQ |
| 是否不希望跑在 Web 进程 | 用 MQ |
📌 黄金法则(必记)
凡是你不希望跑在 Web 进程里的逻辑,都应该进任务队列
四、RabbitMQ 可以 Docker 跑吗?能给多个应用用吗?
✅ Docker 运行 RabbitMQ
- 完全支持
- 生产环境常态
1 | rabbitmq:3-management |
- 5672:业务连接
- 15672:管理后台
✅ 一个 MQ 服务多个应用(标准架构)
- 使用 Virtual Host / Exchange / Queue 隔离
- 权限、命名、路由解耦
1 | 一个 RabbitMQ 集群 |
五、Celery 支持任务嵌套 / 长任务拆分吗?
✅ 支持,而且是核心能力
Celery 不推荐:
- 在任务里
get()等子任务结果(❌ 会阻塞 worker)
Celery 推荐:
- 任务编排(Canvas)
常用编排方式
| 方式 | 场景 |
|---|---|
| chain | 串行步骤依赖 |
| group | 并行子任务 |
| chord | 并行 + 汇总 |
| 动态生成任务 | 扇出数量运行时决定 |
📌 工程原则
- 长任务 → 拆阶段
- 可并行 → group / chord
- 不要在 worker 里“同步等待”
【第 2 章:RabbitMQ 是什么?核心思想与工作模型】
这一章的目标只有一个:
把 RabbitMQ 从“一个黑盒中间件”拆成你脑子里能运转的模型。
看完这一章,你应该能回答:
👉 “一条消息进来后,RabbitMQ 到底对它做了什么?”
1️⃣ 本章要解决什么问题(Why)
在第 1 章我们已经知道:
MQ 是为了解耦生产者和消费者、保护系统稳定性而存在的。
但这会立刻引出 3 个工程问题:
- 消息进来后放哪?
- 有多个消费者时,消息怎么分发?
- 不同类型的消息,怎么路由到不同处理逻辑?
如果 MQ 只是一个简单队列(FIFO):
- 所有消息挤在一起
- 所有消费者只能“抢”
- 无法区分任务类型
- 扩展性和可维护性极差
RabbitMQ 诞生的核心动机就是:
把“消息存储”和“消息路由”彻底解耦,让消息投递规则可配置、可扩展。
2️⃣ 核心概念与角色(What)
这一章你先记住 4 个角色,它们构成 RabbitMQ 的最小心智模型:
| 角色 | 本质作用 |
|---|---|
| Producer | 发送消息的应用 |
| Exchange | 消息路由器(不存消息) |
| Queue | 真正存消息的地方 |
| Consumer | 从 Queue 取消息处理 |
⚠️ 非常重要的一句话
Producer 从不直接把消息发给 Queue,而是发给 Exchange
这是 RabbitMQ 和“朴素队列”的根本区别。
3️⃣ 工作原理(How)
我们一步一步拆解“一条消息的完整旅程”。
3.1 最小消息流(核心路径)
flowchart LR
P[Producer] -->|publish| E[Exchange]
E -->|route| Q[Queue]
Q -->|consume| C[Consumer]
注意职责边界:
- Producer:只管发消息,不知道 Queue 存在哪
- Exchange:只负责路由决策
- Queue:只负责存消息
- Consumer:只管取消息执行
3.2 为什么一定要有 Exchange?
假设没有 Exchange(Producer 直连 Queue):
- Producer 必须知道所有 Queue
- 新增消费者要改 Producer 代码
- 路由逻辑散落在业务代码里
有了 Exchange:
- Producer 只面向“规则”
- 路由策略集中管理
- 系统更像“可配置基础设施”
📌 工程视角
Exchange 是 RabbitMQ 的“控制平面”,Queue 是“数据平面”。
3.3 Exchange 的核心工作:路由(不是存储)
Exchange 做的只有一件事:
根据“规则”,把消息复制 / 投递到 0~N 个 Queue
Exchange 不存消息,消息一旦路由完成,就和 Exchange 无关了。
4️⃣ 与 Celery 的关系
这一章你要在脑子里建立这张映射表:
| RabbitMQ | Celery 中的角色 |
|---|---|
| Producer | task.delay() / apply_async() |
| Exchange | Celery 自动创建的 task exchange |
| Queue | Celery 的任务队列 |
| Consumer | Celery worker |
当你写:
1 | send_email.delay(user_id) |
本质发生的是:
1 | Celery Producer |
📌 关键认知
Celery 把 RabbitMQ 当作“可编程任务路由器”来用。
5️⃣ 📒 本章笔记总结
- RabbitMQ 的核心思想:消息存储 ≠ 消息路由
- Producer 永远把消息发给 Exchange,而不是 Queue
- Exchange 不存消息,只做路由决策
- Queue 是消息的唯一存储单元
- Consumer 只从 Queue 拉消息
- RabbitMQ 通过 Exchange 实现“配置化路由”
- Celery 利用 RabbitMQ 的 Exchange/Queue 机制完成任务分发
6️⃣ 📒 第 2 章相关疑问
Q1:pika 是什么?和 Celery 什么关系?
- pika:Python 的 RabbitMQ(AMQP)客户端库,偏底层。
- 能做:连接 RabbitMQ、声明 exchange/queue/binding、publish/consume、ack 等。
- Celery:分布式任务框架(任务定义、投递、worker 并发、重试、路由、结果等一整套)。
- 两者关系:
- 用 pika:你在“手写消息通信细节”(更底层、学习 RabbitMQ 原理很适合)。
- 用 Celery:Celery 通过自己的消息通信抽象(通常是 kombu)来和 RabbitMQ/Redis 等 broker 通信;一般不需要你直接写 pika。
- 工程结论:
- 学原理 → pika 很好
- 落地任务系统 → 直接 Celery
Q2:除了 RabbitMQ,还有什么消息队列?
按“模型/适用场景”分两类更好记:
A 类:传统消息队列(偏任务分发/路由/ACK)
- RabbitMQ(AMQP 模型,路由能力强)
- ActiveMQ / Artemis(JMS 生态)
- NATS / JetStream(轻量高速)
- Redis(常用作轻量 broker,但可靠性/语义边界要清楚)
B 类:事件流/日志平台(偏高吞吐、可回放、流处理)
- Kafka(吞吐高、分区顺序、可回放)
- Pulsar(类似 Kafka,但多租户/存储分层等特性不同)
选型速记
- “任务队列 + 路由 + 重试/ACK” → RabbitMQ 常见
- “海量事件流 + 回放 + 数据管道” → Kafka/Pulsar
- “轻量、好部署、快速落地” → Redis(需明确可靠性策略)
Q3:Exchange 和 Queue 都是 RabbitMQ 的能力吗?
- 是:在 RabbitMQ(以及 AMQP 风格模型)里,Exchange/Queue/Binding 是核心抽象。
- 但要更精确:
- Queue:几乎所有 MQ 都有“存消息的地方”(名字可能叫 queue/topic/stream)。
- Exchange:更像 RabbitMQ/AMQP 的标志性“路由层抽象”,不是所有 MQ 都有同名概念(例如 Kafka 没有 Exchange)。
Q4:AMQP 是什么?
AMQP(Advanced Message Queuing Protocol):一种消息通信协议/规范。
定义了:客户端如何与 broker 通信,包括
- connection/channel
- publish/consume
- ack/nack
- 声明与绑定(exchange/queue/binding 的交互语义)
一句话:
AMQP 是“消息中间件的协议与语义标准”,RabbitMQ 是其典型实现之一。
pika 本质上就是在“讲 AMQP 这门协议”。
Q5:如果使用 Redis,当 broker 时就不存在 Exchange 吗?
- 基本结论:对,Redis broker 通常没有 RabbitMQ 那种 Exchange 路由层。
- RabbitMQ:路由在 broker 内(Exchange 决策,支持 direct/topic/fanout 等)
- Redis:更常见是 Producer 直接写入某个队列 key(List/Stream),
“路由”往往变成:你(或 Celery)选择写到哪个队列名/哪个 key - 工程差异速记:
- RabbitMQ:路由能力强、规则集中、天然支持广播/复杂匹配
- Redis:部署简单、轻量,但复杂路由更多在应用/Celery 侧实
【第 3 章:RabbitMQ 核心组件详解(Exchange / Queue / Binding)】
1️⃣ 本章要解决什么问题(Why)
在第 2 章你已经有了最小心智模型:
Producer → Exchange → Queue → Consumer
但只知道“有 Exchange/Queue”还不够,上工程会马上遇到这些问题:
- 一条消息应该进哪个队列?(按任务类型/业务域隔离)
- 一个消息要不要发给多个队列?(广播、同时触发多种处理)
- 如何做到“规则可配置”而不是写死在代码里?
- Celery 说的 routing_key / queue / exchange 到底对应 RabbitMQ 的什么?
如果没有 Binding 这套机制,你就只能让 Producer 直接指定队列名(强耦合),系统会迅速变成“改一个队列,所有服务改代码”。
RabbitMQ 的解法是:
用 Exchange 做路由入口,用 Binding 把“路由规则”配置化,把消息送入一个或多个 Queue。
2️⃣ 核心概念与角色(What)
2.1 三大组件定义(最关键)
- Exchange(交换机):消息的“路由器”,决定消息去哪些队列
- ✅ 做路由
- ❌ 不存消息
- Queue(队列):消息的“存储容器”,消费者从这里取
- ✅ 存消息(内存/磁盘,取决于持久化设置)
- ✅ 支持消费者竞争消费/并行扩展
- Binding(绑定):把 Exchange 和 Queue 连起来的“路由规则”
- 它本质上是一条规则:
(exchange, binding_key) → queue
- 它本质上是一条规则:
2.2 角色关系说明(把图刻进脑子)
flowchart LR
P[Producer 生产者] -->|publish + routing_key| E[Exchange 交换机]
E -->|match: routing_key vs binding_key| Q1[Queue A]
E -->|match: routing_key vs binding_key| Q2[Queue B]
E -->|match: routing_key vs binding_key| Q3[Queue C]
Q1 -->|consume| C1[Consumer/Worker A]
Q2 -->|consume| C2[Consumer/Worker B]
Q3 -->|consume| C3[Consumer/Worker C]
读法:生产者发布到 Exchange → Exchange 根据匹配规则把消息路由到一个或多个 Queue → 消费者从 Queue 拉取处理
这里有 2 个“Key”新手最容易混:
- routing_key:Producer 发消息时带的“标签”
- binding_key:Queue 绑定到 Exchange 时声明的“匹配规则”
📌 核心句(必背)
routing_key 是消息的“地址标签”,binding_key 是队列的“接收规则”。
Exchange 做的事:拿 routing_key 去匹配 binding_key。
3️⃣ 工作原理(How)
RabbitMQ 的路由能力主要由 Exchange 类型决定。你必须掌握四种类型:
- direct
- fanout
- topic
- headers(较少用)
我用同一套“邮件/短信/日志”例子带你理解。
3.1 direct:精确匹配(最像”定向投递”)
规则:routing_key == binding_key 才投递
flowchart LR P[Producer rk=task.email] --> E[direct exchange] E -->|binding_key=task.email| Q1[email-queue] E -->|binding_key=task.sms| Q2[sms-queue]
- 发
task.email→ 进email-queue - 发
task.sms→ 进sms-queue
✅ 适合:任务类型明确、队列划分明确(Celery 常用)
3.2 fanout:广播(最像”群发”)
规则:忽略 routing_key,把消息复制到所有绑定队列
flowchart LR P[Producer] --> E[fanout exchange] E --> Q1[queue-A] E --> Q2[queue-B] E --> Q3[queue-C]
✅ 适合:
- 配置变更广播
- 缓存失效通知
- “所有下游都要知道的事件”
⚠️ 误区:
- fanout 会让消息量 队列数倍增,要评估成本
3.3 topic:通配符匹配(最强但要规范)
topic 是 RabbitMQ 工程里最常用也最容易用乱的。
规则:routing_key 用 . 分段,binding_key 支持通配符:
*匹配 恰好 1 段#匹配 0~多段
例子:
- routing_key:
task.email.send - binding_key:
task.*.send✅(匹配 task + 1段 + send)task.#✅(匹配以 task 开头的任何)task.email.*✅(匹配 task.email 下 1 段)
flowchart LR P[Producer rk=task.email.send] --> E[topic exchange] E -->|binding_key=task.email.*| Q1[email-queue] E -->|binding_key=task.#| Q2[all-task-queue]
✅ 适合:业务域分层路由(系统变大后很好用)
⚠️ 新手常见误区:
- routing_key 命名没规范,topic 立刻变“玄学路由”
- 建议统一:
域.子域.动作.版本或业务.模块.事件
3.4 headers:按消息头匹配(很少用)
规则:不看 routing_key,看 message headers(键值对)
✅ 适合:需要复杂条件组合,但一般 topic 已足够
⚠️ 复杂度高、可读性差,工程中不常见
3.5 一条消息能去哪?(0~N 个队列)
RabbitMQ 路由结果可能是:
- 0 个队列:消息被丢弃或触发 Return(看 mandatory 等设置)
- 1 个队列:最常见
- N 个队列:广播/多订阅者模型
flowchart LR P --> E[Exchange] E --> Q1 E --> Q2 E --> Q3
4️⃣ 与 RabbitMQ / Celery 的关系
这一节只讲“映射关系”,让你不再被术语绕晕。
4.1 Celery 的”路由”对应 RabbitMQ 的什么?
当你在 Celery 里配置:
task_routes/queue/routing_key/exchange
对应 RabbitMQ:
- Celery Producer 发布消息到某个 Exchange
- 并带一个 routing_key
- RabbitMQ 用 binding 规则把消息送到指定 Queue
- Worker 监听对应 Queue
你可以把 Celery 的任务路由理解为:
Celery 帮你生成 routing_key,并帮你声明 Exchange/Queue/Binding。
4.2 为什么 Celery 常用 direct/topic,而不是 fanout?
- 任务队列的核心诉求:一个任务只应该被一个 worker 消费(通常)
- fanout 会复制消息给多个队列,导致同一个任务可能被多方处理(除非你明确要广播)
- direct/topic 更符合“按类型路由到单队列,再由队列内部竞争消费”的模式
📌 工程经验
任务分发:direct/topic
事件广播:fanout
sequenceDiagram participant P as Producer participant E as Exchange participant B as Bindings(规则表) participant Q as Queue(s) P->>E: publish(message, routing_key) E->>B: 查询所有绑定规则 B-->>E: 返回 (queue, binding_key) 列表 E->>E: 按 exchange_type 执行匹配/路由 E->>Q: 投递到匹配成功的 0~N 个队列
5️⃣ 📒 本章笔记总结
- RabbitMQ 三核心组件:
- Exchange:路由器,不存消息
- Queue:存消息,消费者从这里取
- Binding:路由规则,把 Exchange 和 Queue 连接起来
- 两个 Key:
- routing_key(消息标签):Producer 发布消息时携带
- binding_key(接收规则):Queue 绑定 Exchange 时声明
- Exchange 用 routing_key 匹配 binding_key 决定投递队列
- Exchange 四类型:
- direct:精确匹配(rk==bk)
- fanout:广播(忽略 rk)
- topic:通配符匹配(
*一段,#多段) - headers:按消息头匹配(少用)
- 一条消息可投递到 0~N 个队列(取决于绑定规则)
- 与 Celery 的关系:Celery 通过配置生成 exchange/queue/routing_key,并由 RabbitMQ 完成最终路由投递
- 工程建议:
- 任务分发优先 direct/topic
- 事件广播才用 fanout
- topic 必须先定 routing_key 命名规范,否则会变“玄学路由”
【第 4 章:Celery 是什么?它解决了什么问题】
到这一章为止,你已经真正理解了 RabbitMQ:
- 它能路由
- 能存消息
- 能投递
那接下来一个自然的问题是:
“那我为什么还需要 Celery?直接用 RabbitMQ + pika 不行吗?”这一章,就是来回答这个“架构分水岭问题”的。
1️⃣ 本章要解决什么问题(Why)
假设你已经用 RabbitMQ + pika 跑起来了消息通信,很快你会遇到一串工程级痛点:
- 任务是什么?
- 只是一个字符串?JSON?函数调用?
- 参数、版本、序列化怎么统一?
- 怎么并发执行?
- 一个进程拉几条消息?
- 多进程还是多线程?
- CPU 密集 vs IO 密集怎么区分?
- 失败怎么办?
- 失败要不要重试?
- 重试间隔?次数?
- 哪些错误该重试,哪些不该?
- 结果去哪?
- 谁来记录任务成功/失败?
- 怎么查询任务状态?
- 前端如何感知完成?
- 任务怎么路由?
- 不同任务进不同队列?
- 不同 worker 处理不同任务?
- 动态扩缩容怎么做?
如果你全靠 pika 自己写,这意味着:
你在“手写一个简化版 Celery”。
Celery 存在的意义就是:
把“分布式任务系统”的通用复杂度,做成一套成熟框架。
2️⃣ 核心概念与角色(What)
先给 Celery 一个准确定位:
Celery 是一个分布式异步任务队列/任务调度框架
它让你把“函数调用”升级为“可分布式执行的任务”。
Celery 里最重要的 4 个角色(先记住名字)
| 角色 | 本质 |
|---|---|
| Task | 被执行的“任务定义”(函数 + 元数据) |
| Producer | 提交任务的一方(通常是 Web 服务) |
| Broker | 消息中间件(RabbitMQ / Redis 等) |
| Worker | 实际执行任务的进程 |
你会发现:Celery 的角色 完全覆盖了 MQ 模型,但语义更高层。
3️⃣ 工作原理(How)
我们从“你写一行 delay()”开始,拆解 Celery 到底做了什么。
3.1 一行代码背后的真实流程
1 | send_email.delay(user_id=42) |
背后发生的是一整套流程:
sequenceDiagram
participant App as Web / Producer
participant C as Celery Client
participant B as Broker (RabbitMQ)
participant W as Worker
App->>C: 调用 task.delay()
C->>C: 序列化任务名 + 参数
C->>B: 发送消息
B-->>W: 投递到队列
W->>W: 反序列化 + 执行任务函数
📌 关键点
- 你“调用函数”的感觉,其实是 发消息
- Worker 才是真正调用函数的地方
3.2 Celery 帮你隐藏了哪些复杂度?
如果不用 Celery,你需要自己处理:
- 任务序列化 / 反序列化
- 消息格式与版本
- Worker 并发模型
- 心跳、断线重连
- ACK / 重试 / 失败处理
- 任务超时、限速
- 任务路由
- 结果存储
Celery 的核心价值一句话总结:
Celery 把“分布式执行函数”这件事,封装成了一个你能放心使用的基础设施。
4️⃣ 与 RabbitMQ / Celery 的关系(边界非常重要)
4.1 RabbitMQ vs Celery 的分工边界
| 维度 | RabbitMQ | Celery |
|---|---|---|
| 消息存储 | ✅ | ❌ |
| 消息路由 | ✅(Exchange/Binding) | ⚠️(通过配置使用) |
| 任务语义 | ❌ | ✅ |
| 重试策略 | ❌ | ✅ |
| 并发执行 | ❌ | ✅ |
| 失败处理 | ❌ | ✅ |
| 结果管理 | ❌ | ✅ |
📌 关键认知
RabbitMQ 是“消息基础设施”,
Celery 是“任务执行与调度系统”。
4.2 Celery 如何”使用”RabbitMQ?
- Celery 把 任务 当作 消息
- 把 RabbitMQ 当作 Broker
- 利用:
- Exchange → 决定任务路由
- Queue → 存任务
- Consumer → Worker 拉取任务
但:
RabbitMQ 并不知道什么是“任务”
Celery 也不负责“存消息”
5️⃣ 最小可运行示例(Practice)
这一章给你一个真正的 Celery 最小示例,目标是:
跑起来一次任务,让你“感觉到 Celery 在干活”。
5.1 环境准备
1 | pip install celery |
RabbitMQ 已启动(复用前面章节)。
5.2 定义 Celery 应用与任务
1 | # tasks.py |
5.3 启动 Worker(非常关键的一步)
1 | celery -A tasks worker --loglevel=info |
你会看到:
- worker 启动
- 连接 RabbitMQ
- 声明 exchange / queue
- 等待任务
5.4 提交任务(Producer)
1 | # call.py |
你会观察到:
call.py立刻返回- worker 进程中执行
add - 2 秒后完成
📌 这就是 “函数 → 分布式异步任务” 的最小闭环。
6️⃣ 📒 本章笔记总结
- Celery 是 分布式异步任务执行框架
- 它解决的是:
- 任务定义
- 任务投递
- 并发执行
- 重试/失败处理
- 路由与结果管理
- RabbitMQ vs Celery 分工:
- RabbitMQ:消息存储 + 路由
- Celery:任务语义 + 执行调度
task.delay()≠ 本地函数调用- 它是一次 消息发送
- Worker 才是真正执行任务的地方
- 如果你用 pika 手写任务系统,本质是在“重复造 Celery 的轮子”
【第 5 章:Celery 架构全景解析(Producer / Broker / Worker / Result Backend)】
到这一章,我们要完成一个关键跃迁:
从“会用 Celery” → “看懂 Celery 的整体架构与设计取舍”这一章结束后,你脑子里应该能自动浮现一张图:
👉 一个任务,从 Web 出发,如何穿过 RabbitMQ,被 Worker 执行,并最终(可选)返回结果。
1️⃣ 本章要解决什么问题(Why)
在第 4 章,你已经知道:
1 | add.delay(1, 2) |
这行代码会:
- 不阻塞当前进程
- 把任务交给别的进程/机器执行
但工程上,真正重要的问题是:
- 任务是谁发起的?
- 消息到底存在哪?
- Worker 如何拉任务?
- 任务结果放哪里?
- 如果 Worker 挂了/重启,系统还能不能继续?
如果你搞不清这些边界:
- 你会乱配 broker / backend
- 不知道该不该开 result backend
- 不敢改并发、prefetch、ack
- 出问题时完全不知道从哪查
👉 所以这一章的目标是:
把 Celery 的“角色分工 + 数据流 + 责任边界”一次性讲清楚。
2️⃣ 核心概念与角色(What)
Celery 的整体架构由 4 个核心角色组成(必须全部记住):
| 角色 | 名称 | 本质职责 |
|---|---|---|
| Producer | 任务生产者 | 提交任务(通常是 Web/API) |
| Broker | 消息中间件 | 存任务、投递任务 |
| Worker | 执行者 | 拉任务并执行 |
| Result Backend | 结果存储 | 保存任务状态/结果(可选) |
这 4 个角色彼此解耦,各司其职,是 Celery 架构稳定性的核心。
3️⃣ 工作原理(How)
我们从“一个任务的完整生命周期”来拆。
3.1 全景架构图(一定要能背出来)
flowchart LR
P[Producer
Web / API] -->|send task| B[(Broker
RabbitMQ)]
B -->|deliver task| W[Worker]
W -->|execute task| W
W -->|store result| R[(Result Backend)]
注意:Producer 从不直接和 Worker 通信。
3.2 Producer(任务生产者)
它是谁?
- Web 服务
- 管理后台
- 定时任务(beat)
- CLI 脚本
它做什么?
- 把“函数调用”转换成“任务消息”
- 序列化:
- task 名
- 参数
- 路由信息
- 重试策略等元数据
- 发送给 Broker
1 | add.delay(3, 5) |
📌 关键认知
Producer 的生命周期极短:
“发出去就不管了”(是否关心结果取决于你)。
3.3 Broker(RabbitMQ)
Broker 只做 3 件事:
- 接收任务消息
- 按 Exchange / Queue / Binding 路由
- 把消息交给某个 Worker
Broker 不知道:
- 任务是什么函数
- 是否执行成功
- 是否需要重试
📌 重要结论
Broker ≠ 调度器 ≠ 执行器
它只是一个可靠的消息中转站。
3.4 Worker(任务执行者)
Worker 是 Celery 的“重心”。
它的职责包括:
- 从 Broker 拉任务
- 反序列化任务
- 调用真正的 Python 函数
- 管理并发(进程/线程/协程)
- 捕获异常
- 决定:
- ack
- retry
- fail
sequenceDiagram
participant W as Worker
participant B as Broker
W->>B: 请求任务
B-->>W: 投递任务
W->>W: 执行函数
W->>B: ack / reject
📌 非常重要
Worker 是有状态的进程,
而 Producer 和 Broker 都是“相对无状态”的。
3.5 Result Backend(结果存储)
这是最容易被误解的一块。
Result Backend 是干什么的?
- 存任务状态:
- PENDING
- STARTED
- SUCCESS
- FAILURE
- 存返回值或异常信息
常见实现:
- Redis
- 数据库
- RPC backend(少见)
flowchart LR
W[Worker] -->|update state/result| R[(Result Backend)]
P[Producer] -->|query task result| R
📌 关键点(必记)
- Result Backend ≠ Broker
- Broker 存“任务消息”
- Backend 存“任务结果”
4️⃣ 与 RabbitMQ / Celery 的关系(边界再次强调)
4.1 RabbitMQ 在架构中的位置
- RabbitMQ = Broker
- 它只负责:
- 任务消息的可靠投递
- 它不参与:
- 执行
- 状态管理
- 重试决策
4.2 Celery 在架构中的职责
Celery 横跨:
- Producer SDK
- Worker Runtime
- 路由/重试/并发/超时策略
- Result Backend 接口
📌 架构总结一句话
RabbitMQ 是“运输系统”,Celery 是“任务执行与调度系统”。
5️⃣ 最小可运行示例(Practice)
目标:把 4 个角色都跑起来并“看到它们协作”。
5.1 tasks.py(定义 Celery + 任务)
1 | from celery import Celery |
5.2 启动 Worker
1 | celery -A tasks worker --loglevel=info |
你会看到:
- 连接 broker
- 注册任务
- 启动并发池
5.3 提交任务并查询结果
1 | from tasks import slow_add |
📌 注意
在 Web 服务中 **不要随便用
get()**,这里只是演示 Result Backend 的作用。
6️⃣ 📒 本章笔记总结
- Celery 架构四角色:
- Producer:提交任务
- Broker:存储并投递任务(RabbitMQ)
- Worker:执行任务
- Result Backend:存任务状态/结果(可选)
- Producer 永远不直接找 Worker
- Broker 永远不知道任务是否成功
- Worker 决定 ack / retry / fail
- Result Backend 用于:
- 查询结果
- 任务编排(chain/chord)
- Broker ≠ Result Backend
- Worker 是系统中最“重”的组件(并发、状态、资源消耗)
【第 6 章:RabbitMQ + Celery 如何协同工作(完整任务生命周期)】
到这一章,我们要做一件极其重要的事:
把前 1~5 章的所有“零件”,组装成一条“能在你脑中自动播放的任务时间线”。看完这一章,你应该能做到:
- 看一条 Celery 日志,就知道任务走到哪一步
- 出问题时,知道该查 RabbitMQ 还是 Celery Worker
- 明白 “为什么 Celery 要这样设计”,而不是死记配置项
1️⃣ 本章要解决什么问题(Why)
前面我们已经分别理解了:
- RabbitMQ:Exchange / Queue / Binding / routing
- Celery:Producer / Broker / Worker / Result Backend
但工程中真正让人迷糊的是:
“它们是怎么在时间上、责任上配合的?”
常见困惑包括:
- Celery 什么时候创建 Exchange / Queue?
- routing_key 到底在哪一步生效?
- Worker 是主动推还是被动拉?
- ack 在什么时候发?
- 任务失败后,消息发生了什么?
👉 本章目标:
从“调用 delay()”开始,到“任务成功/失败结束”为止,完整拆解一次任务的生命周期。
2️⃣ 核心概念与角色(What)
在这一章里,你只需要抓住 5 个关键实体:
| 实体 | 属于 | 核心职责 |
|---|---|---|
| Task | Celery | 任务定义(函数 + 元数据) |
| Producer | Celery | 序列化并发送任务 |
| Broker | RabbitMQ | 存储 & 路由任务消息 |
| Worker | Celery | 拉取、执行、确认任务 |
| Result Backend | Celery | 保存任务状态/结果(可选) |
📌 关键认知
RabbitMQ 只负责“消息”;
Celery 负责“任务语义 + 执行控制”。
3️⃣ 工作原理(How)
下面是完整任务生命周期的 8 个阶段。
我会先给总览图,再逐步拆解。
3.1 完整任务生命周期(总览图)
sequenceDiagram
participant P as Producer (Web)
participant B as Broker (RabbitMQ)
participant W as Worker (Celery)
participant R as Result Backend
P->>B: 1. publish task message
B-->>W: 2. deliver via queue
W->>W: 3. deserialize task
W->>W: 4. execute function
W->>B: 5. ack / reject
W->>R: 6. store state/result
3.2 阶段 1:Producer 调用 delay()
1 | send_email.delay(user_id=42) |
Celery Producer 做了什么?
- 找到 task 名(如
tasks.send_email) - 序列化:
- task 名
- 参数
- routing_key / queue
- retry / countdown / ETA 等元数据
- 构造一条 消息
- 发送给 RabbitMQ(Exchange)
📌 注意
此时任务“还没开始执行”,只是“被投递”。
3.3 阶段 2:RabbitMQ 路由消息
RabbitMQ 做的事情非常“冷静”:
- Exchange 收到消息
- 用 routing_key 匹配 binding
- 把消息放进一个或多个 Queue
- 等待 Worker 来取
flowchart LR
P --> E[Exchange]
E --> Q[Queue]
📌 RabbitMQ 不知道:
- 这是 Celery 任务
- 任务是否重要
- 任务是否会失败
3.4 阶段 3:Worker 拉取任务(pull 模型)
Worker 启动后会:
- 连接 Broker
- 声明自己关心的 Queue
- 主动从 Queue 拉任务
sequenceDiagram
W->>B: basic.consume(queue)
B-->>W: deliver message
📌 关键点(必记)
RabbitMQ 不“推任务”,Worker 是主动拉的
(这是背压、限速、稳定性的基础)
3.5 阶段 4:Worker 反序列化 & 执行任务
Worker 收到消息后:
- 反序列化消息 → 找到 task 函数
- 放入执行池(进程 / 线程 / 协程)
- 执行 Python 函数
1 | message → task name → function → run |
📌 工程视角
这一步才是真正“消耗 CPU / IO”的地方。
3.6 阶段 5:ACK / REJECT(非常关键)
执行完成后,Worker 决定:
- ✅ 成功 →
ACK - ❌ 失败但可重试 →
REJECT + requeue - ❌ 失败不可重试 →
REJECT(丢弃或死信)
flowchart LR
W -->|success| ACK
W -->|retry| REQUEUE
W -->|fail| DROP
📌 核心结论
ACK 是 Worker 发的,不是 RabbitMQ 自动的
这也是 Celery 能保证“至少一次执行”的基础。
3.7 阶段 6:写入 Result Backend(可选)
如果你配置了 Result Backend:
- Worker 会写:
- 状态(STARTED / SUCCESS / FAILURE)
- 返回值 / 异常信息
Producer 或其他任务可以:
1 | AsyncResult(task_id).status |
📌 注意
Result Backend 和 Broker 完全是两条线
3.8 阶段 7:任务生命周期结束
此时:
- RabbitMQ 中:消息已被 ACK(消失)
- Worker:释放资源
- Backend:保留结果(直到过期)
整个任务正式结束。
4️⃣ 与 RabbitMQ / Celery 的关系(再次拉清边界)
4.1 RabbitMQ 的责任边界
- 接收消息
- 路由消息
- 存储消息
- 投递消息
不做:
- 执行
- 重试决策
- 状态管理
4.2 Celery 的责任边界
- 定义任务
- 控制并发
- 执行函数
- 重试策略
- ACK / REJECT
- 结果管理
📌 一句话
RabbitMQ = 运输系统
Celery = 任务执行与控制系统
5️⃣ 最小可运行示例(Practice)
目标:用日志亲眼看到“生命周期”。
5.1 任务定义
1 | # tasks.py |
5.2 启动 Worker
1 | celery -A tasks worker --loglevel=info |
5.3 发送任务
1 | from tasks import unreliable_task |
观察:
- 奇数任务 → fail → retry → 最终失败
- 偶数任务 → 成功 → ACK
📌 结合日志,你能清楚看到:
- 消息被重新入队
- 重试间隔生效
- ACK 只在成功后发送
6️⃣ 📒 本章笔记总结(可直接记录)
- Celery + RabbitMQ 协同的本质:消息 + 任务语义分离
- 完整生命周期:
- Producer 序列化任务 → 发消息
- RabbitMQ 路由 → Queue
- Worker 主动拉任务
- Worker 执行函数
- Worker 决定 ACK / RETRY / FAIL
- (可选)写 Result Backend
- RabbitMQ 不知道任务是否成功
- ACK 由 Worker 决定
- Worker 是系统中最关键、最“重”的组件
- pull 模型是系统稳定性的关键(背压)
【第 7 章:Celery 任务执行流程深度拆解】
这一章我们要把 Celery Worker 从“黑盒”拆成“管线”。
你会真正理解:
- 任务消息进 Worker 后经历了哪些内部阶段
concurrency、prefetch、acks_late这些配置为什么会改变可靠性/吞吐- 为什么新手最容易在“并发 + ACK + 重试”上踩坑
1️⃣ 本章要解决什么问题(Why)
当系统上线后,你很快会遇到这些“工程真实问题”:
- 为什么我设置了并发 20,吞吐却上不去?
- 为什么偶尔会出现任务重复执行?
- 为什么 Worker 重启后,有些任务丢了/有些任务又被重跑?
- prefetch 是什么?为什么它会导致队列看起来“空了但任务还在跑”?
- IO 任务该用协程还是进程?CPU 任务怎么配?
这些问题都指向同一件事:
你需要看懂 Worker 内部的执行流程与关键开关,否则只能靠“玄学调参”。
2️⃣ 核心概念与角色(What)
本章聚焦 Worker 内部的几个关键模块(不背源码名,也能理解):
| 概念 | 作用 | 你要记住什么 |
|---|---|---|
| Consumer(消费循环) | 从 Broker 拉消息 | pull + prefetch 发生在这里 |
| Prefetch(预取) | 先拉一批消息到本地缓存 | 吞吐↑,公平性↓,内存占用↑ |
| QoS(质量控制) | 控制最多“在途未确认”消息数 | prefetch_count 的本质 |
| Pool(执行池) | 实际跑任务的并发单元 | 进程/线程/协程,决定性能模型 |
| ACK 时机 | 什么时候告诉 broker “我处理完了” | 可靠性与重复执行的核心开关 |
| Retry(重试) | 失败后再投递 | 由 Worker/Celery 决策,不是 RabbitMQ |
再补一个非常重要的术语:
- In-flight / Unacked 消息:
消息已经投递给 Worker,但还没 ack(RabbitMQ 认为“还没处理完”)
3️⃣ 工作原理(How)
我们用“消息进入 Worker 后的内部管线”来拆解:
从 RabbitMQ 投递给 Worker 的那一刻起,发生什么?
3.1 Worker 内部任务管线(总览)
flowchart LR
A[Broker 投递消息] --> B[Worker Consumer 接收]
B --> C[本地预取缓冲区
prefetch buffer]
C --> D[反序列化 & 定位 Task]
D --> E[投递到执行池 Pool]
E --> F[执行任务函数]
F --> G{成功/失败?}
G -->|成功| H[ACK]
G -->|失败| I[Retry / Reject / Fail]
H --> J[(可选) 写 Result Backend]
I --> J
你只要抓住:
Consumer 拉消息 + Pool 执行 + ACK 确认
这三段是 Celery Worker 的核心。
3.2 Prefetch:为什么队列”空了但任务还在跑”?
prefetch 的本质:
Worker 为了提高吞吐,会提前从 RabbitMQ 拉一批消息放在本地缓存里。
ASCII 示意:
1 | RabbitMQ Queue: [m1 m2 m3 m4 m5 m6 ...] |
因此 RabbitMQ 管理台上看到:
- 队列消息变少甚至为 0
但实际上: - Worker 本地还有一批“未开始执行”的消息
📌 工程结论
队列长度不是系统“还剩多少任务”的真实指标
还要看 in-flight/unacked 和 worker 并发执行情况。
3.3 Prefetch 数 = 并发数 × prefetch_multiplier
Celery 默认(常见情况):
worker_concurrency = Nworker_prefetch_multiplier = M- 那么理论预取上限:
N * M
例:并发 8,multiplier 4 → 预取 32 个任务到本地。
📌 影响:
- 吞吐更高(减少 broker 往返)
- 但会导致:
- 任务分配不公平(某个 worker 抢太多)
- 长任务时拖尾严重(别的 worker 空闲但任务被抢走)
✅ 建议(经验值):
- 长任务:
worker_prefetch_multiplier = 1 - 短任务:可以 >1 追求吞吐
3.4 ACK 时机:可靠性与重复执行的核心
这里是最关键的可靠性分水岭。
方案 A:早 ACK(默认常见:acks_late=False)
- Worker 一拿到任务就 ack
- 优点:队列看起来干净、吞吐高
- 缺点:任务执行中 worker 崩了 → 任务可能丢失(broker 以为已处理完)
方案 B:晚 ACK(acks_late=True)
- Worker 执行完才 ack
- 优点:worker 崩了 → RabbitMQ 会重新投递 → 不丢任务
- 缺点:可能出现重复执行(至少一次语义),任务必须幂等
sequenceDiagram
participant B as Broker
participant W as Worker
B-->>W: deliver msg
alt acks_late=False
W->>B: ACK immediately
W->>W: execute task
else acks_late=True
W->>W: execute task
W->>B: ACK after success
end
📌 必背结论
acks_late=True → 至少一次(可能重复,但不易丢)
acks_late=False → 至多一次(不重复,但可能丢)
3.5 Retry:失败后到底发生了什么?
Celery 的 retry(概念上)通常是:
- 任务抛异常
- Celery 捕获后决定:
- 重新发送一条“同任务的新消息”(可能带 countdown/eta)
- 或 reject/requeue(视策略)
重点:
RabbitMQ 只负责“再次投递消息”,
是否重试、重试几次、间隔多少,是 Celery 的策略。
3.6 Pool(执行池):并发模型决定性能上限
Celery worker 的并发池常见有:
- prefork(多进程,默认常见):适合 CPU 任务、隔离好
- threads(多线程):适合部分 IO,但受 GIL 影响
- gevent/eventlet(协程):高并发 IO(但要配合 monkey patch)
📌 工程建议(简单可靠版):
- 不确定选什么:prefork(最稳)
- 大量 IO(请求第三方、爬虫):考虑 gevent(但要懂它的限制)
- CPU 密集:prefork + 把并发设为 CPU 核数附近
4️⃣ 与 RabbitMQ / Celery 的关系
这一章要把“责任切得更细”:
- RabbitMQ:
- 负责把消息投递给 worker
- 维护 unacked 状态
- worker 掉线时重新投递 unacked(取决于 ack 时机)
- Celery Worker:
- 决定什么时候 ack(早/晚)
- 决定 prefetch(一次拉多少)
- 决定并发池怎么跑(prefork/threads/gevent)
- 决定失败是否 retry、怎么 retry
📌 最核心边界:
RabbitMQ 只知道“消息是否 ack”。
Celery 决定“任务何时算完成”。
5️⃣ 最小可运行示例(Practice)
目标:用一个 demo 让你“肉眼看到”
- prefetch 的效果
- acks_late 的差异(重启 worker 时)
5.1 tasks.py
1 | from celery import Celery |
5.2 启动 worker(观察预取)
建议这样启动(长任务更公平):
1 | celery -A tasks worker --loglevel=info --concurrency=2 |
并在配置里设置(你可以临时在 tasks.py 中加):
1 | app.conf.worker_prefetch_multiplier = 1 |
5.3 发送一堆任务
1 | from tasks import sleep_task |
观察现象:
- multiplier 大时:某个 worker 会“抢走很多任务”,另一个可能闲
- multiplier=1 时:任务分配更均匀
5.4 观察 ack 行为(概念实验)
acks_late=False:任务一到就 ack,worker 执行中崩溃可能丢acks_late=True:执行完才 ack,worker 中途崩溃会被重新投递(可能重复)
(生产环境要配合幂等设计,后面第 9 章会系统讲。)
6️⃣ 📒 本章笔记总结(可直接记录)
- Worker 内部管线:接收 → 预取缓冲 → 反序列化 → 投递执行池 → 执行 → ACK/Retry →(可选)写结果
- Prefetch:
- 预先拉取一批消息到 worker 本地
prefetch_count ≈ concurrency * prefetch_multiplier- 长任务建议
prefetch_multiplier=1
- ACK 时机:
acks_late=False:早 ACK → 吞吐高但可能丢acks_late=True:晚 ACK → 不易丢但可能重复(至少一次)
- RabbitMQ 只关心消息是否 ACK;Celery 决定任务完成语义
- 并发池选择:
- 默认 prefork 最稳
- IO 高并发才考虑 gevent(理解其限制)
- 调参原则:
- 先保证可靠性(ACK/幂等)
- 再调公平性(prefetch)
- 最后追吞吐(并发/池模型)
【第 8 章:常见配置、路由、队列设计策略】
到这一章,你已经理解原理、看懂流程、知道 Worker 在干什么。
现在我们要解决的是一个真正决定系统好不好用的问题:👉 “任务怎么分队列?Worker 怎么分工?配置怎么配才不踩坑?”
这一章非常工程化,目标是:
让你能设计一个“长期可维护、不怕扩展、不怕流量”的任务系统。
1️⃣ 本章要解决什么问题(Why)
很多 Celery 系统在“能跑”之后,会逐渐出现这些问题:
- 所有任务挤在一个队列
- 长任务把短任务全部堵住
- 用户体验极不稳定
- CPU 任务和 IO 任务混跑
- IO 任务把进程占满
- CPU 任务拖慢整体吞吐
- 高优先级任务被低优先级淹没
- 紧急任务等半天
- SLA 无法保障
- Worker 随便加,问题越来越多
- 加了 worker 反而更慢
- 不知道哪个 worker 在处理什么
这些问题的根因只有一个:
队列设计和路由策略一开始没想清楚。
2️⃣ 核心概念与角色(What)
在本章,你需要牢牢记住 3 个“设计维度”:
2.1 队列(Queue)是”资源隔离单元”
不只是“消息容器”
更是:
- 负载隔离
- 优先级隔离
- 资源隔离(CPU / IO)
2.2 路由(Routing)是”调度策略”
决定任务:
- 进哪个队列
- 被哪类 worker 执行
2.3 Worker 是”执行资源池”
- Worker ≠ 队列
- 一个 worker 可以监听多个队列
- 不同 worker 可以监听不同队列
1 | Task → Queue → Worker Pool → CPU / IO |
3️⃣ 工作原理(How)
我们从 “一个任务如何被路由” 讲起,再上升到整体设计。
3.1 任务路由的完整路径
flowchart LR
T[Task] -->|routing_key / queue| E[Exchange]
E --> Q1[queue: cpu_tasks]
E --> Q2[queue: io_tasks]
Q1 --> W1[CPU Worker Pool]
Q2 --> W2[IO Worker Pool]
Celery 路由本质是三件事:
- 给任务打标签(routing_key / queue)
- RabbitMQ 根据 binding 规则投递
- Worker 只消费自己负责的队列
3.2 最重要的队列拆分原则(必背)
❗ 不要按“业务功能”拆队列,而要按“执行特性”拆队列
错误示例 ❌:
1 | queue_order |
正确示例 ✅:
1 | queue_cpu |
📌 工程直觉
谁会互相“拖慢”,谁就必须分开。
3.3 一个推荐的基础队列模型(90% 项目适用)
1 | queues: |
3.4 Worker 分组设计(非常关键)
flowchart LR
Q1[fast_tasks] --> W1[worker-fast]
Q2[slow_tasks] --> W2[worker-slow]
Q3[cpu_tasks] --> W3[worker-cpu]
Q4[io_tasks] --> W4[worker-io]
好处:
- 各类任务互不影响
- worker 参数可针对性调优
- 出问题易定位
4️⃣ 与 RabbitMQ / Celery 的关系
4.1 RabbitMQ 在这里负责什么?
- Exchange / Queue / Binding
- 消息可靠投递
- 队列级别的隔离
4.2 Celery 在这里负责什么?
- 定义路由规则
- 选择 queue / routing_key
- 控制 worker 并发、prefetch、ack 策略
📌 边界总结
RabbitMQ 提供“硬隔离的通道”,
Celery 决定“谁走哪条通道”。
5️⃣ 最小可运行示例(Practice)
目标:一套清晰、可扩展的路由 + 队列配置模板。
5.1 Celery 基础配置(强烈推荐这样写)
1 | # celery_app.py |
5.2 任务路由规则(task_routes)
1 | app.conf.task_routes = { |
📌 好处:
- 路由集中管理
- 新增 worker 不用改任务代码
5.3 Worker 启动方式(按职责启动)
1 | # 快任务 |
6️⃣ 📒 本章笔记总结
队列是 资源隔离单元,不是简单分类
拆队列要按:
- 执行时间
- CPU / IO 特性
- SLA 优先级
避免“一个队列跑所有任务”
路由规则集中在
task_routesWorker 按队列分组启动
CPU / IO / 快慢任务尽量分离
设计目标:
一个队列出问题,不拖垮整个系统
【第 9 章:失败重试、幂等性、任务可靠性】
这一章是真正决定你系统“敢不敢上生产”的一章。
前面你已经学会:
- 怎么投递任务
- 怎么并发执行
- 怎么拆队列、配 worker
但如果你回答不了下面这几个问题,系统一出故障就会失控:
- 任务失败后会不会丢?会不会重复?
- Worker 挂了、网络抖了,会发生什么?
- 为什么 Celery 一定会“可能重复执行”?
- 那我怎么保证 业务数据不被搞乱?
1️⃣ 本章要解决什么问题(Why)
现实世界里的任务失败是常态,不是异常:
- 网络超时
- 第三方 API 5xx
- Worker 进程被 OOM Kill
- 容器/节点重启
- 代码 Bug
如果你不提前设计失败与重试,系统会出现三种灾难之一:
- ❌ 任务丢失(业务数据缺失)
- ❌ 任务重复但不可控(数据被写多次、扣钱多次)
- ❌ 失败雪崩(重试风暴,把系统打死)
👉 本章目标:
建立一套“可预期的失败模型 + 可控的重试策略 + 业务级幂等设计”。
2️⃣ 核心概念与角色(What)
你需要掌握 5 个关键概念(这是可靠性的“骨架”):
| 概念 | 含义 |
|---|---|
| ACK 语义 | 什么时候告诉 Broker “任务完成了” |
| 投递语义 | 至少一次 / 至多一次 / 恰好一次 |
| Retry | 失败后是否重新执行 |
| 幂等性 | 重复执行是否安全 |
| 死信 | 永远失败的任务怎么处理 |
📌 核心事实(必须接受)
Celery + MQ 天然只能保证“至少一次执行”
“恰好一次”必须靠业务幂等来兜底。
3️⃣ 工作原理(How)
我们从“失败发生的时间点”来拆解系统行为。
3.1 失败可能发生在哪些阶段?
flowchart LR
A[消息在 Broker] --> B[Worker 拉取]
B --> C[任务执行中]
C --> D[ACK 之前]
D --> E[ACK 之后]
- A/B 阶段失败:消息还在 MQ → 可重投
- C/D 阶段失败:Worker 挂了但未 ACK → 会被重新投递
- E 阶段失败:已经 ACK → 任务对 MQ 来说“结束了”
📌 关键点
ACK 之前失败 = 可能重试
ACK 之后失败 = MQ 无能为力
3.2 投递语义:至少一次 vs 至多一次
✅ 至少一次(Celery 的默认可靠模型)
acks_late=True- Worker 执行完才 ACK
- Worker 崩溃 → RabbitMQ 重投
结果:
- 任务不会轻易丢
- 但可能重复执行
⚠️ 至多一次(性能优先,慎用)
acks_late=False- 一收到就 ACK
- 执行中崩溃 → 任务丢失
📌 选择原则
能重复 ≫ 能丢
所以生产环境通常选择:至少一次 + 幂等
3.3 Retry 是怎么发生的?
Celery 的 retry 本质上是:
重新发送一条“新任务消息”
1 |
|
背后发生的是:
- 捕获异常
- 生成新任务(同 task_id 或新 task_id)
- 设置 ETA / countdown
- 再次发送给 Broker
📌 RabbitMQ 不知道“这是重试”,
重试完全是 Celery 的决策。
3.4 重试风暴(生产大坑)
如果你写成这样:
1 |
一旦下游服务挂了:
- 所有任务同时失败
- 同时重试
- Broker/Worker 被瞬间打爆
📌 工程防护手段:
- 限制
max_retries - 使用
retry_backoff(指数退避) - 区分“可重试”和“不可重试”的异常
- 必要时熔断(业务层)
3.5 幂等性:真正的”保险丝”
幂等性定义:
同一个任务 执行 1 次和执行 N 次,业务结果一致
这是 Celery 系统必须由你实现的部分。
4️⃣ 与 RabbitMQ / Celery 的关系(边界极其重要)
4.1 RabbitMQ 能保证什么?
- 消息不轻易丢(持久化 + ACK)
- Worker 挂了会重新投递未 ACK 消息
RabbitMQ 不能保证:
- 消息只被执行一次
- 任务业务语义正确
4.2 Celery 能保证什么?
- 自动重试
- 失败感知
- 延迟重试
- 至少一次执行模型
Celery 不能保证:
- 你的业务不会被重复执行破坏
📌 核心结论
“恰好一次”是业务设计问题,不是 MQ 能解决的问题。
5️⃣ 最小可运行示例(Practice)
目标:展示“失败 → 重试 → 幂等保护”的完整链路。
5.1 幂等保护示例(数据库 / Redis)
1 | # tasks.py |
📌 即使任务被重复执行:
- 只有第一次会真正“扣钱”
- 后续直接跳过
5.2 死信(Dead Letter)的概念用法
当任务:
- 超过最大重试次数
- 或被 reject 且不 requeue
你应该:
- 记录到 DB
- 发告警
- 人工介入 / 补偿
永远失败的任务 ≠ 应该无限重试的任务
6️⃣ 📒 本章笔记总结
失败是常态,必须提前设计
Celery + MQ 的可靠模型是:至少一次执行
ACK 决定任务是否会被重投
acks_late=True是生产常用配置Retry 是 Celery 重新发消息,不是 RabbitMQ 魔法
必须防止重试风暴:
- 限次数
- 退避
- 区分异常
幂等性是任务可靠性的最后防线
MQ/Celery 无法保证“恰好一次”,只能由业务保证
永久失败任务要进入“死信/人工处理”流程
【第 10 章:生产环境架构设计与最佳实践】
这是收官章。
到这里,我们不再讲“功能怎么用”,而是回答一个更难的问题:👉 如何把 RabbitMQ + Celery 变成一个“能长期稳定运行、可扩展、可观测、可恢复”的生产系统?
本章目标:
给你一套可以直接画进架构图、写进设计文档、落地上线的参考范式。
1️⃣ 本章要解决什么问题(Why)
很多系统在测试环境看起来“挺好”,一到生产就出问题,常见症状:
流量一高就排队失控
Worker 一挂,任务行为不可预测
出问题时:
- 不知道是 MQ、Worker 还是代码 Bug
任务失败没人管,数据慢慢腐烂
随着业务增长,不敢改、不敢扩
根因只有一个:
缺少“生产级架构意识”——没有从一开始就把可靠性、扩展性、可观测性放进设计里。
2️⃣ 核心概念与角色(What)
生产环境里,你要把系统看成 5 层结构(而不是一个 Celery 进程):
| 层级 | 角色 | 关注点 |
|---|---|---|
| 接入层 | Web / API / Cron | 快速返回、限流 |
| 调度层 | Celery Producer | 路由、延迟、失败感知 |
| 传输层 | RabbitMQ | 稳定投递、削峰 |
| 执行层 | Celery Worker | 并发、资源隔离 |
| 可观测层 | Logs / Metrics / Alerts | 可定位、可恢复 |
📌 关键认知
生产架构不是“多加机器”,而是把职责切清楚。
3️⃣ 工作原理(How)
下面给你一套典型生产架构参考图,可以直接照着画。
3.1 标准生产架构(推荐起点)
flowchart LR
Client --> API[Web/API Server]
API -->|delay/apply_async| C[Celery Producer]
C -->|publish| MQ[(RabbitMQ Cluster)]
MQ -->|queue: fast| WF[Worker Fast]
MQ -->|queue: io| WIO[Worker IO]
MQ -->|queue: cpu| WCPU[Worker CPU]
MQ -->|queue: slow| WS[Worker Slow]
WF --> R[(Result Backend)]
WIO --> R
WCPU --> R
WS --> R
WF --> LOG[Logs/Metrics]
WIO --> LOG
WCPU --> LOG
WS --> LOG
3.2 从”单机”到”集群”的演进路径(非常重要)
阶段 1:单机起步(可用)
- 1 个 RabbitMQ
- 1~2 个 Worker
- 分 2~3 个队列
阶段 2:水平扩展(常见)
- RabbitMQ 升级为集群
- Worker 按队列横向扩
- Web / Worker 完全解耦
阶段 3:资源隔离(成熟)
- CPU / IO / 快慢任务彻底分离
- 不同队列跑在不同节点
- 单队列故障不拖全局
📌 经验
扩展的是 Worker,不是 Producer,也不是 MQ 逻辑。
4️⃣ 与 RabbitMQ / Celery 的关系(生产级边界)
4.1 RabbitMQ:稳、少动
- 集群化
- 开启持久化
- 设置合理的内存/磁盘水位
- 队列数量受控(避免百万级)
RabbitMQ 的原则:
少而稳,不要当计算系统用。
4.2 Celery Worker:可变、可控
Worker 是最常扩缩的部分
按队列、按资源类型拆
配置:
concurrencyprefetch_multiplieracks_late- pool 类型
Celery 的原则:
Worker 是“消耗资源”的地方,必须被严格管理。
5️⃣ 最小可运行示例(Practice)
这里给你的是 “生产可直接用的配置模板”(简化但不玩具)。
5.1 Celery 全局配置(推荐)
1 | # celeryconfig.py |
5.2 Worker 启动策略(示例)
1 | # 快任务 |
6️⃣ 📒 本章笔记总结
生产环境核心目标:
- 不丢
- 可控
- 可扩
- 可观测
架构要点:
- Producer 快速返回
- RabbitMQ 稳定投递、削峰
- Worker 按资源/优先级分组
队列设计是长期维护成本的决定因素
Worker 是最“重”的组件,也是最容易横向扩展的
至少一次执行 + 业务幂等是现实世界的正确选择
日志 / 指标 / 告警 ≠ 可选项,是系统的一部分
Celery + RabbitMQ 是一套“工程系统”,不是一个库