asyncio#

pattern#

from toolbox.asyncio import pattern
class CoroutineClass[source]#

Adds start, stop, and async context manager functionality to a class.

This is a useful pattern that can be used for any asyncio-based class with an awaitable entry-point that needs to be started/stopped via both non-blocking code, and/or async code. Built-in with an async context manager.

This is useful for large asynchronous operations that happens within a single class. See example below for how to use it.

Parameters
  • func (Optional[Awaitable]) – The awaitable entry-point of the class. Defaults to β€˜self.entry’.

  • start_callback (Optional[Callable]) – A function to call when the class is started.

  • end_callback (Optional[Callable]) – A function to call when the class is stopped.

  • run (bool) – Whether to start the class immediately on initialization.

Example

from toolbox import CoroutineClass
import asyncio

class Coroutine(CoroutineClass):
    def __init__(self, run: bool = False):
        super().__init__(run=run)

    # Default entry function.
    async def entry(self):
        await asyncio.sleep(1)
        return "Hello world"

# Start coroutine outside Python async context.
def iomain():

    # via __init__
    coro = Coroutine(run=True)
    print(coro.result)  # Hello world

    # via .run()
    coro = Coroutine()
    result = coro.run()
    print(result)  # Hello world

# Start coroutine inside Python async context.
async def aiomain():

    # via __init__
    coro = Coroutine(run=True)
    await asyncio.sleep(1)
    coro.stop()
    print(coro.result)  # None - because process was stopped before completion.

    # via .run()
    coro = Coroutine()
    coro.run()
    await asyncio.sleep(1)
    result = coro.stop()  # None - because coroutine was stopped before completion.
    print(result)  # Hello world

    # via await
    coro = Coroutine()
    result = await coro  # You can also start, and await later.
    print(result)  # Hello World

    # via context manager
    async with Coroutine() as coro:
        result = await coro
    print(result)  # Hello World
start(self)#

Starts the task without blocking.

Note

The task will block if we call this method outside an async context.

stop(self, result)#

Stops the task without blocking.

Parameters

result (Optional[Any]) – The result to return when the task is stopped.

Notes

This function is attached as a callback to the task.

Return type

Any


streams#

from toolbox.asyncio import streams
async tls_handshake(reader, writer, ssl_context, server_side)[source]#

Manually perform a TLS handshake over a stream.

Parameters
  • reader (StreamReader) – The reader of the client connection.

  • writer (StreamWriter) – The writer of the client connection.

  • ssl_context (Optional[SSLContext]) – The SSL context to use. Defaults to None.

  • server_side (bool) – Whether the connection is server-side or not. Defaults to False.

Note

If the ssl_context is not passed and server_side is not set, then ssl.create_default_context() will be used.

For Python 3.6 to 3.9 you can use ssl.PROTOCOL_TLS for the SSL context. For Python 3.10+ you need to either use ssl.PROTOCOL_TLS_CLIENT or ssl.PROTOCOL_TLS_SERVER depending on the role of the reader/writer.

Example

Client code:

from toolbox.asyncio.streams import tls_handshake
import asyncio

async def client():
    reader, writer = await asyncio.open_connection("httpbin.org", 443, ssl=False)
    await tls_handshake(reader=reader, writer=writer)

    # Communication is now encrypted.
    ...

asyncio.run(client())

Server code:

from toolbox.asyncio.streams import tls_handshake
import asyncio
import ssl

async def server(reader, writer):
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="server.crt", keyfile="server.key")
    await tls_handshake(
        reader=reader,
        writer=writer,
        ssl_context=context,
        server_side=True,
    )

    # Connection is now encrypted.
    ...

async def main():
    srv = await asyncio.start_server(server, host="127.0.0.1", port=8888)
    async with srv:
        await srv.serve_forever()

asyncio.run(main())

threads#

from toolbox.asyncio import threads
async to_thread(func, *args, **kwargs)[source]#

Asynchronously run function func in a separate thread.

Any *args and **kwargs supplied for this function are directly passed to func. Also, the current contextvars.Context is propogated, allowing context variables from the main thread to be accessed in the separate thread.

Return a coroutine that can be awaited to get the eventual result of func.

Parameters
  • func (Callable) – Synchronous function to create awaitable context with.

  • args – Arguments to pass to func.

  • kwargs – Arguments to pass to func.

Note

This function is similar to Python 3.9 asyncio.to_thread(), which can be found here, with slight modifications to make it backwards compatible.

Example

from toolbox.asyncio.threads import to_thread
import asyncio
import time

def func():
    time.sleep(2)
    return "Hello world"

asyncio main():
    await to_thread(func)

asyncio.run(main())
Return type

Awaitable

awaitable(func)[source]#

Decorator that converts a synchronous function into an asynchronous function.

When decorator is used func becomes an awaitable. When awaited, the synchronous function runs in a seperate thread as to not block the event loop. This function leverages the toolbox.asyncio.threads.to_thread() function.

Parameters

func (Callable) – Synchronous function to create awaitable context with.

Example

from toolbox.asyncio.threads import awaitable
import asyncio
import time

@awaitable
def func():
    time.sleep(2)
    return "Hello world"

async def main():
    await func()

asyncio.run(func())
Return type

Awaitable[Any]