事件循环是一种常见的运行机制,尤其在单线程的JavaScript语言中就采用这种机制,来解决单线程运行带来的一些问题。本文尝试用python来实现事件循环的基本逻辑与功能:

基本概念

python的协程方案

首先要了解Python中的异步编程,主要有以下几种方案:

  • twisted:使用事件驱动机制来提升python性能

  • gevent:使用greenlet在用户态完成栈和上下文切换来减少切换带来的性能损耗

  • tornado:使用生成器来保存协程上下文及状态,使用原生的python语法实现了协程

  • asyncio:异步编程标准化。

而本次就先介绍asyncio的异步编程标准化方案

协程

  • 协程是一个可以在返回之前“暂停执行”的函数,它可以在一段时间内间接地将控制传递给另一个协程
1
2
3
4
# await将函数控制传递回事件循环(它暂停了周围协程的执行)
async def g():
r = await f()
return r

asyncio

  • Async IO是一种语言无关的编程范式,asyncio是与其对应的python包
    • async/await用于定义协程的python关键字

异步

什么是异步?

  • 异步协程能够在等待最终结果时“暂停”,同时让其他协程运行
  • 异步代码提供了并发的感觉

**同步和异步很好的描述:**Judit和24个对手下棋,Judit每下一步需要5秒,对手需要55秒,游戏平均每人下30步结束(总共60步)

同步版本【一个任务完成再做下一个任务】:

  • Judit每次只和一个对手玩,直到游戏完成,再换下一个对手
  • 每局游戏耗时(55 + 5)* 30 == 1800秒,即30分钟。24个对手需要:24 * 30 == 720分钟=12个小时

异步版本【多个任务并发执行】:

  • Judit每下完一步,离开桌子换下一个对手,在所有24个对手中,Judit的一次移动需要24 * 5 == 120秒,也就是2分钟。全部下完需要:120 * 30 = = 3600秒=1小时

asyncio实现携程

基本函数

  • loop.call_later()
  • loop.call_soon()
  • loop.time()

实践一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import asyncio

def function_1(end_time, loop):
print("function_1 called")
if loop.time() < end_time:
loop.call_later(1, function_2, end_time, loop)
else:
loop.stop()

def function_2(end_time, loop):
print("function_2 called ")
if loop.time() < end_time:
loop.call_later(1, function_3, end_time, loop)
else:
loop.stop()

def function_3(end_time, loop):
print("function_3 called")
if loop.time() < end_time:
loop.call_later(1, function_1, end_time, loop)
else:
loop.stop()

if __name__ == '__main__':
loop = asyncio.get_event_loop()
end_loop_time = loop.time() + 8.0
loop.call_soon(function_1, end_loop_time, loop)
# 运行事件循环直到loop.stop()被调用
loop.run_forever()
loop.close()

输出:

1
2
3
4
5
6
7
8
9
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called
function_1 called
function_2 called
function_3 called

run_in_executor

  • 在默认的thread pool中执行func
  • 在指定的thread pool中执行funcIO-bound操作
  • 在指定的process pool中执行funcCPU-bound操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
import concurrent.futures

def blocking_io():
# IO-bound操作会block event loop
# 应该放到thread pool中去执行
with open('./test.txt', 'w') as f:
return f.write('test-string')

def cpu_bound():
# CPU-bound操作会block event loop,
# 应该放到process pool中去执行
return sum(i * i for i in range(10 ** 7))

async def main():
loop = asyncio.get_running_loop()

# 1. 在默认的thread pool中执行
result = await loop.run_in_executor(
None, blocking_io)
print('default thread pool', result)

# 2. 在指定的thread pool中执行
with concurrent.futures.ThreadPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, blocking_io)
print('custom thread pool', result)

# 3. 在指定的process pool中执行
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(
pool, cpu_bound)
print('custom process pool', result)

if __name__ == '__main__':
asyncio.run(main())

# 输出:
# default thread pool 11
# custom thread pool 11
# custom process pool 333333283333335000000