from prezutils import setup
setup(
name="Awaits, how do they work?",
author="Yann Kaiser",
author_extra=[
Twitter("@YannKsr"),
Employer("Criteo", is_hiring=True, at="Palo Alto"),
PyPI("clize", purpose="Turn functions into CLIs"),
],
)
How they want us to use it
import asyncio
import aiohttp
async def get_things_from_internet(url):
    """Fetch *url* and print the response body as text."""
    # The module-level ``aiohttp.get`` helper is gone from current aiohttp;
    # requests are issued through a ClientSession instead.
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(await resp.text())
loop = asyncio.get_event_loop()
# gather() takes the awaitables as separate positional arguments rather
# than one list, and the two calls need a comma between them.
all_requests = asyncio.gather(
    get_things_from_internet('https://www.google.com/'),
    get_things_from_internet('https://www.python.org/'),
)
result = loop.run_until_complete(all_requests)
print(result)
2011
Node JS
async def fib(n):
    """Return the n-th Fibonacci number by awaiting itself recursively."""
    if n < 2:
        return n
    return (await fib(n-1)) + (await fib(n-2))
print(something_we_will_build_here(fib(100)))
def recur_fib(n):
    """Return the n-th Fibonacci number using plain recursion."""
    if n < 2:
        return n
    return recur_fib(n-1) + recur_fib(n-2)
def iter_fib(n):
    """Return the n-th Fibonacci number iteratively.

    Gives the same answers as ``recur_fib``, including ``n < 2`` where the
    result is ``n`` itself.
    """
    # Without this guard the loop below does not run for n == 0 and the
    # initial n_1 (1) would be returned instead of 0.
    if n < 2:
        return n
    n_2 = 0
    n_1 = 1
    for _ in range(n - 1):
        n_2, n_1 = n_1, n_2+n_1
    return n_1
import dis
@dis.dis  # prints the disassembly at definition time; dis.dis returns None
async def myfunc():
    return await "some object"  # never executed, only disassembled
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
'some object'
GET_AWAITABLE
≈
stack[0] = stack[0].__await__()
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
<coroutine object>
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
<coroutine object>
None
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
<return value of the coro>
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
import dis
@dis.dis  # prints the disassembly at definition time; dis.dis returns None
def x():
    yield from "some object"  # generator counterpart of the await example
3 0 LOAD_CONST 1 ('some object')
2 GET_YIELD_FROM_ITER
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
3 0 LOAD_CONST 1 ('some object')
2 GET_AWAITABLE
4 LOAD_CONST 0 (None)
6 YIELD_FROM
8 RETURN_VALUE
class AdditionAwaitable:
    """Awaitable that asks whoever drives the coroutine to add its arguments."""

    def __init__(self, *args):
        self.args = args

    def __await__(self):
        # Suspend with an ('add', *args) request; the value sent back in
        # becomes the result of the ``await`` expression.
        result = yield ('add', *self.args)
        return result
async def add(*args):
    """Await an AdditionAwaitable for *args* and return what it resolves to."""
    return await AdditionAwaitable(*args)
async def add(*args):
return await AdditionAwaitable(*args)
g = add(1, 2)
>>> g
<coroutine object add at 0x7f7f4a0ace60>
>>> dir(g)
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_running', 'send', 'throw']
>>> [attr for attr in dir(g) if not attr.startswith('_')]
['close', 'cr_await', 'cr_code', 'cr_frame', 'cr_running', 'send', 'throw']
>>> g.send("spam")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: can't send non-None value to a just-started coroutine
>>> g.send(None)
('add', 1, 2)
>>> g.send(None)
('add', 1, 2)
class AdditionAwaitable:
def __init__(self, *args):
self.args = args
def __await__(self):
result = yield ('add', *self.args)
return result
async def add(*args):
return await AdditionAwaitable(*args)
>>> g.send(None)
('add', 1, 2)
>>> g.cr_frame
<frame object at 0x7f8c31d21dc8>
>>> inspect.getframeinfo(g.cr_frame)
Traceback(filename='add.py', lineno=10, function='add', code_context=[' result = await AdditionAwaitable(*args)\n'], index=0)
>>> inspect.getframeinfo(g.cr_await.gi_frame)
Traceback(filename='add.py', lineno=6, function='__await__', code_context=[" return (yield ('add', *self.args))\n"], index=0)
>>> g.send(None)
('add', 1, 2)
>>> g.send(3)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration: 3
Recap:
await is based on yield from
coro.send(...) resumes such a function
yield
coro will store where it is paused
Let's automate this!
def run_task(coro):
msg = coro.send(None)
def run_task(coro):
msg = coro.send(None)
action, *args = msg
if action == 'add':
result = sum(args)
def run_task(coro):
while True:
msg = coro.send(None)
action, *args = msg
if action == 'add':
result = sum(args)
def run_task(coro):
result = None
while True:
msg = coro.send(result)
action, *args = msg
if action == 'add':
result = sum(args)
def run_task(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
def run_task(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
else:
coro.throw(ValueError(action))
def run_task(coro):
    """Drive *coro* to completion, answering the requests it yields.

    Each yielded message is a tuple ``(action, *args)``.  ``'add'`` is
    answered with ``sum(args)``; any other action is answered by throwing
    ``ValueError(action)`` into the coroutine.

    Returns the coroutine's return value.  A ``ValueError`` the coroutine
    does not handle propagates to the caller.
    """
    result = None
    error = None
    while True:
        try:
            # throw() resumes the coroutine just like send() does: it may
            # finish it (StopIteration) or produce the next message, so
            # both paths share the same handling.
            if error is None:
                msg = coro.send(result)
            else:
                msg = coro.throw(error)
        except StopIteration as e:
            return e.value
        result = error = None
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            error = ValueError(action)
print(run_task(add(1, 2)))
3
async def fib(n):
    """Return the n-th Fibonacci number, awaiting AdditionAwaitable for the sum."""
    if n < 2:
        return n
    result = await AdditionAwaitable(await fib(n - 2),
                                     await fib(n - 1))
    return result
cache = {}
async def fib(n):
    """Fibonacci via AdditionAwaitable, memoised in the module-level ``cache``."""
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    result = await AdditionAwaitable(await fib(n - 2),
                                     await fib(n - 1))
    cache[n] = result
    return result
run_task(fib(100))
def run_task(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
else:
coro.throw(ValueError(action))
def run_task(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
elif action == 'sub':
a, b = args
result = a - b
else:
coro.throw(ValueError(action))
def run_task(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
elif action == 'sub':
a, b = args
result = a - b
elif action == 'mul':
a, b = args
result = a * b
elif action == 'div':
a, b = args
result = a / b
else:
coro.throw(ValueError(action))
def run_task(coro):
    """Drive *coro* to completion, answering the requests it yields.

    Each yielded message is a tuple ``(action, *args)``.  Supported
    actions: ``add`` (any number of operands), ``sub``, ``mul``, ``div``,
    ``mod``, ``divmod`` (two operands each), ``getattr`` (obj, attr) and
    ``setattr`` (obj, attr, val).  Any other action is answered by
    throwing ``ValueError(action)`` into the coroutine.

    Returns the coroutine's return value.
    """
    result = None
    error = None
    while True:
        try:
            # throw() resumes the coroutine just like send() does: it may
            # finish it (StopIteration) or produce the next message.
            if error is None:
                msg = coro.send(result)
            else:
                msg = coro.throw(error)
        except StopIteration as e:
            return e.value
        result = error = None
        action, *args = msg
        if action == 'add':
            result = sum(args)
        elif action == 'sub':
            a, b = args
            result = a - b
        elif action == 'mul':
            a, b = args
            result = a * b
        elif action == 'div':
            a, b = args
            result = a / b
        elif action == 'mod':
            a, b = args
            result = a % b
        elif action == 'divmod':
            a, b = args
            # Floor quotient and remainder, like the built-in; true
            # division here would not pair up with ``a % b``.
            result = divmod(a, b)
        elif action == 'getattr':
            obj, attr = args
            result = getattr(obj, attr)
        elif action == 'setattr':
            obj, attr, val = args
            result = setattr(obj, attr, val)
        else:
            error = ValueError(action)
Deferred
Future
Promise
Callbacks
a Promise
class Promise:
    """Placeholder for a value (or exception) that may not be set yet.

    ``success`` is ``None`` until an outcome is set, ``True`` after
    ``set_success`` and ``False`` after ``set_exception``; ``result``
    holds the corresponding value or exception.
    """

    def __init__(self):
        self.success = None
        self.result = None

    def set_success(self, value):
        """Record *value* as the outcome and return ``self`` for chaining."""
        self.success = True
        self.result = value
        return self

    def set_exception(self, exc):
        """Record *exc* as the failure and return ``self`` for chaining."""
        self.success = False
        self.result = exc
        return self

    def __await__(self):
        # Yield the promise itself to whoever drives the coroutine and
        # evaluate to whatever is sent back in.
        return (yield self)
class Promise:
def __init__(self):
self.success = None
self.result = None
| | p.success | p.result |
|---|---|---|
| 🥚 | None | |
| 🐣 | True | 12345 |
| 🍳 | False | ValueError('oops') |
def run(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
else:
coro.throw(ValueError(action))
def run(coro):
result = None
while True:
try:
msg = coro.send(result)
except StopIteration as e:
return e.value
action, *args = msg
if action == 'add':
result = sum(args)
else:
coro.throw(ValueError(action))
def run(coro):
result = None
while True:
try:
promise = coro.send(result)
except StopIteration as e:
return e.value
def run(coro):
promise = Promise().set_success(None)
while True:
try:
promise = coro.send(promise.result)
except StopIteration as e:
return e.value
def run(coro):
promise = Promise().set_success(None)
while True:
try:
if promise.success:
promise = coro.send(promise.result)
else:
promise = coro.throw(promise.result)
except StopIteration as e:
return e.value
def run(coro):
    """Drive *coro*, feeding the outcome of each yielded Promise back into it."""
    # Start from an already-resolved promise so the first resume is send(None).
    promise = Promise().set_success(None)
    while True:
        if promise.success is None:
            ...  # wait ??
        try:
            if promise.success:
                promise = coro.send(promise.result)
            else:
                promise = coro.throw(promise.result)
        except StopIteration as e:
            return e.value
coros = []
def add_coro(coro):
    """Queue *coro* on ``coros`` and return the Promise for its result."""
    # The queued coroutine is paired with an already-resolved promise
    # (value None) as the thing it is "awaiting".
    kickoff_promise = Promise().set_success(None)
    result_promise = Promise()
    coros.append((coro, kickoff_promise, result_promise))
    return result_promise
def run(initial_coro):
    """Queue *initial_coro* and return (or re-raise) its eventual outcome."""
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        ...  # run coroutines
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
select a coro whose promise has materialized
for i, (coro, awaited_promise, result_promise) in enumerate(coros):
    if awaited_promise.success is not None:
        # The promise this coroutine awaits has an outcome: take the
        # entry off the queue.
        coros.pop(i)
        break
else:
    # No queued coroutine has a settled promise.  RuntimeException is not
    # a Python name; RuntimeError is the built-in.
    raise RuntimeError("Deadlock!")
run the coroutine
# coro, awaited_promise, result_promise
try:
    # Resume with the awaited promise's outcome: its value on success,
    # its exception otherwise.
    if awaited_promise.success:
        new_promise = coro.send(awaited_promise.result)
    else:
        new_promise = coro.throw(awaited_promise.result)
except StopIteration as e:
    # The coroutine returned: e.value is its return value.
    result_promise.set_success(e.value)
except BaseException as e:
    result_promise.set_exception(e)
else:
    # Still running: requeue it with the promise it yielded.
    coros.append((coro, new_promise, result_promise))
coros = []
def add_coro(coro):
kickoff_promise = Promise().set_success(None)
result_promise = Promise()
coros.append((coro, kickoff_promise, result_promise))
return result_promise
def run(initial_coro):
    """Run *initial_coro* together with everything queued through ``add_coro``.

    Loops until the promise for *initial_coro* has an outcome, then returns
    its value or re-raises its exception.

    Raises:
        RuntimeError: if no queued coroutine awaits a settled promise.
    """
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        # Pick the first queued coroutine whose awaited promise has settled.
        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            # RuntimeException is not a Python name; RuntimeError is the
            # built-in.
            raise RuntimeError("Deadlock!")
        # Resume it with that promise's outcome.
        try:
            if awaited_promise.success:
                new_promise = coro.send(awaited_promise.result)
            else:
                new_promise = coro.throw(awaited_promise.result)
        except StopIteration as e:
            result_promise.set_success(e.value)
        except BaseException as e:
            result_promise.set_exception(e)
        else:
            # Still running: requeue it with the promise it yielded.
            coros.append((coro, new_promise, result_promise))
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
cache = {}
async def fib(n):
    """Return the n-th Fibonacci number, memoised in the module-level ``cache``."""
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    result = await fib(n-2) + await fib(n-1)
    cache[n] = result
    return result
cache = {}
async def fib(n):
    """Memoised Fibonacci whose recursive calls are queued via ``add_coro``."""
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    # Each sub-call is queued with add_coro and its returned promise is
    # awaited, instead of awaiting the coroutine directly.
    result = await add_coro(fib(n-2)) + await add_coro(fib(n-1))
    cache[n] = result
    return result
run(fib(10000))
Recap:
async def echo_server(conn):
    """Echo each chunk read from *conn* until ``bye`` or an empty read."""
    # try/finally so the connection is closed even if read/write raises.
    try:
        text = await read(conn)
        # An empty read is treated as end-of-stream so the loop cannot
        # spin forever echoing b''.
        while text and text.strip() != b'bye':
            print(f"Received: {text}")
            await write(conn, b'echo: ' + text)
            text = await read(conn)
    finally:
        conn.close()
select
epoll
IOCP
…
def run(initial_coro):
    """Run *initial_coro* together with everything queued through ``add_coro``.

    Loops until the promise for *initial_coro* has an outcome, then returns
    its value or re-raises its exception.

    Raises:
        RuntimeError: if no queued coroutine awaits a settled promise.
    """
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        # Pick the first queued coroutine whose awaited promise has settled.
        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            # RuntimeException is not a Python name; RuntimeError is the
            # built-in.
            raise RuntimeError("Deadlock!")
        # Resume it with that promise's outcome.
        try:
            if awaited_promise.success:
                new_promise = coro.send(awaited_promise.result)
            else:
                new_promise = coro.throw(awaited_promise.result)
        except StopIteration as e:
            result_promise.set_success(e.value)
        except BaseException as e:
            result_promise.set_exception(e)
        else:
            # Still running: requeue it with the promise it yielded.
            coros.append((coro, new_promise, result_promise))
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
for i, (coro, awaited_promise, result_promise) in enumerate(coros):
if awaited_promise.success is not None:
coros.pop(i)
break
else:
if reads or writes:
check_socks(timeout=None)
continue
else:
raise RuntimeException("Deadlock!")
reads = collections.defaultdict(list)
writes = collections.defaultdict(list)
def readability(sock):
    """Return a new Promise registered under *sock* in ``reads``."""
    p = Promise()
    reads[sock].append(p)
    return p
def writability(sock):
    """Return a new Promise registered under *sock* in ``writes``."""
    p = Promise()
    writes[sock].append(p)
    return p
async def read(sock):
    """Await readability of *sock*, then return up to 4096 received bytes."""
    await readability(sock)
    # The original called ``f.recv``; ``f`` is not defined anywhere, the
    # socket is the ``sock`` parameter.
    return sock.recv(4096)
async def write(sock, to_send):
    """Await writability of *sock*, then send *to_send* and return ``send()``'s result."""
    await writability(sock)
    # The original called ``f.send``; ``f`` is not defined anywhere, the
    # socket is the ``sock`` parameter.
    return sock.send(to_send)
def check_socks(timeout):
    """select() on the registered sockets and resolve the promises of ready ones.

    *timeout* is passed straight through to ``select.select``.
    """
    rlist, wlist, _ = \
        select.select(list(reads), list(writes), [], timeout)
    # pop() drops the socket's entry, so every promise waiting on it is
    # resolved exactly once.
    for fd in rlist:
        for p in reads.pop(fd):
            p.set_success(None)
    for fd in wlist:
        for p in writes.pop(fd):
            p.set_success(None)
async def echo_server(conn):
    """Echo each chunk read from *conn* until ``bye`` or an empty read."""
    # try/finally so the connection is closed even if read/write raises.
    try:
        text = await read(conn)
        # An empty read is treated as end-of-stream so the loop cannot
        # spin forever echoing b''.
        while text and text.strip() != b'bye':
            print(f"Received: {text}")
            await write(conn, b'echo: ' + text)
            text = await read(conn)
    finally:
        conn.close()
async def launch_server(func, port):
    """Listen on 127.0.0.1:*port* and queue ``func(conn)`` for every connection."""
    sock = socket.socket(type=socket.SOCK_STREAM)
    # setblocking(False) rather than OR-ing SOCK_NONBLOCK into the type:
    # that constant is only defined on some platforms.
    sock.setblocking(False)
    sock.bind(('127.0.0.1', port))
    sock.listen()
    print(f"Listening on port {port}")
    while True:
        # Only call accept() once the listening socket is readable.
        await readability(sock)
        conn, addr = sock.accept()
        print(f'Connected to {addr}')
        add_coro(func(conn))
run(launch_server(echo_server, 4321))
run(launch_server(echo_server, 4321))
Recap:
select & co help us not make blocking calls
select and Promises
curio
PEP 525
ceval.c
Callback hell
dir()