from prezutils import setup


setup(
    name="Awaits, how do they work?",
    author="Yann Kaiser",
    author_extra=[
        Twitter("@YannKsr"),
        Employer("Criteo", is_hiring=True, at="Palo Alto"),
        PyPI("clize", purpose="Turn functions into CLIs"),
    ],
)
          

How they want us to use it


import asyncio

import aiohttp


async def get_things_from_internet(url):
    async with aiohttp.get(url) as resp:
        print(await resp.text())
            

loop = asyncio.get_event_loop()

all_requests = asyncio.gather([
    get_things_from_internet('https://www.google.com/')
    get_things_from_internet('https://www.python.org/')
])

result = loop.run_until_complete(all_requests)
print(result)
            

2011

Node JS

CGP Grey

How we really want to use it


async def fib(n):
    if n < 2:
        return n
    return (await fib(n-1)) + (await fib(n-2))

print(something_we_will_build_here(fib(100)))
            

def recur_fib(n):
    if n < 2:
        return n
    return recur_fib(n-1) + recur_fib(n-2)
            

def iter_fib(n):
    n_2 = 0
    n_1 = 1
    for _ in range(n - 1):
        n_2, n_1 = n_1, n_2+n_1
    return n_1
            

import dis

@dis.dis
async def myfunc():
    return await "some object"
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

'some object'
              

              GET_AWAITABLE
              


stack[0] = stack[0].__await__()
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

<coroutine object>
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

<coroutine object>
None
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

<return value of the coro>
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

            

import dis

@dis.dis
def x():
    yield from "some object"
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_YIELD_FROM_ITER
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
              

  3           0 LOAD_CONST               1 ('some object')
              2 GET_AWAITABLE
              4 LOAD_CONST               0 (None)
              6 YIELD_FROM
              8 RETURN_VALUE
            

class AdditionAwaitable:
    def __init__(self, *args):
        self.args = args

    def __await__(self):
        result = yield ('add', *self.args)
        return result
            

async def add(*args):
    return await AdditionAwaitable(*args)
            

async def add(*args):
    return await AdditionAwaitable(*args)
            

              g = add(1, 2)
            

>>> g
<coroutine object add at 0x7f7f4a0ace60>
>>> dir(g)
['__await__', '__class__', '__del__', '__delattr__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__name__', '__ne__', '__new__', '__qualname__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', 'close', 'cr_await', 'cr_code', 'cr_frame', 'cr_running', 'send', 'throw']
>>> [attr for attr in dir(g) if not attr.startswith('_')]
['close', 'cr_await', 'cr_code', 'cr_frame', 'cr_running', 'send', 'throw']
            

>>> g.send("spam")
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: can''t send non-None value to a just-started coroutine
            

>>> g.send(None)
('add', 1, 2)
            

>>> g.send(None)
('add', 1, 2)
            

class AdditionAwaitable:
    def __init__(self, *args):
        self.args = args

    def __await__(self):
        result = yield ('add', *self.args)
        return result
            

async def add(*args):
    return await AdditionAwaitable(*args)
            

>>> g.send(None)
('add', 1, 2)
            

>>> g.cr_frame
<frame object at 0x7f8c31d21dc8>
>>> inspect.getframeinfo(g.cr_frame)
Traceback(filename='add.py', lineno=10, function='add', code_context=['    result = await AdditionAwaitable(*args)\n'], index=0)
>>> inspect.getframeinfo(g.cr_await.gi_frame)
Traceback(filename='add.py', lineno=6, function='__await__', code_context=["        return (yield ('add', *self.args))\n"], index=0)
            

>>> g.send(None)
('add', 1, 2)
>>> g.send(3)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration: 3
            

Recap:

  • await is based on yield from
  • async functions pause before running
  • coro.send(...) resumes such a function
  • That function will progress until it hits a yield
  • coro will store where it is paused

Let's automate this!


def run_task(coro):
    msg = coro.send(None)
            

def run_task(coro):
    msg = coro.send(None)
    action, *args = msg
    if action == 'add':
        result = sum(args)
            

def run_task(coro):
    while True:
        msg = coro.send(None)
        action, *args = msg
        if action == 'add':
            result = sum(args)
            

def run_task(coro):
    result = None
    while True:
        msg = coro.send(result)
        action, *args = msg
        if action == 'add':
            result = sum(args)
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            coro.throw(ValueError(action))
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            coro.throw(ValueError(action))

print(run_task(add(1, 2)))
            

3
            

async def fib(n):
    if n < 2:
        return n
    result = await AdditionAwaitable(await fib(n - 2),
                                     await fib(n - 1))
    return result
            

cache = {}


async def fib(n):
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    result = await AdditionAwaitable(await fib(n - 2),
                                     await fib(n - 1))
    cache[n] = result
    return result
            

run_task(fib(100))
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            coro.throw(ValueError(action))
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        elif action == 'sub':
            a, b = args
            result = a - b
        else:
            coro.throw(ValueError(action))
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        elif action == 'sub':
            a, b = args
            result = a - b
        elif action == 'mul':
            a, b = args
            result = a * b
        elif action == 'div':
            a, b = args
            result = a / b
        else:
            coro.throw(ValueError(action))
            

def run_task(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        elif action == 'sub':
            a, b = args
            result = a - b
        elif action == 'mul':
            a, b = args
            result = a * b
        elif action == 'div':
            a, b = args
            result = a / b
        elif action == 'mod':
            a, b = args
            result = a % b
        elif action == 'divmod':
            a, b = args
            result = a / b, a % b
        elif action == 'getattr':
            obj, attr = args
            result = getattr(obj, attr)
        elif action == 'setattr':
            obj, attr, val = args
            result = setattr(obj, attr, val)
        else:
            coro.throw(ValueError(action))
            

Deferred

Future

Promise

Callbacks

a Promise


class Promise:
    def __init__(self):
        self.success = None
        self.result = None

    def set_success(self, value):
        self.success = True
        self.result = value
        return self

    def set_exception(self, exc):
        self.success = False
        self.result = exc
        return self

    def __await__(self):
        return (yield self)
            

class Promise:
    def __init__(self):
        self.success = None
        self.result = None
            
p.success p.result
🥚 None
🐣 True 12345
🍳 False ValueError('oops')

def run(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            coro.throw(ValueError(action))
            

def run(coro):
    result = None
    while True:
        try:
            msg = coro.send(result)
        except StopIteration as e:
            return e.value
        action, *args = msg
        if action == 'add':
            result = sum(args)
        else:
            coro.throw(ValueError(action))
            

def run(coro):
    result = None
    while True:
        try:
            promise = coro.send(result)
        except StopIteration as e:
            return e.value
            

def run(coro):
    promise = Promise().set_success(None)
    while True:
        try:
            promise = coro.send(promise.result)
        except StopIteration as e:
            return e.value
            

def run(coro):
    promise = Promise().set_success(None)
    while True:
        try:
            if promise.success:
                promise = coro.send(promise.result)
            else:
                promise = coro.throw(promise.result)
        except StopIteration as e:
            return e.value
            

def run(coro):
    promise = Promise().set_success(None)
    while True:
        if promise.success is None:
            ... # wait ??
        try:
            if promise.success:
                promise = coro.send(promise.result)
            else:
                promise = coro.throw(promise.result)
        except StopIteration as e:
            return e.value
            

coros = []


def add_coro(coro):
    kickoff_promise = Promise().set_success(None)
    result_promise = Promise()
    coros.append((coro, kickoff_promise, result_promise))
    return result_promise
            

def run(initial_coro):
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        ... # run coroutines
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
            

select a coro whose promise has materialized


        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            raise RuntimeException("Deadlock!")
            

run the coroutine


        # coro, awaited_promise, result_promise
        try:
            if awaited_promise.success:
                new_promise = coro.send(awaited_promise.result)
            else:
                new_promise = coro.throw(awaited_promise.result)
        except StopIteration as e:
            result_promise.set_success(e.value)
        except BaseException as e:
            result_promise.set_exception(e)
        else:
            coros.append((coro, new_promise, result_promise))
            

coros = []


def add_coro(coro):
    kickoff_promise = Promise().set_success(None)
    result_promise = Promise()
    coros.append((coro, kickoff_promise, result_promise))
    return result_promise


def run(initial_coro):
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            raise RuntimeException("Deadlock!")
        try:
            if awaited_promise.success:
                new_promise = coro.send(awaited_promise.result)
            else:
                new_promise = coro.throw(awaited_promise.result)
        except StopIteration as e:
            result_promise.set_success(e.value)
        except BaseException as e:
            result_promise.set_exception(e)
        else:
            coros.append((coro, new_promise, result_promise))
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
            

cache = {}


async def fib(n):
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    result = await fib(n-2) + await fib(n-1)
    cache[n] = result
    return result
            

cache = {}


async def fib(n):
    if n in cache:
        return cache[n]
    if n < 2:
        return n
    result = await add_coro(fib(n-2)) + await add_coro(fib(n-1))
    cache[n] = result
    return result
            

run(fib(10000))
            

Recap:

  • Promises simplify our task loop
  • We made our task loop switch between coroutines
  • Creating new tasks help us cheat the stack limit

async def echo_server(conn):
    text = await read(conn)
    while text.strip() != b'bye':
        print(f"Received: {text}")
        await write(conn, b'echo: ' + text)
        text = await read(conn)
    conn.close()
            

select

epoll

IOCP


def run(initial_coro):
    final_result = add_coro(initial_coro)
    while final_result.success is None:
        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            raise RuntimeException("Deadlock!")
        try:
            if awaited_promise.success:
                new_promise = coro.send(awaited_promise.result)
            else:
                new_promise = coro.throw(awaited_promise.result)
        except StopIteration as e:
            result_promise.set_success(e.value)
        except BaseException as e:
            result_promise.set_exception(e)
        else:
            coros.append((coro, new_promise, result_promise))
    if final_result.success:
        return final_result.result
    else:
        raise final_result.result
            

        for i, (coro, awaited_promise, result_promise) in enumerate(coros):
            if awaited_promise.success is not None:
                coros.pop(i)
                break
        else:
            if reads or writes:
                check_socks(timeout=None)
                continue
            else:
                raise RuntimeException("Deadlock!")
            

reads = collections.defaultdict(list)
writes = collections.defaultdict(list)
            

def readability(sock):
    p = Promise()
    reads[sock].append(p)
    return p

def writability(sock):
    p = Promise()
    writes[sock].append(p)
    return p
            

async def read(sock):
    await readability(sock)
    return f.recv(4096)


async def write(sock, to_send):
    await writability(sock)
    return f.send(to_send)
            

def check_socks(timeout):
    rlist, wlist, _ = \
        select.select(list(reads), list(writes), [], timeout)

    for fd in rlist:
        for p in reads.pop(fd):
            p.set_success(None)

    for fd in wlist:
        for p in writes.pop(fd):
            p.set_success(None)
            

async def echo_server(conn):
    text = await read(conn)
    while text.strip() != b'bye':
        print(f"Received: {text}")
        await write(conn, b'echo: ' + text)
        text = await read(conn)
    conn.close()
            

async def launch_server(func, port):
    sock = socket.socket(
        type=socket.SOCK_STREAM | socket.SOCK_NONBLOCK)
    sock.bind(('127.0.0.1', port))
    sock.listen()
    print(f"Listening on port {port}")
    while True:
        await readability(sock)
        conn, addr = sock.accept()
        print(f'Connected to {addr}')
        add_coro(func(conn))
            

run(launch_server(echo_server, 4321))
            

run(launch_server(echo_server, 4321))
            

Recap:

  • select & co help us not make blocking calls
  • We can upgrade our task loop to an event loop
  • We can build friendlier tools using select and Promises

MORE!

curio

PEP 525

ceval.c

Callback hell

dir()