【第 8 章:常见配置、路由、队列设计策略】

到这一章,你已经理解原理、看懂流程、知道 Worker 在干什么
现在我们要解决的是一个真正决定系统好不好用的问题

👉 “任务怎么分队列?Worker 怎么分工?配置怎么配才不踩坑?”

这一章非常工程化,目标是:
让你能设计一个“长期可维护、不怕扩展、不怕流量”的任务系统。


1️⃣ 本章要解决什么问题(Why)

很多 Celery 系统在“能跑”之后,会逐渐出现这些问题:

  1. 所有任务挤在一个队列
  • 长任务把短任务全部堵住
  • 用户体验极不稳定
  1. CPU 任务和 IO 任务混跑
  • IO 任务把进程占满
  • CPU 任务拖慢整体吞吐
  1. 高优先级任务被低优先级淹没
  • 紧急任务等半天
  • SLA 无法保障
  1. Worker 随便加,问题越来越多
  • 加了 worker 反而更慢
  • 不知道哪个 worker 在处理什么

这些问题的根因只有一个:

队列设计和路由策略一开始没想清楚。


2️⃣ 核心概念与角色(What)

在本章,你需要牢牢记住 3 个“设计维度”

2.1 队列(Queue)是”资源隔离单元”

  • 不只是“消息容器”

  • 更是:

    • 负载隔离
    • 优先级隔离
    • 资源隔离(CPU / IO)

2.2 路由(Routing)是”调度策略”

  • 决定任务:

    • 进哪个队列
    • 被哪类 worker 执行

2.3 Worker 是”执行资源池”

  • Worker ≠ 队列
  • 一个 worker 可以监听多个队列
  • 不同 worker 可以监听不同队列
1
Task → Queue → Worker Pool → CPU / IO

3️⃣ 工作原理(How)

我们从 “一个任务如何被路由” 讲起,再上升到整体设计。


3.1 任务路由的完整路径

flowchart LR
    T[Task] -->|routing_key / queue| E[Exchange]
    E --> Q1[queue: cpu_tasks]
    E --> Q2[queue: io_tasks]
    Q1 --> W1[CPU Worker Pool]
    Q2 --> W2[IO Worker Pool]

Celery 路由本质是三件事:

  1. 给任务打标签(routing_key / queue)
  2. RabbitMQ 根据 binding 规则投递
  3. Worker 只消费自己负责的队列

3.2 最重要的队列拆分原则(必背)

不要按“业务功能”拆队列,而要按“执行特性”拆队列

错误示例 ❌:

1
2
3
queue_order
queue_user
queue_payment

正确示例 ✅:

1
2
3
4
5
queue_cpu
queue_io
queue_fast
queue_slow
queue_critical

📌 工程直觉

谁会互相“拖慢”,谁就必须分开。


3.3 一个推荐的基础队列模型(90% 项目适用)

1
2
3
4
5
queues:
- fast_tasks (短、快、用户敏感)
- slow_tasks (长任务、后台)
- cpu_tasks (CPU 密集)
- io_tasks (IO 密集)

3.4 Worker 分组设计(非常关键)

flowchart LR
    Q1[fast_tasks] --> W1[worker-fast]
    Q2[slow_tasks] --> W2[worker-slow]
    Q3[cpu_tasks] --> W3[worker-cpu]
    Q4[io_tasks] --> W4[worker-io]

好处:

  • 各类任务互不影响
  • worker 参数可针对性调优
  • 出问题易定位

4️⃣ 与 RabbitMQ / Celery 的关系

4.1 RabbitMQ 在这里负责什么?

  • Exchange / Queue / Binding
  • 消息可靠投递
  • 队列级别的隔离

4.2 Celery 在这里负责什么?

  • 定义路由规则
  • 选择 queue / routing_key
  • 控制 worker 并发、prefetch、ack 策略

📌 边界总结

RabbitMQ 提供“硬隔离的通道”,
Celery 决定“谁走哪条通道”。


5️⃣ 最小可运行示例(Practice)

目标:一套清晰、可扩展的路由 + 队列配置模板


5.1 Celery 基础配置(强烈推荐这样写)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# celery_app.py
from celery import Celery
from kombu import Queue

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
)

app.conf.task_queues = (
Queue("fast_tasks"),
Queue("slow_tasks"),
Queue("cpu_tasks"),
Queue("io_tasks"),
)

app.conf.task_default_queue = "slow_tasks"

5.2 任务路由规则(task_routes)

1
2
3
4
5
6
app.conf.task_routes = {
"tasks.send_email": {"queue": "io_tasks"},
"tasks.resize_image": {"queue": "cpu_tasks"},
"tasks.generate_report": {"queue": "slow_tasks"},
"tasks.quick_check": {"queue": "fast_tasks"},
}

📌 好处:

  • 路由集中管理
  • 新增 worker 不用改任务代码

5.3 Worker 启动方式(按职责启动)

1
2
3
4
5
6
7
8
9
10
11
# 快任务
celery -A celery_app worker -Q fast_tasks -c 8

# 慢任务
celery -A celery_app worker -Q slow_tasks -c 2

# CPU 任务
celery -A celery_app worker -Q cpu_tasks -c 4

# IO 任务
celery -A celery_app worker -Q io_tasks -c 50 -P gevent

6️⃣ 📒 本章笔记总结

  • 队列是 资源隔离单元,不是简单分类

  • 拆队列要按:

    • 执行时间
    • CPU / IO 特性
    • SLA 优先级
  • 避免“一个队列跑所有任务”

  • 路由规则集中在 task_routes

  • Worker 按队列分组启动

  • CPU / IO / 快慢任务尽量分离

  • 设计目标:
    一个队列出问题,不拖垮整个系统



【第 9 章:失败重试、幂等性、任务可靠性】

这一章是真正决定你系统“敢不敢上生产”的一章。

前面你已经学会:

  • 怎么投递任务
  • 怎么并发执行
  • 怎么拆队列、配 worker

但如果你回答不了下面这几个问题,系统一出故障就会失控:

  • 任务失败后会不会丢?会不会重复?
  • Worker 挂了、网络抖了,会发生什么?
  • 为什么 Celery 一定会“可能重复执行”?
  • 那我怎么保证 业务数据不被搞乱

1️⃣ 本章要解决什么问题(Why)

现实世界里的任务失败是常态,不是异常:

  • 网络超时
  • 第三方 API 5xx
  • Worker 进程被 OOM Kill
  • 容器/节点重启
  • 代码 Bug

如果你不提前设计失败与重试,系统会出现三种灾难之一:

  1. 任务丢失(业务数据缺失)
  2. 任务重复但不可控(数据被写多次、扣钱多次)
  3. 失败雪崩(重试风暴,把系统打死)

👉 本章目标:
建立一套“可预期的失败模型 + 可控的重试策略 + 业务级幂等设计”。


2️⃣ 核心概念与角色(What)

你需要掌握 5 个关键概念(这是可靠性的“骨架”):

概念 含义
ACK 语义 什么时候告诉 Broker “任务完成了”
投递语义 至少一次 / 至多一次 / 恰好一次
Retry 失败后是否重新执行
幂等性 重复执行是否安全
死信 永远失败的任务怎么处理

📌 核心事实(必须接受)

Celery + MQ 天然只能保证“至少一次执行”
“恰好一次”必须靠业务幂等来兜底。


3️⃣ 工作原理(How)

我们从“失败发生的时间点”来拆解系统行为。


3.1 失败可能发生在哪些阶段?

flowchart LR
    A[消息在 Broker] --> B[Worker 拉取]
    B --> C[任务执行中]
    C --> D[ACK 之前]
    D --> E[ACK 之后]
  • A/B 阶段失败:消息还在 MQ → 可重投
  • C/D 阶段失败:Worker 挂了但未 ACK → 会被重新投递
  • E 阶段失败:已经 ACK → 任务对 MQ 来说“结束了”

📌 关键点

ACK 之前失败 = 可能重试
ACK 之后失败 = MQ 无能为力


3.2 投递语义:至少一次 vs 至多一次

✅ 至少一次(Celery 的默认可靠模型)

  • acks_late=True
  • Worker 执行完才 ACK
  • Worker 崩溃 → RabbitMQ 重投

结果

  • 任务不会轻易丢
  • 可能重复执行

⚠️ 至多一次(性能优先,慎用)

  • acks_late=False
  • 一收到就 ACK
  • 执行中崩溃 → 任务丢失

📌 选择原则

能重复 ≫ 能丢
所以生产环境通常选择:至少一次 + 幂等


3.3 Retry 是怎么发生的?

Celery 的 retry 本质上是:

重新发送一条“新任务消息”

1
2
3
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={"max_retries": 3})
def fragile_task(self):
...

背后发生的是:

  1. 捕获异常
  2. 生成新任务(同 task_id 或新 task_id)
  3. 设置 ETA / countdown
  4. 再次发送给 Broker

📌 RabbitMQ 不知道“这是重试”,

重试完全是 Celery 的决策。


3.4 重试风暴(生产大坑)

如果你写成这样:

1
@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={"max_retries": None})

一旦下游服务挂了:

  • 所有任务同时失败
  • 同时重试
  • Broker/Worker 被瞬间打爆

📌 工程防护手段:

  • 限制 max_retries
  • 使用 retry_backoff(指数退避)
  • 区分“可重试”和“不可重试”的异常
  • 必要时熔断(业务层)

3.5 幂等性:真正的”保险丝”

幂等性定义

同一个任务 执行 1 次和执行 N 次,业务结果一致

这是 Celery 系统必须由你实现的部分


4️⃣ 与 RabbitMQ / Celery 的关系(边界极其重要)

4.1 RabbitMQ 能保证什么?

  • 消息不轻易丢(持久化 + ACK)
  • Worker 挂了会重新投递未 ACK 消息

RabbitMQ 不能保证:

  • 消息只被执行一次
  • 任务业务语义正确

4.2 Celery 能保证什么?

  • 自动重试
  • 失败感知
  • 延迟重试
  • 至少一次执行模型

Celery 不能保证:

  • 你的业务不会被重复执行破坏

📌 核心结论

“恰好一次”是业务设计问题,不是 MQ 能解决的问题。


5️⃣ 最小可运行示例(Practice)

目标:展示“失败 → 重试 → 幂等保护”的完整链路


5.1 幂等保护示例(数据库 / Redis)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# tasks.py
from celery import Celery
import redis
import time

app = Celery(
"demo",
broker="amqp://guest:guest@localhost:5672//",
)

rds = redis.Redis(host="localhost", port=6379, db=0)

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=3, retry_kwargs={"max_retries": 5})
def pay_order(self, order_id):
key = f"order_paid:{order_id}"

# 幂等检查
if rds.exists(key):
print("already processed, skip")
return "ok"

# 模拟不稳定下游
if time.time() % 2 < 1:
raise RuntimeError("payment gateway error")

# 关键业务操作(只能执行一次)
print("do payment")
rds.set(key, 1)

return "paid"

📌 即使任务被重复执行:

  • 只有第一次会真正“扣钱”
  • 后续直接跳过

5.2 死信(Dead Letter)的概念用法

当任务:

  • 超过最大重试次数
  • 或被 reject 且不 requeue

你应该:

  • 记录到 DB
  • 发告警
  • 人工介入 / 补偿

永远失败的任务 ≠ 应该无限重试的任务


6️⃣ 📒 本章笔记总结

  • 失败是常态,必须提前设计

  • Celery + MQ 的可靠模型是:至少一次执行

  • ACK 决定任务是否会被重投

  • acks_late=True 是生产常用配置

  • Retry 是 Celery 重新发消息,不是 RabbitMQ 魔法

  • 必须防止重试风暴:

    • 限次数
    • 退避
    • 区分异常
  • 幂等性是任务可靠性的最后防线

  • MQ/Celery 无法保证“恰好一次”,只能由业务保证

  • 永久失败任务要进入“死信/人工处理”流程



【第 10 章:生产环境架构设计与最佳实践】

这是收官章
到这里,我们不再讲“功能怎么用”,而是回答一个更难的问题:

👉 如何把 RabbitMQ + Celery 变成一个“能长期稳定运行、可扩展、可观测、可恢复”的生产系统?

本章目标:
给你一套可以直接画进架构图、写进设计文档、落地上线的参考范式。


1️⃣ 本章要解决什么问题(Why)

很多系统在测试环境看起来“挺好”,一到生产就出问题,常见症状:

  1. 流量一高就排队失控

  2. Worker 一挂,任务行为不可预测

  3. 出问题时:

    • 不知道是 MQ、Worker 还是代码 Bug
  4. 任务失败没人管,数据慢慢腐烂

  5. 随着业务增长,不敢改、不敢扩

根因只有一个:

缺少“生产级架构意识”——没有从一开始就把可靠性、扩展性、可观测性放进设计里。


2️⃣ 核心概念与角色(What)

生产环境里,你要把系统看成 5 层结构(而不是一个 Celery 进程):

层级 角色 关注点
接入层 Web / API / Cron 快速返回、限流
调度层 Celery Producer 路由、延迟、失败感知
传输层 RabbitMQ 稳定投递、削峰
执行层 Celery Worker 并发、资源隔离
可观测层 Logs / Metrics / Alerts 可定位、可恢复

📌 关键认知

生产架构不是“多加机器”,而是把职责切清楚


3️⃣ 工作原理(How)

下面给你一套典型生产架构参考图,可以直接照着画。


3.1 标准生产架构(推荐起点)

flowchart LR
    Client --> API[Web/API Server]

    API -->|delay/apply_async| C[Celery Producer]

    C -->|publish| MQ[(RabbitMQ Cluster)]

    MQ -->|queue: fast| WF[Worker Fast]
    MQ -->|queue: io| WIO[Worker IO]
    MQ -->|queue: cpu| WCPU[Worker CPU]
    MQ -->|queue: slow| WS[Worker Slow]

    WF --> R[(Result Backend)]
    WIO --> R
    WCPU --> R
    WS --> R

    WF --> LOG[Logs/Metrics]
    WIO --> LOG
    WCPU --> LOG
    WS --> LOG

3.2 从”单机”到”集群”的演进路径(非常重要)

阶段 1:单机起步(可用)

  • 1 个 RabbitMQ
  • 1~2 个 Worker
  • 分 2~3 个队列

阶段 2:水平扩展(常见)

  • RabbitMQ 升级为集群
  • Worker 按队列横向扩
  • Web / Worker 完全解耦

阶段 3:资源隔离(成熟)

  • CPU / IO / 快慢任务彻底分离
  • 不同队列跑在不同节点
  • 单队列故障不拖全局

📌 经验

扩展的是 Worker,不是 Producer,也不是 MQ 逻辑。


4️⃣ 与 RabbitMQ / Celery 的关系(生产级边界)

4.1 RabbitMQ:稳、少动

  • 集群化
  • 开启持久化
  • 设置合理的内存/磁盘水位
  • 队列数量受控(避免百万级)

RabbitMQ 的原则:

少而稳,不要当计算系统用。


4.2 Celery Worker:可变、可控

  • Worker 是最常扩缩的部分

  • 按队列、按资源类型拆

  • 配置:

    • concurrency
    • prefetch_multiplier
    • acks_late
    • pool 类型

Celery 的原则:

Worker 是“消耗资源”的地方,必须被严格管理。


5️⃣ 最小可运行示例(Practice)

这里给你的是 “生产可直接用的配置模板”(简化但不玩具)。


5.1 Celery 全局配置(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# celeryconfig.py
task_acks_late = True
worker_prefetch_multiplier = 1

task_reject_on_worker_lost = True

task_default_queue = "slow_tasks"

task_routes = {
"tasks.quick_*": {"queue": "fast_tasks"},
"tasks.io_*": {"queue": "io_tasks"},
"tasks.cpu_*": {"queue": "cpu_tasks"},
}

task_track_started = True
result_expires = 3600

broker_connection_retry_on_startup = True

5.2 Worker 启动策略(示例)

1
2
3
4
5
6
7
8
9
10
11
# 快任务
celery -A app worker -Q fast_tasks -c 8

# IO 密集
celery -A app worker -Q io_tasks -P gevent -c 100

# CPU 密集
celery -A app worker -Q cpu_tasks -c 4

# 慢后台
celery -A app worker -Q slow_tasks -c 2

6️⃣ 📒 本章笔记总结

  • 生产环境核心目标:

    • 不丢
    • 可控
    • 可扩
    • 可观测
  • 架构要点:

    • Producer 快速返回
    • RabbitMQ 稳定投递、削峰
    • Worker 按资源/优先级分组
  • 队列设计是长期维护成本的决定因素

  • Worker 是最“重”的组件,也是最容易横向扩展的

  • 至少一次执行 + 业务幂等是现实世界的正确选择

  • 日志 / 指标 / 告警 ≠ 可选项,是系统的一部分

  • Celery + RabbitMQ 是一套“工程系统”,不是一个库





本站总访问量 加载中...
今日访客数 加载中...
本页总访客数 加载中...
发表了 36 篇文章 🔸 总计 111.4k 字