282 字
1 分钟
Python 并发与异步编程
一、GIL 机制
1.1 什么是 GIL
# GIL: Global Interpreter Lock# CPython 的互斥锁,保证同一时刻只有一个线程执行 Python 字节码
# 问题:Python 字节码执行时,解释器需要锁定# 同一进程内的所有线程共享 GIL
import threadingimport time
# CPU 密集型任务def cpu_bound(n): return sum(i * i for i in range(n))
# 由于 GIL,多线程对 CPU 密集型任务无效# 应该使用多进程绕过 GIL1.2 GIL 的影响
| 场景 | 建议 | 原因 |
|---|---|---|
| CPU 密集型 | 多进程 | GIL 限制只能一个线程执行字节码 |
| IO 密集型 | 多线程/asyncio | IO 时会释放 GIL |
| 混合型 | 多进程 + 异步 | 结合两者优势 |
# CPU 密集型:应该用多进程import multiprocessing
def cpu_task(n): return sum(i * i for i in range(n))
if __name__ == "__main__": with multiprocessing.Pool(4) as pool: results = pool.map(cpu_task, [10**7] * 8)1.3 释放 GIL 的方式
# 1. C 扩展:长时间计算的 C 函数可释放 GIL# 2. ctypes:调用 C 库时释放 GIL# 3. PyPy:使用 JIT 编译器,减少 GIL 影响# 4. 其他 Python 实现:Jython, IronPython 无 GIL
# 示例:使用 numpy(底层 C 实现)import numpy as np# numpy 操作不经过 Python 字节码,GIL 影响小a = np.random.rand(10000, 10000)b = np.dot(a, a.T) # 并行计算二、asyncio 异步编程
2.1 协程基础
import asyncio
# async def 定义协程函数async def fetch(url): await asyncio.sleep(1) # 模拟 IO 操作 return f"data from {url}"
# 运行协程async def main(): result = await fetch("example.com") print(result)
asyncio.run(main())
# 协程不能直接调用# fetch("example.com") # 不会执行!2.2 并发执行
import asyncio
async def fetch(url, delay): await asyncio.sleep(delay) return f"data from {url}"
async def main(): # gather: 并发执行多个协程 results = await asyncio.gather( fetch("url1", 1), fetch("url2", 2), fetch("url3", 0.5) ) print(results)
asyncio.run(main())2.3 创建任务
import asyncio
async def main(): # create_task: 创建后台任务 task = asyncio.create_task(fetch("url1", 1))
# 任务状态 print(task.done()) # False print(task.result()) # 等待完成并获取结果
# wait: 等待一组任务 task1 = asyncio.create_task(fetch("url1", 1)) task2 = asyncio.create_task(fetch("url2", 2))
done, pending = await asyncio.wait([task1, task2])
for task in done: print(task.result())
asyncio.run(main())2.4 异步上下文管理器
import asyncio
class AsyncResource: async def __aenter__(self): print("获取资源") await asyncio.sleep(0.1) return self
async def __aexit__(self, exc_type, exc_val, exc_tb): print("释放资源") await asyncio.sleep(0.1)
async def main(): async with AsyncResource() as resource: print("使用资源")
asyncio.run(main())2.5 异步迭代器与生成器
import asyncio
# 异步生成器async def async_generator(): for i in range(5): await asyncio.sleep(0.1) yield i
async def main(): # 异步迭代 async for item in async_generator(): print(item)
# 异步列表推导式 results = [item async for item in async_generator()] print(results)
asyncio.run(main())三、多线程 threading
3.1 基础用法
import threadingimport time
def worker(name, seconds): print(f"线程 {name} 开始") time.sleep(seconds) print(f"线程 {name} 结束")
# 创建线程t1 = threading.Thread(target=worker, args=("A", 1))t2 = threading.Thread(target=worker, args=("B", 2))
t1.start() # 启动线程t2.start()
t1.join() # 等待线程结束t2.join()
print("所有线程结束")3.2 线程同步
import threading
# 互斥锁counter = 0lock = threading.Lock()
def increment(): global counter with lock: # 自动获取和释放锁 counter += 1
threads = [threading.Thread(target=increment) for _ in range(1000)]for t in threads: t.start()for t in threads: t.join()print(counter) # 10003.3 线程安全的数据结构
import threading
# Queue 是线程安全的from queue import Queue
q = Queue()q.put("item")item = q.get() # 阻塞直到有元素
# ThreadLocal 线程本地存储local_data = threading.local()
def worker(): local_data.value = threading.current_thread().name print(local_data.value)
threads = [threading.Thread(target=worker) for _ in range(3)]for t in threads: t.start()3.4 读写锁
import threading
class ReadWriteLock: def __init__(self): self._read_ready = threading.Condition(threading.Lock()) self._readers = 0
def acquire_read(self): with self._read_ready: self._readers += 1
def release_read(self): with self._read_ready: self._readers -= 1 if self._readers == 0: self._read_ready.notify_all()
def acquire_write(self): self._read_ready.acquire() while self._readers > 0: self._read_ready.wait()
def release_write(self): self._read_ready.release()
# RWLock 允许并发读,但写时独占四、多进程 multiprocessing
4.1 基础用法
import multiprocessing
def worker(name): return f"Worker {name}"
if __name__ == "__main__": # 创建进程池 with multiprocessing.Pool(4) as pool: results = pool.map(worker, ["A", "B", "C", "D"]) print(results) # ['Worker A', 'Worker B', 'Worker C', 'Worker D']
# 单个进程 p = multiprocessing.Process(target=worker, args=("X",)) p.start() p.join()4.2 进程间通信
import multiprocessing
# Queue - 进程间队列def producer(queue): for i in range(5): queue.put(i)
def consumer(queue): while not queue.empty(): print(queue.get())
if __name__ == "__main__": queue = multiprocessing.Queue() p1 = multiprocessing.Process(target=producer, args=(queue,)) p2 = multiprocessing.Process(target=consumer, args=(queue,))
p1.start() p1.join() p2.start() p2.join()
# Pipe - 双工管道def proc1(conn): conn.send("hello") msg = conn.recv() print(msg)
if __name__ == "__main__": conn1, conn2 = multiprocessing.Pipe() p = multiprocessing.Process(target=proc1, args=(conn1,)) p.start() print(conn2.recv()) # hello conn2.send("world") p.join()4.3 共享内存
import multiprocessing
# 共享值value = multiprocessing.Value('i', 0)lock = multiprocessing.Lock()
def increment(): with lock: value.value += 1
# 共享数组arr = multiprocessing.Array('i', [1, 2, 3])
# Manager - 共享数据结构manager = multiprocessing.Manager()shared_dict = manager.dict()shared_list = manager.list()五、线程 vs 进程 vs 协程
5.1 对比表
| 特性 | 线程 | 进程 | 协程 |
|---|---|---|---|
| 内存共享 | 共享进程内存 | 独立内存 | 共享进程内存 |
| GIL 影响 | 受限 | 无影响 | 无影响 |
| 创建开销 | 小 | 大 | 极小 |
| 切换开销 | 中等 | 大 | 极小 |
| 适用场景 | IO 密集型 | CPU 密集型 | IO 密集型 |
| 复杂度 | 中等 | 高 | 低 |
5.2 选择指南
# CPU 密集型:多进程import multiprocessing
def compute(data): return sum(i * i for i in range(data))
with multiprocessing.Pool(4) as pool: results = pool.map(compute, [10**7] * 10)
# IO 密集型:asyncio(首选)或 threadingimport asyncio
async def fetch_all(urls): async with aiohttp.ClientSession() as session: tasks = [fetch(session, url) for url in urls] return await asyncio.gather(*tasks)
# 混合型:多进程 + 协程# 每个进程运行一个事件循环5.3 经典问题:生产者-消费者
# asyncio 版本import asyncio
async def producer(queue): for i in range(10): await queue.put(i) await asyncio.sleep(0.1)
async def consumer(queue): while True: item = await queue.get() if item is None: break print(item)
async def main(): queue = asyncio.Queue() await asyncio.gather( producer(queue), consumer(queue) )
asyncio.run(main())
# threading 版本import threadingfrom queue import Queue
def producer(queue): for i in range(10): queue.put(i)
def consumer(queue): while True: item = queue.get() if item is None: break print(item)
queue = Queue()threads = [ threading.Thread(target=producer, args=(queue,)), threading.Thread(target=consumer, args=(queue,))]for t in threads: t.start()for t in threads: t.join()支持与分享
如果这篇文章对你有帮助,欢迎支持作者或分享给更多人
部分信息可能已经过时
相关文章 智能推荐
1
Python 函数与高级特性
面试 Python 函数高级特性——装饰器原理、生成器与迭代器协议、上下文管理器、偏函数与可调用对象。
2
Python 基础语法与核心概念
面试 Python 基础语法核心概念——数据类型对比、is 与 == 区别、深拷贝与浅拷贝、命名空间与作用域。
3
Kubernetes 安全与 RBAC
面试 Kubernetes 安全——RBAC 权限模型、Security Context、Pod Security Standards、网络策略、Secret 管理。
4
Python 面向对象与设计模式
面试 Python 面向对象编程——类变量与实例变量、继承与 MRO、常见设计模式、单例模式实现。
5
Python 内存管理与性能优化
面试 Python 内存管理——引用计数、垃圾回收机制、分代回收、内存泄漏、__slots__、性能优化技巧。






