"""Runtime management for the scalems client.
Provide the framework for the SCALE-MS execution middleware layer.
Execution dispatching implementations can be chosen from the command line when
the interpreter is launched with Python's ``-m`` command line option.
Alternatively, the script can use a Python Context Manager (a ``with`` block) to
activate a WorkflowContext with an execution dispatcher implementation.
It is inadvisable to activate an execution dispatching environment with
procedural calls because it is more difficult to make sure that acquired resources
are properly released, in case the interpreter has to terminate prematurely.
Execution dispatching generally uses some sort of concurrency model,
but there is not a consistent concurrency model for all dispatchers.
ScaleMS provides abstractions to insulate scripting from particular implementations
or concurrency primitives (such as the need to call :py:func:`asyncio.run`).
The following diagram uses the :py:mod:`scalems.radical` execution module
to illustrate the workflow execution.
.. uml::
title scalems on radical.pilot (client side)
participant "workflow script" as script
box "SCALE-MS framework" #honeydew
participant "SCALE-MS API" as scalems.Runtime
participant WorkflowManager as client_workflowmanager
participant Queuer
end box
box "SCALE-MS RP adapter" #linen
participant scalems.radical
participant Executor as client_executor
end box
autoactivate on
-> scalems.radical: python -m scalems.radical ...
scalems.radical -> scalems.Runtime: scalems.invocation.run(workflow_manager)
scalems.Runtime -> scalems.radical: configuration()
return
scalems.Runtime -> scalems.radical: workflow_manager()
scalems.radical -> client_workflowmanager **: <<create>>
scalems.Runtime <-- scalems.radical:
scalems.Runtime -> script: <<runpy>>
return @scalems.app
scalems.Runtime -> scalems.Runtime: run_dispatch(work, context)
scalems.Runtime -> client_workflowmanager: async with dispatch()
client_workflowmanager -> client_workflowmanager: executor_factory()
client_workflowmanager -> client_executor **: <<create>>
client_workflowmanager --> client_workflowmanager: executor
client_workflowmanager -> Queuer **: <<create>>
== Launch runtime ==
client_workflowmanager -> client_executor: async with executor
activate client_workflowmanager #lightgray
...Dispatch work. See `manage_execution`...
client_workflowmanager <-- client_executor: leave executor context
deactivate client_workflowmanager
scalems.Runtime <-- client_workflowmanager: end dispatching context
scalems.Runtime --> scalems.Runtime: loop.run_until_complete()
deactivate client_workflowmanager
scalems.Runtime --> scalems.radical: SystemExit.code
deactivate scalems.Runtime
<-- scalems.radical: sys.exit
The interface available to ``@scalems.app`` is under development.
See :py:mod:`scalems.workflow`.
The details of work dispatching are not yet strongly specified or fully encapsulated.
`manage_execution` mediates a collaboration between a `RuntimeManager` and a
`WorkflowManager` (via `AbstractWorkflowUpdater`).
"""
from __future__ import annotations
__all__ = ("AbstractWorkflowUpdater", "RuntimeManager", "manage_execution")
import abc
import asyncio
import contextlib
import logging
import typing
import scalems.exceptions
from scalems.exceptions import APIError
from scalems.exceptions import DispatchError
from scalems.exceptions import InternalError
from scalems.exceptions import MissingImplementationError
from scalems.exceptions import ProtocolError
from scalems.messages import QueueItem
from scalems.store import FileStore
from scalems.workflow import Task
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)
# Trace message emitted once, at first import of this module.
logger.debug(f"Importing {__name__}")
class AbstractWorkflowUpdater(abc.ABC):
    """Interface used by `manage_execution` to submit managed workflow items."""

    # TODO: The `item: Task` argument to `AbstractWorkflowUpdater.submit()` should be
    #  decoupled from the WorkflowManager implementation.
    @abc.abstractmethod
    async def submit(self, *, item: Task) -> asyncio.Task:
        """Submit work to update a workflow item.

        Args:
            item: managed workflow item as produced by
                WorkflowManager.edit_item()

        Returns:
            watcher task for a coroutine that has already started.

        The returned Task is a watcher task to confirm completion of or allow
        cancellation of the dispatched work. Additional hidden Tasks may be
        responsible for updating the provided item when results are available,
        so the result of this task is not necessarily directly interesting to
        the execution manager.

        The caller should check for unexpected cancellation or early failure. If the
        task has not failed or been canceled by the time *submit()* returns, then we
        can conclude that the task has been appropriately bound to the workflow item.
        """
        ...
class RuntimeDescriptor:
    """Data Descriptor class for (backend-specific) runtime state access.

    TODO: Consider making this class generic in terms of the backend/configuration
    type. (Maybe only possible if we keep the subclassing model of RuntimeManagers rather
    than composing type information at instantiation.)
    """

    # Ref: https://docs.python.org/3/reference/datamodel.html#implementing-descriptors

    def __init__(self):
        # Name of the instance attribute backing the descriptor's stored state.
        self.private_name = "_runtime_state"

    def __set_name__(self, owner, name):
        # Called by type.__new__ during class creation to allow customization.
        # Refuse installation on a class whose backing attribute is already
        # populated (e.g. by a previously installed descriptor).
        if getattr(owner, self.private_name, None) is not None:
            raise ProtocolError(
                f"Cannot assign {repr(self)} to {owner.__qualname__} "
                f"with an existing non-None {self.private_name} member."
            )

    def __get__(self, instance, owner):
        # Class-attribute access (instance is None) yields the descriptor itself.
        if instance is None:
            return self
        # Instance access yields the stored runtime state, or None if unset.
        return getattr(instance, self.private_name, None)

    def __set__(self, instance, value):
        # Runtime state is write-once until explicitly deleted.
        if getattr(instance, self.private_name, None) is not None:
            raise APIError("Cannot overwrite an existing runtime state.")
        setattr(instance, self.private_name, value)

    def __delete__(self, instance):
        # Deleting state that was never set (or already removed) is a no-op.
        with contextlib.suppress(AttributeError):
            delattr(instance, self.private_name)
_BackendT = typing.TypeVar("_BackendT")
class RuntimeManager(typing.Generic[_BackendT], abc.ABC):
    """Client side manager for dispatching work loads and managing data flow.

    A RuntimeManager is instantiated within the scope of the
    `scalems.workflow.WorkflowManager.dispatch` context manager using the
    :py:meth:`scalems.workflow.WorkflowManager._executor_factory`.
    """

    get_edit_item: typing.Callable[[], typing.Callable]
    """Get the function that creates a WorkflowItem editing context."""

    # File/data store provided at initialization; required (non-None).
    datastore: FileStore
    # Watcher tasks for work dispatched through the updater; awaited at exit.
    submitted_tasks: typing.MutableSet[asyncio.Task]

    # Backend-specific configuration object provided at initialization.
    _runtime_configuration: _BackendT
    _command_queue: asyncio.Queue
    _dispatcher_lock: asyncio.Lock
    _queue_runner_task: typing.Union[None, asyncio.Task] = None

    runtime = RuntimeDescriptor()
    """Get/set the current runtime state information.

    Attempting to overwrite an existing *runtime* state raises an APIError.

    De-initialize the stored runtime state by calling ``del`` on the attribute.
    To re-initialize, de-initialize and then re-assign.

    Design Note:
        This pattern allows runtime state objects to be removed from the instance
        data while potentially still in use (such as during clean-up), allowing
        clearer scoping of access to the runtime state object.

        In the future, we may find it more practical to use a module-level
        "ContextVar" and to manage the scope in terms of PEP-567 Contexts. For the
        initial implementation, though, we are using module-level runtime
        configuration information, but storing the state information for initialized
        runtime facilities through this Data Descriptor on the RuntimeManager
        instance.

    Raises:
        APIError if *state* is not None and a runtime state already exists.
    """

    def __init__(
        self,
        *,
        editor_factory: typing.Callable[[], typing.Callable] = None,
        datastore: FileStore = None,
        loop: asyncio.AbstractEventLoop,
        configuration: _BackendT,
        dispatcher_lock=None,
    ):
        """Initialize the manager with its required collaborators.

        Args:
            editor_factory: callable producing an ``edit_item`` interface (required).
            datastore: backing FileStore for workflow data (required).
            loop: event loop in which the manager's tasks run.
            configuration: backend-specific configuration object.
            dispatcher_lock: `asyncio.Lock` guarding dispatcher state transitions.

        Raises:
            TypeError: if *editor_factory*, *datastore*, or *dispatcher_lock* is
                missing or of the wrong type.
        """
        self.submitted_tasks = set()

        # TODO: Only hold a queue in an active context manager.
        self._command_queue = asyncio.Queue()
        self._exception = None
        self._loop: asyncio.AbstractEventLoop = loop

        if editor_factory is None or not callable(editor_factory):
            raise TypeError("Provide a callable that produces an edit_item interface.")
        self.get_edit_item = editor_factory

        if datastore is None:
            raise TypeError("Provide a datastore.")
        self.datastore = datastore

        # TODO: Consider relying on module ContextVars and contextvars.Context scope.
        self._runtime_configuration = configuration

        if not isinstance(dispatcher_lock, asyncio.Lock):
            raise TypeError("An asyncio.Lock is required to control dispatcher state.")
        self._dispatcher_lock = dispatcher_lock

    def configuration(self) -> _BackendT:
        """Get the backend-specific configuration object provided at initialization."""
        return self._runtime_configuration

    @staticmethod
    async def cpi(command: str, runtime):
        """Dispatcher for CPI messages.

        TODO: Return value? We probably want to be able to capture something we can
         query for the result of the CPI message.
        """
        # Base implementation is a no-op: it only logs the received command.
        logger.debug(f"Null CPI handler received command {command}.")

    @staticmethod
    def runtime_shutdown(runtime):
        """Shutdown hook for runtime facilities.

        Called while exiting the context manager. Allows specialized handling of
        runtime backends in terms of the RuntimeManager instance and an abstract
        *Runtime* object, presumably acquired while entering the context manager
        through the *runtime_startup* hook.

        The method is static as a reminder that all state should come through
        the *runtime* argument. The hook is used in the context manager
        implemented by the base class, which manages the removal of state
        information regardless of the actions (or errors) of subclasses using
        this hook.
        """
        pass

    def queue(self):
        """Get the command queue consumed by the queue-runner task."""
        # TODO: Only expose queue while in an active context manager.
        return self._command_queue

    def exception(self) -> typing.Union[None, Exception]:
        """Get the first exception captured during dispatching, if any."""
        return self._exception

    @contextlib.contextmanager
    def runtime_configuration(self):
        """Runtime startup context manager hook.

        This allows arbitrary environment changes during the scope of the
        __aenter__() function, as well as a chance to modify the stored runtime
        configuration object.

        Warning: this is probably not a permanent part of the interface. We should
        settle on whether to store a configuration/context object or rely on the
        current contextvars.Context.
        """
        # Base implementation yields without modifying anything.
        try:
            yield None
        finally:
            ...

    @abc.abstractmethod
    async def runtime_startup(self) -> asyncio.Task:
        """Runtime startup hook.

        If the runtime manager uses a Session, this is the place to acquire it.

        This coroutine itself returns a `asyncio.Task`. This allows the caller to
        yield until the runtime manager is actually ready.

        Implementations may perform additional checks before returning to ensure
        that the runtime queue processing task has run far enough to be well behaved
        if it later encounters an error or is subsequently canceled. Also, blocking
        interfaces used by the runtime manager could be wrapped in separate thread
        executors so that the asyncio event loop doesn't have to block on this call.
        """
        ...

    @abc.abstractmethod
    def updater(self) -> AbstractWorkflowUpdater:
        """Initialize a WorkflowUpdater for the configured runtime."""
        # TODO: Convert from an abstract method to a registration pattern.
        ...

    async def __aenter__(self):
        """Launch the queue-runner task and enter the dispatching state.

        Raises:
            DispatchError: if the runner task was canceled while starting.
        """
        with self.runtime_configuration():
            try:
                # Get a lock while the state is changing.
                # Warning: The dispatching protocol is immature.
                # Initially, we don't expect contention for the lock, and if there is
                # contention, it probably represents an unintended race condition
                # or systematic dead-lock.
                # TODO: Clarify dispatcher state machine and remove/replace assertions.
                assert not self._dispatcher_lock.locked()
                async with self._dispatcher_lock:
                    runner_task: asyncio.Task = await self.runtime_startup()
                    if runner_task.done():
                        # A runner that finished this early is suspicious: inspect it.
                        if runner_task.cancelled():
                            raise DispatchError("Runner unexpectedly canceled while starting dispatching.")
                        else:
                            e = runner_task.exception()
                            if e:
                                logger.exception("Runner task failed with an exception.", exc_info=e)
                                raise e
                            else:
                                logger.warning("Runner task stopped unusually early, but did not raise an exception.")
                    self._queue_runner_task = runner_task

                # Note: it would probably be most useful to return something with a
                # WorkflowManager interface...
                return self
            except Exception as e:
                self._exception = e
                raise e

    async def __aexit__(self, exc_type, exc_val, exc_tb):  # noqa: C901
        """Clean up at context exit.

        In addition to handling exceptions, clean up any Session resource.

        We also need to make sure that we properly disengage from any queues
        or generators.

        We can also leave flags for ourself to be checked at __await__,
        if there is a Task associated with the Executor.
        """
        # Note that this coroutine could take a long time and could be
        # cancelled at several points.
        cancelled_error = None
        # The dispatching protocol is immature.
        # Initially, we don't expect contention for the lock, and if there is
        # contention, it probably represents an unintended race condition
        # or systematic dead-lock.
        # TODO: Clarify dispatcher state machine and remove/replace assertions.
        assert not self._dispatcher_lock.locked()
        async with self._dispatcher_lock:
            runtime = self.runtime
            # This method is not thread safe, but we try to make clear as early as
            # possible that instance.session is no longer publicly available.
            del self.runtime
            try:
                # Stop the executor.
                logger.debug("Stopping workflow execution.")
                # TODO: Make sure that nothing else will be adding items to the
                #  queue from this point.
                # We should establish some assurance that the next line represents
                # the last thing that we will put in the queue.
                self._command_queue.put_nowait({"control": "stop"})
                # TODO: Consider what to do differently when we want to cancel work
                #  rather than just finalize it.

                # Currently, the queue runner does not place subtasks,
                # so there is only one thing to await.
                # TODO: We should probably allow the user to provide some sort of
                #  timeout, or infer one from other time limits.
                try:
                    await self._queue_runner_task
                except asyncio.CancelledError as e:
                    raise e
                except Exception as queue_runner_exception:
                    logger.exception("Unhandled exception when stopping queue handler.", exc_info=True)
                    self._exception = queue_runner_exception
                else:
                    logger.debug("Queue runner task completed.")
                finally:
                    if not self._command_queue.empty():
                        logger.error("Command queue never emptied.")

                    if len(self.submitted_tasks) == 0:
                        logger.debug("No tasks to wait for. Continuing to shut down.")
                    else:
                        # Wait for the tasks.
                        # For scalems.radical, the returned values are the rp.Task
                        # objects in their final states.
                        # QUESTION: How long should we wait before canceling tasks?
                        # TODO: Handle some sort of job maximum wall time parameter.
                        results = await asyncio.gather(*self.submitted_tasks)
                        # TODO: Log something useful about the results.
                        assert len(results) == len(self.submitted_tasks)
            except asyncio.CancelledError as e:
                logger.debug(f"{self.__class__.__qualname__} context manager received cancellation while exiting.")
                cancelled_error = e
            except Exception as e:
                logger.exception("Exception while stopping dispatcher.", exc_info=True)
                if self._exception:
                    logger.error("Queuer is already holding an exception.")
                else:
                    self._exception = e
            finally:
                # Always run the backend shutdown hook, even after errors above.
                # The hook may block, so run it in a thread to keep the loop live.
                try:
                    await asyncio.to_thread(self.runtime_shutdown, runtime)
                except asyncio.CancelledError as e:
                    cancelled_error = e
                except Exception as e:
                    logger.exception(f"Exception while shutting down {repr(runtime)}.", exc_info=e)
                else:
                    logger.debug("Runtime resources closed.")

        if cancelled_error:
            raise cancelled_error

        # Only return true if an exception should be suppressed (because it was handled).
        # TODO: Catch internal exceptions for useful logging and user-friendliness.
        if exc_type is not None:
            return False
async def manage_execution(executor: RuntimeManager, *, processing_state: asyncio.Event):
    """Process workflow messages until a stop message is received.

    Initial implementation processes commands serially without regard for possible
    concurrency.

    Towards concurrency:
        We can create all tasks without awaiting any of them.

        Some tasks will be awaiting results from other tasks.

        All tasks will be awaiting a `asyncio.Lock` or `asyncio.Condition` for each
        required resource, but must do so indirectly.

        To avoid dead-locks, we can't have a Lock object for each resource unless
        they are managed by an intermediary that can do some serialization of
        requests. In other words, we need a Scheduler that tracks the resource pool,
        packages resource locks only when they can all be acquired without race
        conditions or blocking, and which then notifies the Condition for each task
        that it is allowed to run.

        It should not do so until the dependencies of the task are known to have
        all of the resources they need to complete (running with any dynamic
        dependencies also running) and, preferably, complete.

        Alternatively, the Scheduler can operate in blocks, allocating all resources,
        offering the locks to tasks, waiting for all resources to be released,
        then repeating.

        We can allow some conditions to "wake up" the scheduler to back fill a block
        of resources, but we should be careful with that.

        (We still need to consider dynamic tasks that
        generate other tasks. I think the only way to distinguish tasks which can't
        be dynamic from those which might be would be with the `def` versus
        `async def` in the implementing function declaration. If we abstract `await`
        with ``scalems.wait``, we can throw an exception at execution time after
        checking a ContextVar. It may be better to just let implementers use `await`
        for dynamically created tasks, but we need to make the same check if a
        function calls ``.result()`` or otherwise tries to create a dependency on an
        item that was not allocated resources before the function started executing.
        In a conservative first draft, we can simply throw an exception if
        a non-`async def` function attempts to call a scalems workflow command like
        "add_item" while in an executing context.)
    """
    queue = executor.queue()
    updater = executor.updater()
    # Get a reference to the runtime, since it is removed from the executor before
    # RuntimeManager.__aexit__() shuts down this task.
    runtime = executor.runtime

    # Acknowledge that the coroutine is running and will immediately begin processing
    # queue items.
    processing_state.set()
    # Note that if an exception is raised here, the queue will never be processed.

    # Could also accept a "stop" Event object for the loop conditional,
    # but we would still need a way to yield on an empty queue until either
    # the "stop" event or an item arrives, and then we might have to account
    # for queues that potentially never yield any items, such as by sleeping briefly.
    # We should be able to do
    #     signal_task = asyncio.create_task(stop_event.wait())
    #     queue_getter = asyncio.create_task(command_queue.get())
    #     waiter = asyncio.create_task(asyncio.wait((signal_task, queue_getter),
    #                                               return_when=FIRST_COMPLETED))
    #     while await waiter:
    #         done, pending = waiter.result()
    #         assert len(done) == 1
    #         if signal_task in done:
    #             break
    #         else:
    #             command: QueueItem = queue_getter.result()
    #         ...
    #         queue_getter = asyncio.create_task(command_queue.get())
    #         waiter = asyncio.create_task(asyncio.wait((signal_task, queue_getter),
    #                                                   return_when=FIRST_COMPLETED))
    #
    command: QueueItem
    while command := await queue.get():
        # Developer note: The preceding line and the following try/finally block are
        # coupled!
        # Once we have awaited asyncio.Queue.get(), we _must_ have a corresponding
        # asyncio.Queue.task_done(). For tidiness, we immediately enter a `try` block
        # with a `finally` suite. Don't separate these without considering the Queue
        # protocol.
        try:
            if not len(command.items()) == 1:
                raise ProtocolError("Expected a single key-value pair.")
            logger.debug(f"Processing command {repr(command)}")

            # TODO(#23): Use formal RPC protocol.
            if "control" in command:
                logger.debug(f"Execution manager received {command['control']} command for {runtime}.")
                try:
                    await executor.cpi(command["control"], runtime)
                except scalems.exceptions.ScopeError as e:
                    logger.debug(
                        f"Control command \"{command['control']}\" ignored due to inactive RuntimeManager.", exc_info=e
                    )
                    # We should probably break here, too, right?
                # TODO: Handle command results and take appropriate action on failures.
                if command["control"] == "stop":
                    # End queue processing.
                    break
            if "add_item" not in command:
                # This will end queue processing. Tasks already submitted may still
                # complete. Tasks subsequently added will update the static part of
                # the workflow (WorkflowManager) and be eligible for dispatching in
                # another dispatching session.
                # If we need to begin to address the remaining queued items before
                # this (the queue processor) task is awaited, we could insert a
                # Condition here to alert some sort of previously scheduled special
                # tear-down task.
                raise MissingImplementationError(f"Executor has no implementation for {str(command)}.")

            key = command["add_item"]
            edit_item = executor.get_edit_item()
            with edit_item(key) as item:
                if not isinstance(item, Task):
                    raise InternalError(f"Bug: Expected {edit_item} to return a _context.Task")

                # TODO: Check task dependencies.
                ##
                # Note that we could insert resource management here. We could create
                # tasks until we run out of free resources, then switch modes to
                # awaiting tasks until resources become available, then switch back
                # to placing tasks.

                # Bind the WorkflowManager item to an RP Task.
                task = await updater.submit(item=item)

                if task.done():
                    # Stop processing the queue if task was cancelled or errored.
                    # TODO: Test this logical branch or consider removing it.
                    if task.cancelled():
                        logger.info(f"Stopping queue processing after unexpected cancellation of task {task}")
                        return
                    else:
                        exc = task.exception()
                        if exc:
                            logger.error(f"Task {task} failed much too fast. Stopping execution.")
                            raise exc
                        else:
                            logger.debug(f"Task {task} already done. Continuing.")
                else:
                    executor.submitted_tasks.add(task)
        except Exception as e:
            logger.debug("Leaving queue runner due to exception.")
            raise e
        finally:
            logger.debug('Releasing "{}" from command queue.'.format(str(command)))
            queue.task_done()
# TODO: Resolve circular reference between `execution` and `workflow` modules.
# class ExecutorFactory(typing.Protocol[_BackendT]):
# def __call__(self,
# manager: WorkflowManager,
# params: typing.Optional[_BackendT] = None) -> RuntimeManager[_BackendT]:
# ...