Source code for scalems.radical.runtime

"""Manage the RADICAL Pilot start-up and shut-down.

The provided RuntimeSession class encapsulates stateful resources that, once acquired,
should be shut down explicitly. RuntimeSession instances may be used as context managers to
ensure the proper protocol is followed, or the caller can take responsibility for
calling RuntimeSession.close() to shut down.
TODO: Fix!

Note: Consider whether the RuntimeSession context manager is reentrant or reusable (multi-use).

The RuntimeSession state encapsulates several nested states. A Session, TaskManager,
and PilotManager must be created for the RuntimeSession to be usable. Additionally, Pilots and
scheduler tasks may be added or removed during the RuntimeSession lifetime. To better support
alternate scenarios in which a RuntimeSession instance may be provided to a scalems.radical
component in an arbitrary state, consider making the ``with`` block scoped, such that
it only returns the RuntimeSession instance to its initial state when exiting, rather than
shutting it down completely.
See also https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
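
For illustration, a rough sketch of the scoped-``with`` idea. The helpers
*snapshot_state()* and *restore_state()* are hypothetical; neither exists on
RuntimeSession today, and an ExitStack-based variant would work similarly::

    import contextlib

    @contextlib.contextmanager
    def scoped(runtime_session):
        # Record which nested resources (Pilot, raptor task, etc.) exist on entry.
        initial_state = runtime_session.snapshot_state()  # hypothetical helper
        try:
            yield runtime_session
        finally:
            # Release only the resources acquired inside the block, returning the
            # instance to its initial state rather than shutting down completely.
            runtime_session.restore_state(initial_state)  # hypothetical helper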

Deferred:
    RuntimeSession can avoid providing direct access to RP interface, and instead run an
    entire RP Session state machine in a thread (separate from the asyncio event loop
    thread), relaying RP scripting commands through queues, in order to completely
    prevent misuse and to insulate the asyncio event loop from blocking RP commands.
    We need to get a better sense of the RP flow combinatorics before we can reasonably
    pursue this.
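
For illustration, a minimal sketch of this deferred idea (not part of the
current implementation): a dedicated thread owns all blocking RP calls, and
commands are relayed to it through a queue::

    import queue
    import threading

    commands: queue.Queue = queue.Queue()

    def rp_state_machine():
        # This thread would own the rp.Session and all blocking RP calls, so
        # the asyncio event loop thread never blocks on RP.
        while True:
            command = commands.get()
            if command == "shutdown":
                break
            ...  # interpret and execute the relayed RP scripting command

    threading.Thread(target=rp_state_machine, name="rp-session-thread").start()
    commands.put("shutdown")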

See Also:
    https://github.com/SCALE-MS/scale-ms/issues/55

.. uml::

    title scalems on radical.pilot run time

    box "SCALE-MS framework" #honeydew
    participant WorkflowManager as client_workflowmanager
    participant RuntimeManager
    participant "runner task" as scalems.execution
    end box
    box "SCALE-MS RP adapter" #linen
    participant RuntimeSession as client_runtime
    participant Executor as client_executor
    end box

    autoactivate on

    client_workflowmanager -> client_workflowmanager: async with executor
    client_workflowmanager -> client_executor: ~__aenter__()
    activate client_workflowmanager
    client_executor -> RuntimeManager: ~__aenter__() base class method
    RuntimeManager -> client_executor: runtime_configuration()
    return
    RuntimeManager -> client_executor: runtime_startup()

    client_executor -> : rp.Session()
    return
    client_executor -> client_runtime **: Session
    activate client_runtime
    client_executor -> : rp.PilotManager()
    return
    client_executor -> client_runtime: pilot_manager()
    return
    client_executor -> : rp.TaskManager()
    return
    client_executor -> client_runtime: task_manager()
    return
    client_executor -> : pilot_manager.submit_pilots()
    return
    client_executor -> client_runtime: pilot()
    note left
    Pilot venv is determined by resource definition (JSON file).
    end note
    return

    group ref [scalems.radical.raptor]
    client_executor -> client_executor: get_raptor()
    return
    end

    client_executor -> client_runtime: set raptor
    return

    client_executor ->> scalems.execution **: create_task(manage_raptor)
    client_executor -> scalems.execution: await runner_started
    RuntimeManager <-- client_executor: asyncio.Task
    RuntimeManager -> RuntimeManager: set runner_task
    return
    RuntimeManager --> client_executor: self
    client_workflowmanager <-- client_executor: RuntimeManager
    client_workflowmanager --> client_workflowmanager: as manager
    deactivate RuntimeManager

    client_workflowmanager -> client_workflowmanager #gray: async with dispatcher

    ...Raptor workload handling...

    return leave dispatcher context

    == Shut down runtime ==

    client_workflowmanager -> client_executor: ~__aexit__()
    client_executor -> RuntimeManager: ~__aexit__() base class method
    RuntimeManager ->> scalems.execution: enqueue a stop control message
    deactivate scalems.execution
    RuntimeManager -> RuntimeManager: await runner_task
    note right
        drain the queue
    end note
    RuntimeManager <-- scalems.execution
    deactivate RuntimeManager
    RuntimeManager ->> client_executor: runtime_shutdown()

    client_runtime <- client_executor
    return session
    client_runtime <- client_executor
    return raptor

    group Finalize Raptor task [if raptor is not None]
        group Give Raptor some time to shut down [if raptor.state not in FINAL]
            client_executor -> : runtime.raptor.wait(timeout=60)
        end
        group Forcibly cancel Raptor task [if raptor.state not in FINAL]
            client_runtime <- client_executor
            return task_manager
            client_executor -> : task_manager.cancel_tasks()
            return
            client_runtime <- client_executor
            return raptor
            client_executor -> : runtime.raptor.wait()
            return
        end
    end

    client_executor -> : session.close()
    return

    RuntimeManager -> RuntimeManager: await runtime_shutdown()

    client_executor -> client_runtime !!
    RuntimeManager <<-- client_executor
    RuntimeManager --> RuntimeManager
    RuntimeManager --> client_executor

    client_workflowmanager <-- client_executor: leave executor context
    client_workflowmanager --> client_workflowmanager

"""

from __future__ import annotations

__all__ = ("executor_factory", "current_configuration")

import asyncio
import contextlib
import contextvars
import logging
import os
import typing
import warnings
import weakref

from radical import pilot as rp

import scalems.call
import scalems.exceptions
import scalems.execution
import scalems.file
import scalems.invocation
import scalems.messages
import scalems.radical
import scalems.radical.runtime_configuration
import scalems.radical.raptor
import scalems.store
import scalems.subprocess
import scalems.workflow
from scalems.exceptions import DispatchError
from scalems.exceptions import MissingImplementationError
from scalems.exceptions import ProtocolError
from .runtime_configuration import get_pre_exec
from .runtime_configuration import RuntimeConfiguration
from .session import runtime_session
from .session import RuntimeSession
from .raptor import coro_get_scheduler
from .task import submit
from ..store import FileStore
from ..execution import AbstractWorkflowUpdater

logger = logging.getLogger(__name__)
logger.debug("Importing {}".format(__name__))


_configuration = contextvars.ContextVar("_configuration")


def current_configuration() -> typing.Optional[RuntimeConfiguration]:
    """Get the current RADICAL runtime configuration, if any.

    Returns:
        RuntimeConfiguration | None: The configuration for the RuntimeManager scope,
            if any, or None.
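
    Example (a minimal usage sketch)::

        config = current_configuration()
        if config is not None:
            ...  # e.g., inspect config.target_venv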

    TODO: Don't use a global and globally accessible module state. Let context managers
        like RuntimeManager use their own ContextVars if appropriate.
    """
    return _configuration.get(None)


class RPDispatchingExecutor(scalems.execution.RuntimeManager[RuntimeConfiguration, RuntimeSession]):
    """Client side manager for work dispatched through RADICAL Pilot.

    Extends :py:class:`scalems.execution.RuntimeManager`.

    Configuration points:

    * resource config
    * pilot config
    * session config?

    We try to wrap rp UI calls in separate threads. Note, though, that

    * the rp.Session needs to be created in the root thread to be able to correctly
      manage signal handlers and subprocesses, and
    * we need to be able to schedule RP Task callbacks in the same process as the
      asyncio event loop in order to handle Futures for RP tasks.

    See https://github.com/SCALE-MS/randowtal/issues/2
    """

    runtime: RuntimeSession
    """See `scalems.execution.RuntimeManager.runtime`."""

    raptor: typing.Optional[rp.raptor_tasks.Raptor] = None
    """Raptor scheduler (master task)."""

    def __init__(
        self,
        *,
        editor_factory: typing.Callable[[], typing.Callable] = None,
        datastore: FileStore = None,
        loop: asyncio.AbstractEventLoop,
        configuration: RuntimeConfiguration,
    ):
        """Create a client side execution manager.

        Warning:
            The creation method does not fully initialize the instance.
            Initialization and de-initialization occur through the Python
            (async) context manager protocol.
        """
        if "RADICAL_PILOT_DBURL" not in os.environ:
            raise DispatchError("RADICAL Pilot environment is not available.")

        if not isinstance(configuration.target_venv, str) or len(configuration.target_venv) == 0:
            raise ValueError("Caller must specify a venv to be activated by the execution agent for dispatched tasks.")

        super().__init__(editor_factory=editor_factory, datastore=datastore, loop=loop, configuration=configuration)

    @contextlib.contextmanager
    def runtime_configuration(self) -> RuntimeConfiguration:
        """Provide scoped Configuration.

        Merge the runtime manager's configuration with the global configuration,
        update the global configuration, and yield the configuration for a ``with`` block.

        Restores the previous global configuration when exiting the ``with`` block.

        Warning:
            Not thread-safe. We use the ContextVar to check for (and disallow)
            re-entrance. This contextmanager is not async, but it could be (and is)
            used within an asynchronous context manager. If used from multiple threads,
            we would not have anything structurally prohibiting reentrant calls.

        Design notes:
            Instance configuration is coupled to module state except as allowed
            through :py:func:`contextvars.copy()`.
        """
        if _configuration.get(None):
            raise scalems.exceptions.APIError("A scalems.radical runtime is already active.")
        assert self.datastore is not None
        c = self.configuration()
        token = _configuration.set(c)
        try:
            yield c
        finally:
            _configuration.reset(token)
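
    # For illustration only: a usage sketch for the scoped configuration above
    # (hypothetical caller code, with *executor* an initialized instance of this
    # class, and assuming no enclosing scope has set the ContextVar):
    #
    #     with executor.runtime_configuration() as config:
    #         assert current_configuration() is config
    #     assert current_configuration() is None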

    async def runtime_startup(self) -> asyncio.Task:
        """Establish the RP Session.

        Acquire a maximally re-usable set of RP resources. The scope established by
        this function is as broad as it can be within the life of the workflow manager.

        Once *instance.runtime_startup()* succeeds, *instance.runtime_shutdown()* must
        be called to clean up resources. Use the async context manager behavior of the
        instance to automatically follow this protocol. I.e. instead of calling
        ``instance.runtime_startup(); ...; instance.runtime_shutdown()``, use::

            async with instance:
                ...

        Raises:
            DispatchError: if task dispatching could not be set up.
            asyncio.CancelledError: if parent `asyncio.Task` is cancelled while executing.

        Note:
            **Signal handling**

            RP is known to use IPC signals in several cases. We believe that the client
            environment should only experience an internally triggered SIGINT in a
            single code path, and the behavior can be suppressed by setting the Pilot's
            :py:attr:`~radical.pilot.PilotDescription.exit_on_error` attribute to `False`.

            We could use `loop.add_signal_handler()
            <https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.add_signal_handler>`__
            to convert to an exception that we can raise in an appropriate task, but
            this is probably unnecessary. Moreover, with Python 3.11, we get a sensible
            signal handling behavior (for SIGINT) with :py:class:`asyncio.Runner`. Per
            https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption,
            we can just make sure that our run time resources will be properly shut
            down in the event of a :py:class:`asyncio.CancelledError`, including
            sending appropriate calls to the `radical.pilot` framework.

        See Also:
            https://github.com/SCALE-MS/randowtal/issues/1

        TODO: More concurrency.
            The rp.Pilot and raptor task can be separately awaited, and we should allow
            input data staging to begin as soon as we have enough run time details to
            do it. We need to clarify which tasks should be in which state to consider
            the asynchronous context manager to have been successfully "entered". I
            expect that success includes a Pilot in state PMGR_ACTIVE, and a raptor
            Task in state AGENT_EXECUTING, and asyncio.Task handles available for other
            aspects, like synchronization of metadata and initiation of input data
            staging. However, we may prefer that the workflow script can continue
            evaluation on the client side while waiting for the Pilot job, and that we
            avoid blocking until we absolutely have to (presumably when exiting the
            dispatching context).
        """
        config: RuntimeConfiguration = self._runtime_configuration

        # TODO: Check that we have a FileStore.
        try:
            #
            # Start the Session.
            #

            # Note: the current implementation implies that only one Task for the
            # dispatcher will exist at a time. We are further assuming that there will
            # probably only be one Task per the lifetime of the dispatcher object.
            # We could choose another approach and change our assumptions, if appropriate.
            logger.debug("Entering RP dispatching context. Waiting for rp.Session.")

            self.runtime = await runtime_session(loop=self._loop, configuration=config)

            # TODO: Asynchronous data staging optimization.
            # Allow input data staging to begin before scheduler is in state EXECUTING
            # and before Pilot is in state PMGR_ACTIVE.

            assert self.raptor is None
            # Get a scheduler task IFF raptor is explicitly enabled.
            if config.enable_raptor:
                pilot = self.runtime.pilot()
                # TODO(#335): Separate Worker provisioning from Master provisioning.
                raptor_config_file_future = asyncio.create_task(
                    scalems.radical.raptor.raptor_input(
                        filestore=self.datastore,
                    ),
                    name="get-raptor-input",
                )
                # Note that coro_get_scheduler is a coroutine that, itself, returns a rp.Task.
                # We await the result of coro_get_scheduler, then store the scheduler Task.
                self.raptor = await asyncio.create_task(
                    coro_get_scheduler(
                        pre_exec=list(get_pre_exec(config)),
                        pilot=pilot,
                        filestore=self.datastore,
                        config_future=raptor_config_file_future,
                    ),
                    name="get-scheduler",
                )
                # Note that we can derive scheduler_name from self.scheduler.uid in later methods.
        except asyncio.CancelledError as e:
            raise e
        except Exception as e:
            logger.exception("Exception while connecting RADICAL Pilot.", exc_info=e)
            raise DispatchError("Failed to launch SCALE-MS raptor task.") from e

        if self.runtime is None or self.runtime.session.closed:
            raise ProtocolError("Cannot process queue without a RP Session.")

        # Launch queue processor (proxy executor).
        # TODO: Make runtime_startup optional. Let it return a resource that is
        #  provided to the normalized run_executor(), or maybe use it to configure the
        #  Submitter that will be provided to the run_executor.
        runner_started = asyncio.Event()
        runner_task = asyncio.create_task(scalems.execution.manage_execution(self, processing_state=runner_started))
        await runner_started.wait()

        # TODO: Note the expected scope of the runner_task lifetime with respect to
        #  the global state changes (i.e. ContextVars and locks).
        return runner_task

    def runtime_shutdown(self, runtime: RuntimeSession):
        """Manage tear down of the RADICAL Pilot Session and resources.

        Several aspects of the RP runtime interface use blocking calls. This method
        should be run in a non-root thread (concurrent.futures.Future) that the event
        loop can manage as an asyncio-wrapped task.

        Overrides :py:class:`scalems.execution.RuntimeManager`
        """
        # Note that __aexit__() deletes self.runtime before calling
        # runtime_shutdown to prevent re-entrance. We will address such race
        # conditions differently in later updates.
        # TODO: Move this to a RuntimeSession.close() method.
        session: rp.Session = getattr(runtime, "session", None)
        if session is None:
            raise scalems.exceptions.APIError(f"No Session in {runtime}.")
        if session.closed:
            logger.error("RuntimeSession is already closed?!")
        else:
            raptor = self.raptor
            if raptor is not None:
                del self.raptor
                # Note: The __aexit__ for the RuntimeManager makes sure that a `stop`
                # is issued after the work queue is drained, if the scheduler task has
                # not already ended. We could check the status of this stop message...
                if raptor.state not in rp.FINAL:
                    logger.info(f"Waiting for RP Raptor task {raptor.uid} to complete...")
                    raptor.wait(rp.FINAL, timeout=60)
                if raptor.state not in rp.FINAL:
                    # Cancel the raptor.
                    logger.warning("Canceling the raptor scheduling task.")
                    # Note: the effect of CANCEL is to send SIGTERM to the shell that
                    # launched the task process.
                    # TODO: Report incomplete tasks and handle the consequences of
                    #  terminating the Raptor processes early.
                    task_manager = runtime.task_manager()
                    task_manager.cancel_tasks(uids=raptor.uid)
                    # As of https://github.com/radical-cybertools/radical.pilot/pull/2702,
                    # we do not expect `cancel` to block, so we must wait for the
                    # cancellation to succeed. It shouldn't take long, but it is not
                    # instantaneous or synchronous. We hope that a minute is enough.
                    final_state = raptor.wait(state=rp.FINAL, timeout=10)
                    logger.info(f"Master scheduling task state {final_state}: {repr(raptor)}.")
                if raptor.stdout:
                    # TODO(#229): Fetch actual stdout file.
                    logger.debug(raptor.stdout)
                if raptor.stderr:
                    # TODO(#229): Fetch actual stderr file.
                    # Note that all of the logging output goes to stderr, it seems,
                    # so we shouldn't necessarily consider it an error level event.
                    logger.debug(raptor.stderr)
                # TODO(#108,#229): Receive report of work handled by Master.
            runtime.close()
            if session.closed:
                logger.debug(f"Session {session.uid} closed.")
            else:
                logger.error(f"Session {session.uid} not closed!")
        logger.debug("RuntimeSession shut down.")
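
    # For illustration only: because runtime_shutdown() makes blocking RP calls,
    # a caller on the event loop thread could dispatch it to a worker thread
    # roughly as follows (a sketch; not necessarily how the base class does it):
    #
    #     loop = asyncio.get_running_loop()
    #     await loop.run_in_executor(None, manager.runtime_shutdown, runtime)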

    def updater(self) -> "WorkflowUpdater":
        return WorkflowUpdater(executor=self)


def executor_factory(manager: scalems.workflow.WorkflowManager, params: RuntimeConfiguration = None):
    if params is None:
        warnings.warn("executor_factory called without explicit configuration.")
        params = scalems.radical.runtime_configuration.configuration()
    elif not isinstance(params, RuntimeConfiguration):
        raise ValueError("scalems.radical executor_factory *params* must be a RuntimeConfiguration instance.")
    executor = RPDispatchingExecutor(
        editor_factory=weakref.WeakMethod(manager.edit_item),
        datastore=manager.datastore(),
        loop=manager.loop(),
        configuration=params,
    )
    return executor
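

# For illustration only: typical wiring of the factory (hypothetical driver code,
# assuming a configured WorkflowManager *manager* and a RuntimeConfiguration *config*):
#
#     executor = executor_factory(manager, params=config)
#     async with executor as dispatcher:
#         ...  # dispatch work through the runtime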


class RPResult:
    """Basic result type for RADICAL Pilot tasks.

    Define a return type for Futures or awaitable tasks from RADICAL Pilot commands.
    """

    # TODO: Provide support for RP-specific versions of standard SCALEMS result types.


class WorkflowUpdater(AbstractWorkflowUpdater):
    def __init__(self, executor: RPDispatchingExecutor):
        self.executor = executor
        self.task_manager = executor.runtime.task_manager()
        # TODO: Make sure we are clear about the scope of the configuration and the
        #  life time of the workflow updater / submitter.
        self._pre_exec = list(get_pre_exec(executor.configuration()))

    async def submit(self, *, item: scalems.workflow.Task) -> asyncio.Task:
        # TODO: Ensemble handling
        item_shape = item.description().shape()
        if len(item_shape) != 1 or item_shape[0] != 1:
            raise MissingImplementationError("Executor cannot handle multidimensional tasks yet.")
        task: asyncio.Task[rp.Task] = await submit(item=item, task_manager=self.task_manager, pre_exec=self._pre_exec)
        return task