Source code for scalems.radical.session

from __future__ import annotations

__all__ = (
    "runtime_session",
    "RuntimeSession",
)

import asyncio
import logging
import threading
import typing
import uuid
import warnings
import weakref

import typing_extensions

from scalems.exceptions import APIError
from scalems.exceptions import ProtocolError
from scalems.radical.runtime_configuration import RuntimeConfiguration

from radical import pilot as rp

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Emit a DEBUG record when the module is imported.
logger.debug("Importing {}".format(__name__))


class RuntimeSession:
    """Container for scalems.radical runtime state data.

    Use a creation function to provide RuntimeSession with an asyncio event loop.
    Interact with the RuntimeSession in the main thread whenever possible.
    Let the RuntimeSession dispatch slow rp UI calls to other threads as needed
    and appropriate.

    Note:
        There is very little automated error recovery. For examples of expansive
        checking and re-launching of runtime resources, refer to the fixtures in
        conftest.py at or before revision
        41b965a27c5af9abc115677b738085c35766b5b6.
    """

    resources: typing.Optional[asyncio.Task[dict]] = None
    """The active Pilot resources, if any.

    The runtime_startup routine schedules a Task to get a copy of the
    Pilot.resource_details['rm_info'] dictionary, once the Pilot reaches
    state PMGR_ACTIVE.
    """

    _configuration: RuntimeConfiguration
    _loop: asyncio.AbstractEventLoop
    _pilot_manager: typing.Optional[rp.PilotManager] = None
    _pilot: typing.Optional[rp.Pilot] = None
    _session: rp.Session
    _task_manager: typing.Optional[rp.TaskManager] = None

    def __init__(self, session: rp.Session, *, loop: asyncio.AbstractEventLoop, configuration: RuntimeConfiguration):
        """Wrap an active RP Session.

        Args:
            session: an open radical.pilot.Session. The RuntimeSession registers
                a finalizer that calls ``session.close``.
            loop: the (not closed) event loop associated with this runtime.
            configuration: source of the PilotDescription parameters used by `pilot()`.

        Raises:
            ValueError: if *session* is not an open rp.Session or *loop* is closed.
        """
        if not isinstance(session, rp.Session) or session.closed:
            raise ValueError("*session* must be an active RADICAL Pilot Session.")
        self._session = session
        # TODO(#359,#383): Call session.close in a ThreadPoolExecutor that we use for RP UI calls.
        self._session_finalizer = weakref.finalize(self, session.close)

        if loop.is_closed():
            raise ValueError("*loop* must be an active event loop.")
        # Note: loop.is_running() may not yet return True if no coroutines have been awaited.
        self._loop = loop

        self._configuration = configuration

        # Serializes Pilot creation (`pilot()`) against shutdown (`close()`).
        self._new_pilot_lock = threading.Lock()

    def __repr__(self):
        # Use getattr so that repr() also works on a partially constructed
        # instance (e.g. while debugging a failed __init__).
        session = getattr(self, "_session", None)
        if session:
            session = session.uid
        pilot = self._pilot
        if pilot:
            pilot = pilot.uid
        return f'<RuntimeSession "{session}" pilot:"{pilot}">'

    @staticmethod
    def _ignore_rp_deprecation_warnings():
        """Install "ignore" filters for DeprecationWarnings from RP modules.

        Call inside a ``warnings.catch_warnings()`` block so that the filters
        are removed again when the block exits.
        """
        for module in ("radical.pilot.task_manager", "radical.pilot.db.database", "radical.pilot.session"):
            warnings.filterwarnings("ignore", category=DeprecationWarning, module=module)

    async def wait_closed(self):
        """Wait for a closing session to be closed.

        Use with `close()` to allow the asyncio event loop to resolve
        outstanding tasks. Returns immediately if no Pilot resources task was
        ever scheduled.

        TODO: Do we need this? This method may not be necessary. Or it may be
        more necessary in the future.
        """
        resources = self.resources
        if resources is None:
            # No Pilot was requested, so there is nothing to wait for.
            return
        while not resources.done():
            timer = asyncio.create_task(asyncio.sleep(10.0), name="Session closing timer")
            try:
                # FIRST_COMPLETED (rather than FIRST_EXCEPTION) so that we wake
                # as soon as the resources task finishes *or is cancelled*
                # instead of always sitting out the full timer period.
                done, _ = await asyncio.wait((resources, timer), return_when=asyncio.FIRST_COMPLETED)
            finally:
                # Never leave a stray timer behind, including on cancellation.
                timer.cancel()
            if resources not in done:
                logger.info("Waiting for session to close.")

    def close(self):
        """Direct the runtime to shut down and release resources.

        Safe to call more than once: the session finalizer is only invoked
        (and discarded) the first time.

        Warning:
            This function may return before resources have been finalized.
            Follow a call to `close()` with `wait_closed()` to give the event
            loop a chance to cycle.
        """
        # NOTE(review): the whole shutdown sequence is kept under the lock so
        # that `pilot()` cannot submit a new Pilot while the Session is
        # closing -- confirm against the upstream layout.
        with self._new_pilot_lock:
            resources = self.resources
            if resources is not None and not resources.done():
                if threading.main_thread() == threading.current_thread():
                    resources.cancel()
                else:
                    # Task.cancel() is not thread-safe; hand it to the loop.
                    self._loop.call_soon_threadsafe(resources.cancel)

            # De-initialize state: deleting the instance attributes lets the
            # class-level defaults (None) show through again.
            if self._pilot is not None:
                del self._pilot
            if self._task_manager is not None:
                del self._task_manager
            if self._pilot_manager is not None:
                del self._pilot_manager

            # Note: there are no documented exceptions or errors to check for,
            # programmatically. Some issues encountered during shutdown will be
            # reported through the reporter or logger of the
            # radical.pilot.utils.component.Component base.
            # The RP convention seems to be to use the component uid as the name
            # of the underlying logging.Logger node, so we could presumably attach
            # a log handler to the logger for a component of interest.
            logger.debug(f"Closing Session {self.session.uid}.")

            with warnings.catch_warnings():
                self._ignore_rp_deprecation_warnings()
                # TODO: let rp.Session.close run in a separate thread.
                # Note: One option to avoid allocating new threads during shutdown is
                #  to wrap the session at creation time in a Task that calls close()
                #  when it catches an Exception or an Event.
                # TODO: Wrap asyncio.to_thread for RP calls to get some accounting of how many
                #  rp UI calls are pending and how many threads are being used for rp UI calls.
                # TODO: Use a single thread to serialize rp UI calls?
                self.session.close(download=True)
                # Mark the finalizer dead so that it does not fire again at
                # garbage collection or interpreter exit. Tolerate repeated
                # close() calls, where the attribute is already gone.
                finalizer = getattr(self, "_session_finalizer", None)
                if finalizer is not None:
                    finalizer()
                    del self._session_finalizer

    @property
    def session(self) -> rp.Session:
        """The current radical.pilot.Session (may already be closed)."""
        return self._session

    def _new_pilotmanager(self):
        """Create a PilotManager in the current Session, hiding RP deprecation noise."""
        with warnings.catch_warnings():
            self._ignore_rp_deprecation_warnings()
            return rp.PilotManager(session=self.session)

    @typing.overload
    def pilot_manager(self) -> typing.Union[rp.PilotManager, None]: ...

    @typing.overload
    def pilot_manager(self, pilot_manager: str) -> rp.PilotManager: ...

    @typing.overload
    def pilot_manager(self, pilot_manager: rp.PilotManager) -> rp.PilotManager: ...

    def pilot_manager(self, pilot_manager=None) -> typing.Union[rp.PilotManager, None]:
        """Get (optionally set) the current PilotManager.

        When called without an argument and no PilotManager has been set yet,
        a new one is created in the current Session and stored.

        Args:
            pilot_manager (optional, radical.pilot.PilotManager, str): Set to
                RP PilotManager instance or identifier, if provided.

        Returns:
            radical.pilot.PilotManager: the configured instance.

        Raises:
            ValueError: for invalid identifier.
            APIError: for invalid RP Session configuration (a different
                PilotManager is already assigned, or the PilotManager belongs
                to a different Session).
            ProtocolError: if called without argument after the RP Session
                has been closed.
        """
        if pilot_manager is None:
            # Caller should destroy and recreate Pilot if this call has to replace PilotManager.
            session = self.session
            if session.closed:
                # Once rp.Session is closed, require a new RuntimeSession.
                raise ProtocolError(f"RP Session {self.session.uid} is closed. Get a new RuntimeSession instance.")
            if self._pilot_manager is not None:
                return self._pilot_manager
            # Is there a way to check whether the PilotManager is healthy?
            logger.info(f"Creating a new PilotManager for {self.session.uid}")
            manager = self._new_pilotmanager()
            logger.info(f"New PilotManager is {manager.uid}")
            return self.pilot_manager(manager)

        if isinstance(pilot_manager, rp.PilotManager):
            if self._pilot_manager is not None and pilot_manager != self._pilot_manager:
                raise APIError(f"PilotManager {self._pilot_manager.uid} already assigned.")
            if not pilot_manager.session.uid == self.session.uid:
                raise APIError("Cannot accept a PilotManager from a different Session.")
            self._pilot_manager = pilot_manager
            return self._pilot_manager

        # Anything else is treated as a PilotManager identifier.
        uid = pilot_manager
        try:
            pmgr = self.session.get_pilot_managers(pmgr_uids=uid)
        except KeyError as e:
            raise ValueError(f"{uid} does not describe a valid PilotManager") from e
        except Exception as e:
            logger.exception("Unhandled RADICAL Pilot exception.", exc_info=e)
            raise ValueError(f"{uid} does not describe a valid PilotManager") from e
        # Explicit check instead of `assert`: assertions are stripped under
        # `python -O`, and a None result must not fall into the "create a new
        # PilotManager" branch of the recursive call.
        if not isinstance(pmgr, rp.PilotManager):
            raise ValueError(f"{uid} does not describe a valid PilotManager")
        return self.pilot_manager(pmgr)

    @typing.overload
    def task_manager(self) -> typing.Union[rp.TaskManager, None]: ...

    @typing.overload
    def task_manager(self, task_manager: str) -> rp.TaskManager: ...

    @typing.overload
    def task_manager(self, task_manager: rp.TaskManager) -> rp.TaskManager: ...

    def task_manager(self, task_manager=None) -> typing.Union[rp.TaskManager, None]:
        """Get (optionally set) the current TaskManager.

        Args:
            task_manager (optional, radical.pilot.TaskManager, str): Set to
                RP TaskManager instance or identifier, if provided.

        Returns:
            radical.pilot.TaskManager: instance, if set, else ``None``.

        Raises:
            ValueError: for invalid identifier.
            APIError: for invalid RP Session configuration.
        """
        if task_manager is None:
            return self._task_manager

        if isinstance(task_manager, rp.TaskManager):
            if not task_manager.session.uid == self.session.uid:
                raise APIError("Cannot accept a TaskManager from a different Session.")
            self._task_manager = task_manager
            return task_manager

        # Anything else is treated as a TaskManager identifier.
        uid = task_manager
        try:
            tmgr = self.session.get_task_managers(tmgr_uids=uid)
        except KeyError as e:
            raise ValueError(f"{uid} does not describe a valid TaskManager") from e
        except Exception as e:
            logger.exception("Unhandled RADICAL Pilot exception.", exc_info=e)
            raise ValueError(f"{uid} does not describe a valid TaskManager") from e
        # Explicit check instead of `assert` (stripped under `python -O`): a
        # None result would otherwise turn the recursive call into a getter.
        if not isinstance(tmgr, rp.TaskManager):
            raise ValueError(f"{uid} does not describe a valid TaskManager")
        return self.task_manager(tmgr)

    @staticmethod
    def _new_pilot(
        *,
        session: rp.Session,
        pilot_manager: rp.PilotManager,
        pilot_description: rp.PilotDescription,
        task_manager: rp.TaskManager,
    ):
        """Submit a Pilot through *pilot_manager* and attach it to *task_manager*.

        A copy of *pilot_description* is submitted, so the caller's object is
        not handed to RP.
        """
        logger.debug(
            "Using resource config: {}".format(str(session.get_resource_config(pilot_description.resource).as_dict()))
        )
        logger.debug("Using PilotDescription: {}".format(str(pilot_description.as_dict())))

        with warnings.catch_warnings():
            RuntimeSession._ignore_rp_deprecation_warnings()
            pilot = pilot_manager.submit_pilots([rp.PilotDescription(pilot_description)])[0]
            task_manager.add_pilots(pilot)
        return pilot

    def pilot(self) -> rp.Pilot:
        """Get active Pilot.

        Allows lazy initialization of the Pilot resource. When a new Pilot is
        submitted, a Task that resolves the Pilot resources is scheduled and
        stored in `resources`.

        Returns:
            radical.pilot.Pilot: The current Pilot instance, if available and
            valid, or a new Pilot instance in the configured PilotManager.

        Raises:
            APIError: for invalid RP Session configuration.
        """
        with self._new_pilot_lock:
            if self.session.closed:
                raise APIError("Session is already closed.")
            pilot_manager = self.pilot_manager()
            if not pilot_manager:
                raise APIError("Cannot get/set Pilot before setting PilotManager.")

            pilot = self._pilot
            if pilot is None or pilot.state in rp.FINAL:
                if pilot is None:
                    logger.info(f"Creating a Pilot for {self.session.uid}")
                else:
                    assert isinstance(pilot, rp.Pilot)
                    logger.info(f"Old Pilot {pilot.uid} in state {pilot.state}")

                pilot_description = describe_pilot(self._configuration)
                logger.debug("Requesting Pilot: {}".format(repr(pilot_description.as_dict())))

                # Check for the TaskManager *before* submitting anything to RP.
                task_manager = self.task_manager()
                if not task_manager:
                    raise APIError("Cannot get/set Pilot before setting TaskManager.")

                pilot = self._new_pilot(
                    session=self.session,
                    pilot_manager=pilot_manager,
                    pilot_description=pilot_description,
                    task_manager=task_manager,
                )
                logger.debug(f"Got Pilot {pilot.uid}: {pilot.as_dict()}")
                # Note: This could take hours or days depending on the queuing system.
                # Can we report some more useful information, like job ID?
                # NOTE(review): asyncio.create_task() needs a running event
                # loop in the calling thread; `self._loop.create_task(...)`
                # was considered upstream -- confirm which is intended.
                self.resources = asyncio.create_task(get_pilot_resources(pilot), name="Pilot resources")
                self._pilot = pilot

            # Do some checking.
            session = pilot.session
            assert isinstance(session, rp.Session)
            if session.uid != self.session.uid:
                raise APIError("Cannot accept a Pilot from a different Session.")
            if pilot.pmgr.uid != pilot_manager.uid:
                raise APIError("Pilot must be associated with a PilotManager already configured.")

            # TODO: If new, the Pilot referenced will still be starting up. It seems like we
            #  don't know when or if the Pilot will ever actually start. But maybe we should use
            #  a Future to allow for synchronization or error detection.
            return pilot
def describe_pilot(configuration: RuntimeConfiguration):
    """Build a uniquely identified rp.PilotDescription from *configuration*.

    Starts from a copy of ``configuration.rp_resource_params["PilotDescription"]``,
    assigns a fresh ``uid``, and sets ``resource`` to
    ``configuration.execution_target``. The parameters must set
    ``exit_on_error`` to ``False``.
    """
    description_fields = configuration.rp_resource_params["PilotDescription"].copy()

    # Get a unique identifier.
    description_fields["uid"] = f"pilot.{str(uuid.uuid4())}"
    description_fields["resource"] = configuration.execution_target

    # A softer alternative that was considered: warn (rather than fail) when
    # `exit_on_error` is not False, since that may prevent clean shut down.
    assert description_fields["exit_on_error"] is False

    return rp.PilotDescription(description_fields)


def _rp_session(*args, **kwargs) -> rp.Session:
    """Create an rp.Session with DeprecationWarnings suppressed.

    All arguments are forwarded to ``rp.Session``.
    """
    # Note: radical.pilot.Session creation causes several deprecation warnings.
    # Ref https://github.com/radical-cybertools/radical.pilot/issues/2185
    with warnings.catch_warnings():
        warnings.simplefilter("ignore", category=DeprecationWarning)
        # This would be a good time to `await`, if an event-loop friendly
        # Session creation function becomes available.
        new_session = rp.Session(*args, **kwargs)
    logger.info(f"Created {new_session.uid}")
    return new_session


async def runtime_session(*, configuration: RuntimeConfiguration, loop=None) -> RuntimeSession:
    """Start a new RADICAL Pilot Session.

    Creates the rp.Session, PilotManager and TaskManager in worker threads
    (via `asyncio.to_thread`), registers them with a new RuntimeSession, and
    requests an initial Pilot.

    Returns:
        RuntimeSession instance.
    """
    if loop is None:
        loop = asyncio.get_running_loop()

    session: rp.Session = await asyncio.create_task(asyncio.to_thread(_rp_session), name="create-Session")

    runtime = RuntimeSession(session=session, loop=loop, configuration=configuration)
    # At some point soon, we need to track Session ID for the workflow metadata.
    session_id = runtime.session.uid
    # Do we want to log this somewhere?
    # (e.g. a deep copy of the Session configuration dictionary.)
    logger.debug("Acquired RP Session {}".format(session_id))

    logger.debug("Launching PilotManager.")
    pmgr_task = asyncio.create_task(
        asyncio.to_thread(rp.PilotManager, session=runtime.session),
        name="get-PilotManager",
    )
    pilot_manager = runtime.pilot_manager(await pmgr_task)
    logger.debug("Got PilotManager {}.".format(pilot_manager.uid))

    logger.debug("Launching TaskManager.")
    tmgr_task = asyncio.create_task(
        asyncio.to_thread(rp.TaskManager, session=runtime.session),
        name="get-TaskManager",
    )
    task_manager = runtime.task_manager(await tmgr_task)
    logger.debug("Got TaskManager {}".format(task_manager.uid))

    #
    # Get a Pilot
    #
    # We can launch an initial Pilot, but we may have to run further Pilots
    # during self._queue_runner_task (or while servicing scalems.wait() within the
    # with block) to handle dynamic work load requirements.
    # Optionally, we could refrain from launching the pilot here, at all,
    # but it seems like a good chance to start bootstrapping the agent environment.
    #
    # How and when should we update the pilot description?
    initial_pilot = runtime.pilot()
    logger.debug("Added Pilot {} to task manager {}.".format(initial_pilot.uid, runtime.task_manager().uid))

    return runtime


class RmInfo(typing_extensions.TypedDict):
    """Subset of the Pilot ``rm_info`` dictionary that this module checks."""

    # Refer to https://github.com/radical-cybertools/radical.pilot/issues/2973
    # for evolution of a more stable interface.
    # See issue #367
    requested_cores: int
    requested_gpus: int


async def get_pilot_resources(pilot: rp.Pilot) -> RmInfo:
    """Wait for *pilot* to become active and return a copy of its ``rm_info``.

    The blocking ``pilot.wait()`` call runs in a worker thread. Returns
    ``None`` if the Pilot provides no ``rm_info``.
    """

    def _report_state(waiter: asyncio.Task[str]):
        # Done-callback: surface any error from the waiter, then log the state.
        if not waiter.cancelled():
            if error := waiter.exception():
                logger.exception("Exception while watching for Pilot to become active.", exc_info=error)
        logger.info(f"Pilot {pilot.uid} in state {pilot.state}.")

    logger.info("Waiting for an active Pilot.")
    # Wait for Pilot to be in state PMGR_ACTIVE. (There is no reasonable
    # choice of a timeout because we are waiting for the HPC queuing system.)
    # Then, query Pilot.resource_details['rm_info']['requested_cores'] and 'requested_gpus'.
    state_waiter = asyncio.create_task(
        asyncio.to_thread(pilot.wait, state=rp.PMGR_ACTIVE, timeout=None), name="pilot_state_waiter"
    )
    state_waiter.add_done_callback(_report_state)
    await state_waiter

    rm_info: RmInfo = pilot.resource_details.get("rm_info")
    logger.debug(f"Pilot {pilot.uid} resources: {str(rm_info)}")
    if rm_info is None:
        return None
    assert "requested_cores" in rm_info and isinstance(rm_info["requested_cores"], int)
    assert "requested_gpus" in rm_info and isinstance(rm_info["requested_gpus"], int)
    return rm_info.copy()