Source code for toolbox.asyncio.pattern
import asyncio
from abc import ABC
from typing import Any, Awaitable, Callable, Optional
[docs]
class CoroutineClass(ABC):
def __init__(
self,
func: Optional[Awaitable] = None,
start_callback: Optional[Callable] = None,
end_callback: Optional[Callable] = None,
run: bool = False,
):
"""
Adds start, stop, and async context manager functionality to a class.
This is a useful pattern that can be used for any asyncio-based class with an
awaitable entry-point that needs to be started/stopped via both non-blocking
code, and/or async code. Built-in with an async context manager.
This is useful for large asynchronous operations that happens within a single
class. See example below for how to use it.
Args:
func: The awaitable entry-point of the class. Defaults to 'self.entry'.
start_callback: A function to call when the class is started.
end_callback: A function to call when the class is stopped.
run: Whether to start the class immediately on initialization.
Example:
.. code-block:: python
from toolbox import CoroutineClass
import asyncio
class Coroutine(CoroutineClass):
def __init__(self, run: bool = False):
super().__init__(run=run)
# Default entry function.
async def entry(self):
await asyncio.sleep(1)
return "Hello world"
# Start coroutine outside Python async context.
def iomain():
# via __init__
coro = Coroutine(run=True)
print(coro.result) # Hello world
# via .run()
coro = Coroutine()
result = coro.run()
print(result) # Hello world
# Start coroutine inside Python async context.
async def aiomain():
# via __init__
coro = Coroutine(run=True)
await asyncio.sleep(1)
coro.stop()
print(coro.result) # None - because process was stopped before completion.
# via .run()
coro = Coroutine()
coro.run()
await asyncio.sleep(1)
result = coro.stop() # None - because coroutine was stopped before completion.
print(result) # Hello world
# via await
coro = Coroutine()
result = await coro # You can also start, and await later.
print(result) # Hello World
# via context manager
async with Coroutine() as coro:
result = await coro
print(result) # Hello World
"""
self._func = func if func else self.entry
self._start_callback = start_callback
self._end_callback = end_callback
self._task = None
# Setup the asyncio loop.
try:
loop = asyncio.get_event_loop()
except RuntimeError: # pragma: no cover
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
finally:
self._loop = loop
self.result = None
if run:
self.run()
async def entry(self):
"""
Default async entry-point.
"""
raise NotImplementedError # pragma: no cover
def run(self):
"""
Starts the task without blocking.
Note:
The task will block if we call this method outside an async context.
"""
# If task is not running or has been cancelled, start it.
if not self._task or self._task.cancelled():
# Call the start callback.
if self._start_callback:
self._start_callback()
# Creates the task inside the loop.
self._task = self._loop.create_task(self._func())
# Add self.stop as callback for when the task is done.
self._task.add_done_callback(self.stop)
# Runs the loop if we are not in an async context.
if not self._loop.is_running():
# Runs task until completion. Calls self.stop.
self._loop.run_until_complete(self._task)
return self.result
def stop(self, result: Optional[Any] = None) -> Any:
"""
Stops the task without blocking.
Args:
result: The result to return when the task is stopped.
Notes:
This function is attached as a callback to the task.
"""
# If task is running and hasn't been cancelled, cancel it.
if self._task and not self._task.cancelled():
# Cancel task.
self._task.cancel()
# Call the end callback.
if self._end_callback:
self._end_callback()
# Tries the get the result of the task.
if self._task.done():
try:
self.result = self._task.result()
except asyncio.CancelledError: # pragma: no cover
pass
# Return the result.
return self.result
async def __aenter__(self) -> type:
"""
Enter the async context manager.
"""
self.run()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb) -> bool:
"""
Exit the async context manager.
"""
self.stop()
return self._task.cancelled()
def __await__(self):
"""
Await the task.
"""
if not self._task:
self.run()
return self._task.__await__()