asyncio#

cache#

from toolbox.asyncio import future_lru_cache
future_lru_cache(maxsize)#

Decorator that caches the return value of an async function so that repeated calls reuse it.

Parameters:

maxsize (Optional[int]) – The maximum size of the cache.

Return type:

Awaitable

Returns:

The decorated function.

Notes

This decorator is adapted from an answer on Stack Overflow.

Example

from toolbox import future_lru_cache

@future_lru_cache
async def func():
    # Expensive computation.
    return 42

async def main():
    await func()  # Runs it once.
    await func()  # Returns the cached value.
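
The caching idea can be sketched with the standard library alone. The decorator below is a hypothetical stand-in named cached_future, not toolbox's implementation: it assumes the general technique of caching the scheduled task rather than the coroutine, because a coroutine object can only be awaited once while a task can be awaited repeatedly.

```python
import asyncio
import functools


def cached_future(maxsize=128):
    """Hypothetical illustration only; not toolbox's future_lru_cache."""

    def decorator(func):
        @functools.lru_cache(maxsize=maxsize)
        def make_task(*args, **kwargs):
            # Wrap the coroutine in a Task. Unlike a bare coroutine,
            # a Task can be awaited any number of times.
            return asyncio.ensure_future(func(*args, **kwargs))

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            # Arguments must be hashable, as with functools.lru_cache.
            return await make_task(*args, **kwargs)

        return wrapper

    return decorator


calls = 0  # Counts how often the wrapped body actually runs.


@cached_future(maxsize=None)
async def expensive(x):
    global calls
    calls += 1
    await asyncio.sleep(0)
    return x * 2


async def demo():
    first = await expensive(21)   # Runs the function body.
    second = await expensive(21)  # Reuses the cached task.
    return first, second, calls


if __name__ == "__main__":
    print(asyncio.run(demo()))  # (42, 42, 1)
```

One consequence of this approach is that cached tasks belong to the event loop that created them, so the sketch is best suited to programs that use a single loop.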

pattern#

from toolbox.asyncio import pattern
class CoroutineClass#

Adds start, stop, and async context manager functionality to a class.

This pattern suits any asyncio-based class with an awaitable entry point that needs to be started and stopped from code outside an async context, from async code, or both. An async context manager is built in.

It is useful for large asynchronous operations that happen within a single class. See the example below for how to use it.

Parameters:
  • func (Optional[Awaitable]) – The awaitable entry-point of the class. Defaults to ‘self.entry’.

  • start_callback (Optional[Callable]) – A function to call when the class is started.

  • end_callback (Optional[Callable]) – A function to call when the class is stopped.

  • run (bool) – Whether to start the class immediately on initialization.

Example

from toolbox import CoroutineClass
import asyncio

class Coroutine(CoroutineClass):
    def __init__(self, run: bool = False):
        super().__init__(run=run)

    # Default entry function.
    async def entry(self):
        await asyncio.sleep(1)
        return "Hello world"

# Start coroutine outside Python async context.
def iomain():

    # via __init__
    coro = Coroutine(run=True)
    print(coro.result)  # Hello world

    # via .run()
    coro = Coroutine()
    result = coro.run()
    print(result)  # Hello world

# Start coroutine inside Python async context.
async def aiomain():

    # via __init__
    coro = Coroutine(run=True)
    await asyncio.sleep(1)
    coro.stop()
    print(coro.result)  # None - because process was stopped before completion.

    # via .run()
    coro = Coroutine()
    coro.run()
    await asyncio.sleep(1)
    result = coro.stop()  # Stopped before completion, so there is no result.
    print(result)  # None

    # via await
    coro = Coroutine()
    result = await coro  # You can also start, and await later.
    print(result)  # Hello world

    # via context manager
    async with Coroutine() as coro:
        result = await coro
    print(result)  # Hello world
stop(self, result)#

Stops the task without blocking.

Parameters:

result (Optional[Any]) – The result to return when the task is stopped.

Notes

This function is attached as a callback to the task.

Return type:

Any
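
A non-blocking stop with a caller-supplied result can be sketched as follows. This is a hypothetical illustration built on plain asyncio tasks, not the CoroutineClass implementation: the class name StoppableWorker, its attributes, and the choice to cancel the underlying task are all assumptions of the sketch.

```python
import asyncio


class StoppableWorker:
    """Hypothetical illustration only; not toolbox's CoroutineClass."""

    def __init__(self):
        self.task = None
        self.result = None

    async def entry(self):
        # Stand-in for a long-running operation.
        await asyncio.sleep(10)
        return "finished"

    def start(self):
        # Schedule the entry point and record its result when it completes.
        self.task = asyncio.ensure_future(self.entry())
        self.task.add_done_callback(self._on_done)
        return self.task

    def _on_done(self, task):
        # Only store a result if the task ran to completion.
        if not task.cancelled() and task.exception() is None:
            self.result = task.result()

    def stop(self, result=None):
        # Request cancellation and return immediately, without awaiting.
        if self.task is not None and not self.task.done():
            self.task.cancel()
            self.result = result
        return self.result


async def demo():
    worker = StoppableWorker()
    worker.start()
    await asyncio.sleep(0)  # Give the task a chance to start.
    stopped = worker.stop("stopped early")
    await asyncio.wait([worker.task])  # Let the cancellation finish.
    return stopped, worker.task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because stop() only requests cancellation, the caller decides whether to wait for the task to wind down, as demo() does with asyncio.wait().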


streams#

from toolbox.asyncio import streams
async tls_handshake(reader, writer, ssl_context, server_side)#

Manually perform a TLS handshake over a stream.

Parameters:
  • reader (StreamReader) – The reader of the client connection.

  • writer (StreamWriter) – The writer of the client connection.

  • ssl_context (Optional[SSLContext]) – The SSL context to use. Defaults to None.

  • server_side (bool) – Whether the connection is server-side or not. Defaults to False.

Note

If the ssl_context is not passed and server_side is not set, then ssl.create_default_context() will be used.

On Python 3.6 to 3.9, you can use ssl.PROTOCOL_TLS for the SSL context. On Python 3.10+, use either ssl.PROTOCOL_TLS_CLIENT or ssl.PROTOCOL_TLS_SERVER, depending on the role of the reader/writer.
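
The context selection described in this note can be sketched with the standard ssl module. The helper pick_ssl_context is hypothetical and not part of toolbox. Raising an error for a server without a context is an assumption of this sketch, since the note only specifies the client-side default.

```python
import ssl


def pick_ssl_context(ssl_context=None, server_side=False):
    """Hypothetical helper for illustration; not part of toolbox."""
    if ssl_context is not None:
        # An explicitly supplied context always wins.
        return ssl_context
    if not server_side:
        # Client default: verifies certificates and checks hostnames.
        return ssl.create_default_context()
    # Assumption of this sketch: a server must bring its own context,
    # because it needs a certificate chain loaded.
    raise ValueError("server_side=True requires an explicit ssl_context")


# Role-specific protocol constants, as recommended for Python 3.10+.
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

if __name__ == "__main__":
    default = pick_ssl_context()
    print(default.verify_mode == ssl.CERT_REQUIRED)  # True
    print(client_context.check_hostname)  # True
    print(server_context.check_hostname)  # False
```

A context built with ssl.PROTOCOL_TLS_SERVER still needs load_cert_chain() before it can complete a handshake, as the server example below shows.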

Example

Client code:

from toolbox.asyncio.streams import tls_handshake
import asyncio

async def client():
    reader, writer = await asyncio.open_connection("httpbin.org", 443, ssl=False)
    await tls_handshake(reader=reader, writer=writer)

    # Communication is now encrypted.
    ...

asyncio.run(client())

Server code:

from toolbox.asyncio.streams import tls_handshake
import asyncio
import ssl

async def server(reader, writer):
    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(certfile="server.crt", keyfile="server.key")
    await tls_handshake(
        reader=reader,
        writer=writer,
        ssl_context=context,
        server_side=True,
    )

    # Connection is now encrypted.
    ...

async def main():
    srv = await asyncio.start_server(server, host="127.0.0.1", port=8888)
    async with srv:
        await srv.serve_forever()

asyncio.run(main())

threads#

from toolbox.asyncio import threads
async to_thread(func, *args, **kwargs)#

Asynchronously run function func in a separate thread.

Any *args and **kwargs supplied for this function are directly passed to func. Also, the current contextvars.Context is propagated, allowing context variables from the main thread to be accessed in the separate thread.

Return a coroutine that can be awaited to get the eventual result of func.

Parameters:
  • func (Callable) – Synchronous function to create awaitable context with.

  • args – Positional arguments to pass to func.

  • kwargs – Keyword arguments to pass to func.

Note

This function is similar to asyncio.to_thread() from Python 3.9, with slight modifications to make it backwards compatible.

Example

from toolbox.asyncio.threads import to_thread
import asyncio
import time

def func():
    time.sleep(2)
    return "Hello world"

async def main():
    await to_thread(func)

asyncio.run(main())
Return type:

Awaitable
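
The context propagation described above can be sketched with the standard library. The function run_in_thread is a hypothetical name used for illustration, and the body assumes the same general approach as asyncio.to_thread(): copy the current context and run the callable inside it on the default executor. It is not presented as toolbox's exact code.

```python
import asyncio
import contextvars
import functools
import threading

# A context variable set in the event loop thread.
request_id = contextvars.ContextVar("request_id", default="unset")


async def run_in_thread(func, *args, **kwargs):
    """Hypothetical illustration only; not toolbox's to_thread."""
    loop = asyncio.get_running_loop()
    # Snapshot the caller's context so the worker thread sees the same
    # context variable values.
    ctx = contextvars.copy_context()
    call = functools.partial(ctx.run, func, *args, **kwargs)
    # None selects the loop's default ThreadPoolExecutor.
    return await loop.run_in_executor(None, call)


def blocking_work(label):
    on_main_thread = threading.current_thread() is threading.main_thread()
    return label, request_id.get(), on_main_thread


async def demo():
    request_id.set("req-1")
    return await run_in_thread(blocking_work, "done")


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ('done', 'req-1', False)
```

Without the copied context, blocking_work would see the default value "unset", because worker threads do not inherit the caller's context on their own.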

awaitable(func)#

Decorator that converts a synchronous function into an asynchronous function.

When the decorator is used, func becomes an awaitable. When awaited, the synchronous function runs in a separate thread so as not to block the event loop. This function leverages the toolbox.asyncio.threads.to_thread() function.

Parameters:

func (Callable) – Synchronous function to create awaitable context with.

Example

from toolbox.asyncio.threads import awaitable
import asyncio
import time

@awaitable
def func():
    time.sleep(2)
    return "Hello world"

async def main():
    await func()

asyncio.run(main())
Return type:

Awaitable[Any]
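
A decorator of this kind can be sketched as a thin wrapper around a thread executor. The name as_awaitable is hypothetical, and the sketch uses loop.run_in_executor() directly rather than toolbox's to_thread(), so it illustrates the idea and not the library's code.

```python
import asyncio
import functools
import threading


def as_awaitable(func):
    """Hypothetical illustration only; not toolbox's awaitable."""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_running_loop()
        call = functools.partial(func, *args, **kwargs)
        # Run the synchronous function on the default thread pool so the
        # event loop stays free to run other tasks.
        return await loop.run_in_executor(None, call)

    return wrapper


@as_awaitable
def measure(text):
    on_main_thread = threading.current_thread() is threading.main_thread()
    return len(text), on_main_thread


async def demo():
    # Both calls are scheduled together and run in worker threads.
    return await asyncio.gather(measure("ab"), measure("abc"))


if __name__ == "__main__":
    print(asyncio.run(demo()))  # [(2, False), (3, False)]
```

Calling measure() returns a coroutine, so it has to be awaited or scheduled. Calling it without awaiting does not run the function.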