# Python 并发与异步编程

发布于 2023-12-12 · 282 字 · 约 1 分钟阅读

## 一、GIL 机制

### 1.1 什么是 GIL

# GIL: Global Interpreter Lock.
# A mutex in CPython that ensures only one thread executes Python bytecode
# at any given moment.
# The catch: a thread must hold the GIL while the interpreter runs bytecode.
# All threads in the same process share that single GIL.
import threading
import time


# A CPU-bound task: pure-Python arithmetic with no I/O.
def cpu_bound(n):
    """Return the sum of squares 0*0 + 1*1 + ... + (n-1)*(n-1)."""
    total = 0
    for i in range(n):
        total += i * i
    return total


# Because of the GIL, multiple threads give no speed-up for CPU-bound work
# like this; use multiple processes to sidestep the GIL instead.

### 1.2 GIL 的影响

| 场景 | 建议 | 原因 |
| --- | --- | --- |
| CPU 密集型 | 多进程 | GIL 限制同一时刻只能有一个线程执行字节码 |
| IO 密集型 | 多线程 / asyncio | 阻塞 IO 期间会释放 GIL |
| 混合型 | 多进程 + 异步 | 结合两者优势 |
# CPU-bound work belongs in multiple processes.
import multiprocessing


def cpu_task(n):
    """Return sum(i*i for i in range(n)): pure-Python, CPU-bound arithmetic."""
    squares = (i * i for i in range(n))
    return sum(squares)


if __name__ == "__main__":
    # Every worker is a separate process with its own interpreter, so the
    # eight jobs can run on up to four cores at once.
    workloads = [10**7] * 8
    with multiprocessing.Pool(4) as pool:
        results = pool.map(cpu_task, workloads)

### 1.3 释放 GIL 的方式

# 1. C extensions: long-running C functions can release the GIL.
# 2. ctypes: foreign functions called through ctypes (CDLL/WinDLL) release
#    the GIL for the duration of the call.
# 3. PyPy: its JIT makes single-threaded code faster, but PyPy still has a
#    GIL, so threads still cannot run bytecode in parallel.
# 4. Other implementations: Jython and IronPython have no GIL.
# Example: NumPy, whose heavy lifting is implemented in C.
import numpy as np
# NumPy array operations run in compiled code instead of Python bytecode
# loops, so the GIL has little effect on them.
a = np.random.rand(10000, 10000)  # 10000 x 10000 float64 values: ~800 MB
# Matrix product handed to NumPy's BLAS backend.
# NOTE(review): whether this actually runs on several cores depends on the
# BLAS library NumPy is linked against and its thread settings.
b = np.dot(a, a.T)

## 二、asyncio 异步编程

### 2.1 协程基础

import asyncio
# `async def` defines a coroutine function.
async def fetch(url):
    """Pretend to fetch *url*: wait one second (simulated I/O), then return a string."""
    await asyncio.sleep(1)  # simulated I/O; hands control back to the event loop
    message = f"data from {url}"
    return message
# Running a coroutine with asyncio.run().
async def main():
    """Await one simulated fetch and print what it returns."""
    print(await fetch("example.com"))


asyncio.run(main())
# Calling a coroutine function does not run its body: fetch("example.com")
# on its own only creates a coroutine object (and Python warns that it was
# never awaited). Await it, or hand it to asyncio.run() / create_task().
# fetch("example.com")  # does not execute!

### 2.2 并发执行

import asyncio
async def fetch(url, delay):
    """Wait *delay* seconds (simulated I/O), then return a payload naming *url*."""
    await asyncio.sleep(delay)
    return "data from {}".format(url)
async def main():
    """Run three simulated fetches concurrently and print their results."""
    # gather() runs the awaitables concurrently and returns their results in
    # argument order; total time is about the longest delay (2 s), not the sum.
    requests = (("url1", 1), ("url2", 2), ("url3", 0.5))
    results = await asyncio.gather(*(fetch(url, delay) for url, delay in requests))
    print(results)


asyncio.run(main())

### 2.3 创建任务

import asyncio
async def main():
    """Demonstrate asyncio.create_task() and asyncio.wait().

    Uses the two-argument ``fetch(url, delay)`` coroutine from the previous
    example.
    """
    # create_task() wraps the coroutine in a Task scheduled in the background.
    task = asyncio.create_task(fetch("url1", 1))
    # Task state: the task has not had a chance to run yet.
    print(task.done())  # False
    # Task.result() does NOT wait: on an unfinished task it raises
    # asyncio.InvalidStateError. Awaiting the task waits for it to finish
    # and returns its result.
    print(await task)
    # wait(): wait for a group of tasks (all of them, by default); returns
    # two sets, (done, pending).
    task1 = asyncio.create_task(fetch("url1", 1))
    task2 = asyncio.create_task(fetch("url2", 2))
    done, pending = await asyncio.wait([task1, task2])
    # `done` is a set, so results come out in no guaranteed order.
    for finished in done:
        print(finished.result())  # safe: every task in `done` has completed


asyncio.run(main())

### 2.4 异步上下文管理器

import asyncio
class AsyncResource:
    """Toy asynchronous context manager that logs when it is acquired and released."""

    async def _announce(self, message):
        # Print the status line, then simulate async setup/teardown work.
        print(message)
        await asyncio.sleep(0.1)

    async def __aenter__(self):
        """Acquire the resource on entry to ``async with``; the bound value is self."""
        await self._announce("获取资源")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the resource; returning None lets any exception propagate."""
        await self._announce("释放资源")
async def main():
    """Hold an AsyncResource for the duration of one print."""
    async with AsyncResource():
        print("使用资源")


asyncio.run(main())

### 2.5 异步迭代器与生成器

import asyncio
# Async generator: an `async def` function that uses `yield`.
async def async_generator():
    """Yield 0, 1, 2, 3, 4, sleeping 0.1 s before each value."""
    current = 0
    while current < 5:
        await asyncio.sleep(0.1)
        yield current
        current += 1
async def main():
    """Consume async_generator() twice: with `async for`, then an async comprehension."""
    # `async for` awaits each item in turn.
    async for value in async_generator():
        print(value)
    # An async list comprehension gathers every item into a list.
    collected = [value async for value in async_generator()]
    print(collected)


asyncio.run(main())

## 三、多线程 threading

### 3.1 基础用法

import threading
import time
def worker(name, seconds):
    """Print a start line, block for *seconds* with time.sleep(), then print an end line."""
    label = f"线程 {name}"
    print(label + " 开始")
    time.sleep(seconds)  # blocking sleep; other threads keep running meanwhile
    print(label + " 结束")
# Create two threads that run worker() with different names and sleep times.
t1 = threading.Thread(target=worker, args=("A", 1))
t2 = threading.Thread(target=worker, args=("B", 2))
t1.start()  # start the thread
t2.start()
t1.join()  # wait for the thread to finish
t2.join()
# The two sleeps overlap, so this line prints after about 2 s, not 3 s.
print("所有线程结束")

### 3.2 线程同步

import threading
# Mutual-exclusion lock guarding the shared counter.
counter = 0
lock = threading.Lock()


def increment():
    """Add one to the global ``counter`` while holding ``lock``."""
    global counter
    # `counter += 1` is a read-modify-write; the lock keeps it atomic.
    lock.acquire()
    try:
        counter += 1
    finally:
        lock.release()


threads = [threading.Thread(target=increment) for _ in range(1000)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 1000

### 3.3 线程安全的数据结构

import threading
# queue.Queue is thread-safe: put()/get() do their own locking.
from queue import Queue
q = Queue()
q.put("item")
item = q.get()  # blocks until an item is available
# threading.local(): every thread sees its own independent attributes.
local_data = threading.local()


def worker():
    """Store the current thread's name in thread-local storage and print it."""
    current = threading.current_thread()
    local_data.value = current.name
    print(local_data.value)


threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()

### 3.4 读写锁

import threading
class ReadWriteLock:
    """Readers-writer lock built on a single Condition.

    Any number of readers may be active at once. A writer keeps the
    Condition's underlying Lock held for its entire write section, which
    keeps other writers out and also blocks new readers, because
    acquire_read() needs that same Lock to update the reader count.
    """

    def __init__(self):
        # Condition over a plain (non-reentrant) Lock: it guards _readers and
        # is also the lock a writer holds while writing.
        self._read_ready = threading.Condition(threading.Lock())
        # Number of readers currently inside a read section.
        self._readers = 0

    def acquire_read(self):
        """Register one more active reader; blocks while a writer holds the lock."""
        with self._read_ready:
            self._readers += 1

    def release_read(self):
        """Unregister a reader and wake waiting writers when the last one leaves."""
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()

    def acquire_write(self):
        """Gain exclusive access; returns with the underlying Lock still held.

        wait() releases the Lock while sleeping, so new readers can keep
        entering while a writer waits. Under a steady stream of readers,
        writers may starve. Because the Lock is not reentrant, a thread that
        holds the write lock must not call acquire_read() (it would deadlock).
        """
        self._read_ready.acquire()
        while self._readers > 0:
            self._read_ready.wait()

    def release_write(self):
        """Release the Lock taken in acquire_write().

        No notify is needed: readers never wait() on the condition, they only
        contend for the Lock itself.
        """
        self._read_ready.release()
# RWLock: concurrent reads are allowed; a write is exclusive.

## 四、多进程 multiprocessing

### 4.1 基础用法

import multiprocessing
def worker(name):
    """Build the result string for one job; runs inside a worker process."""
    return "Worker {}".format(name)


if __name__ == "__main__":
    # A pool of four worker processes; map() keeps results in input order.
    with multiprocessing.Pool(4) as pool:
        results = pool.map(worker, ["A", "B", "C", "D"])
    print(results)  # ['Worker A', 'Worker B', 'Worker C', 'Worker D']
    # A single standalone process. Process ignores the target's return
    # value, so the parent never sees this "Worker X" string.
    p = multiprocessing.Process(target=worker, args=("X",))
    p.start()
    p.join()

### 4.2 进程间通信

import multiprocessing
# multiprocessing.Queue - a queue that can be shared between processes.
def producer(queue):
    """Put the integers 0..4 on *queue*, then a ``None`` end-of-stream sentinel."""
    for i in range(5):
        queue.put(i)
    # Sentinel: tells the consumer that nothing more is coming.
    queue.put(None)


def consumer(queue):
    """Print items from *queue* until the ``None`` sentinel arrives.

    Uses a blocking get() plus a sentinel rather than polling queue.empty().
    The multiprocessing docs call empty() unreliable, and polling it makes
    the consumer quit early whenever it runs ahead of the producer.
    """
    while True:
        item = queue.get()
        if item is None:
            break
        print(item)
if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    # Run the producer to completion first, then start the consumer, so every
    # item is already on the queue when the consumer begins.
    # NOTE(review): joining a process before its queued data is read can
    # deadlock once the data exceeds the pipe buffer (see the multiprocessing
    # "joining processes that use queues" guideline); five ints are fine.
    p1.start()
    p1.join()
    p2.start()
    p2.join()
# Pipe - a two-way (duplex) connection between two endpoints.
def proc1(conn):
    """Send "hello" over *conn*, then block for the reply and print it."""
    conn.send("hello")
    print(conn.recv())
if __name__ == "__main__":
    # Pipe() is duplex by default: either end can both send() and recv().
    conn1, conn2 = multiprocessing.Pipe()
    p = multiprocessing.Process(target=proc1, args=(conn1,))
    p.start()
    print(conn2.recv())  # hello
    # Reply that the child's conn.recv() is waiting for.
    conn2.send("world")
    p.join()

### 4.3 共享内存

import multiprocessing
# Shared scalar: 'i' is the typecode for a C signed int, initial value 0.
value = multiprocessing.Value('i', 0)
lock = multiprocessing.Lock()


def increment(shared_value=None, mutex=None):
    """Add 1 to a shared ``multiprocessing.Value`` while holding a lock.

    Args:
        shared_value: the Value to bump; defaults to the module-level ``value``.
        mutex: the lock to hold; defaults to the module-level ``lock``.

    Pass both explicitly when calling this in a child process. With the
    "spawn"/"forkserver" start methods the child re-imports this module, so
    the module-level ``value``/``lock`` it sees are fresh copies rather than
    the parent's objects.
    """
    if shared_value is None:
        shared_value = value
    if mutex is None:
        mutex = lock
    # `+=` on .value is a read-modify-write, so it needs the lock.
    with mutex:
        shared_value.value += 1


# Shared array of C ints.
arr = multiprocessing.Array('i', [1, 2, 3])

# Manager - proxies for a shared dict/list, served by a separate manager
# process. It must be started under the main guard: with the "spawn" start
# method the new process re-imports this module, and an unguarded Manager()
# would try to start yet another process during that import (RuntimeError).
# The `with` block shuts the manager process down when it exits.
if __name__ == "__main__":
    with multiprocessing.Manager() as manager:
        shared_dict = manager.dict()
        shared_list = manager.list()

## 五、线程 vs 进程 vs 协程

### 5.1 对比表

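| 特性 | 线程 | 进程 | 协程 |
| --- | --- | --- | --- |
| 内存共享 | 共享进程内存 | 独立内存 | 共享进程内存 |
| GIL 影响 | 受限 | 无影响 | 无影响 |
| 创建开销 | 小 | 大 | 极小 |
| 切换开销 | 中等 | 大 | 极小 |
| 适用场景 | IO 密集型 | CPU 密集型 | IO 密集型 |
| 复杂度 | 中等 | (原表缺失) | (原表缺失) |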

### 5.2 选择指南

# CPU-bound work: use multiple processes.
import multiprocessing


def compute(data):
    """Return the sum of squares of 0..data-1 (a CPU-bound toy workload)."""
    return sum(i * i for i in range(data))


# The pool must be created under the main guard. With the "spawn" start
# method each worker re-imports this module, and an unguarded Pool(...)
# would try to start processes during that import (RuntimeError). The guard
# also keeps a plain `import` of this module from launching the workload.
if __name__ == "__main__":
    with multiprocessing.Pool(4) as pool:
        results = pool.map(compute, [10**7] * 10)
# I/O-bound work: asyncio (preferred) or threading.
import asyncio


async def fetch_all(urls):
    """Run ``fetch(session, url)`` for every URL concurrently over one shared session.

    Returns the results in the same order as *urls* (gather() preserves
    argument order).
    """
    # NOTE(review): `aiohttp` is a third-party package that this snippet never
    # imports; add `import aiohttp` before calling fetch_all().
    # NOTE(review): assumes a `fetch(session, url)` coroutine defined
    # elsewhere. It is not the fetch(url) / fetch(url, delay) from the earlier
    # examples.
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        return await asyncio.gather(*tasks)
# Mixed workloads: multiple processes + coroutines,
# with each process running its own event loop.

### 5.3 经典问题:生产者-消费者

# asyncio 版本
import asyncio
async def producer(queue, n=10, delay=0.1):
    """Put 0..n-1 on *queue*, pausing *delay* seconds after each, then a ``None`` sentinel.

    Args:
        queue: an ``asyncio.Queue`` shared with the consumer.
        n: how many integers to produce (default 10).
        delay: seconds to sleep after each put (default 0.1).
    """
    for i in range(n):
        await queue.put(i)
        await asyncio.sleep(delay)
    # Without this sentinel the consumer waits on queue.get() forever and
    # the gather() that runs both coroutines never returns.
    await queue.put(None)


async def consumer(queue):
    """Print items taken from *queue* until the ``None`` sentinel arrives."""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(item)
async def main():
    """Run producer() and consumer() concurrently over one asyncio.Queue.

    gather() returns only after both coroutines finish; the consumer finishes
    only after it receives a ``None`` item.
    """
    queue = asyncio.Queue()
    await asyncio.gather(*(role(queue) for role in (producer, consumer)))


asyncio.run(main())
# threading 版本
import threading
from queue import Queue
def producer(queue, n=10):
    """Put 0..n-1 on *queue*, then a ``None`` sentinel so the consumer can stop.

    Args:
        queue: a thread-safe ``queue.Queue`` shared with the consumer.
        n: how many integers to produce (default 10).
    """
    for i in range(n):
        queue.put(i)
    # Without the sentinel, consumer() blocks in queue.get() forever and the
    # join() on its thread never returns.
    queue.put(None)


def consumer(queue):
    """Print items taken from *queue* until the ``None`` sentinel arrives."""
    while True:
        item = queue.get()
        if item is None:
            break
        print(item)
# One thread-safe queue shared by a producer thread and a consumer thread.
queue = Queue()
threads = [
    threading.Thread(target=producer, args=(queue,)),
    threading.Thread(target=consumer, args=(queue,))
]
for t in threads:
    t.start()
# The consumer thread, and therefore this join loop, finishes only after
# consumer() takes a None item off the queue.
for t in threads:
    t.join()

支持与分享

如果这篇文章对你有帮助,欢迎支持作者或分享给更多人

本文标题:Python 并发与异步编程
本文链接:https://blog.souloss.com/posts/interview/python-concurrency-and-async/
作者:Souloss
发布于:2023-12-12
许可协议:CC BY-NC-SA 4.0

部分信息可能已经过时