"""Support for scalems on radical.pilot.raptor.
Define the connective tissue for SCALE-MS tasks embedded in rp.Task arguments.
Provide the RP Raptor and Worker details. Implement the raptor task
script as a package entry point that we expect to be executable from the
command line in a virtual environment where the scalems package is installed,
without further modification to the PATH.
The client should be reasonably certain that the target environment has a
compatible installation of RP and scalems. An rp.raptor.Master task script is
installed with the scalems package. The script name is provided by the
module function :py:func:`~scalems.radical.raptor.raptor_script()`, and will
be resolvable on the PATH for a Python interpreter process in an
environment that has the `scalems` package installed.
We should try to keep this module as stable as possible so that the run time
interface provided by scalems to RP is robust, and the entry point scripts
change as little as possible across versions.
scalems.radical runtime details live in runtime.py.
As of :py:mod:`radical.pilot` version 1.18, TaskDescription supports
a *mode* field and several additional fields. The extended schema for the TaskDescription
depends on the value of *mode*. The structured data provided to the executor callable
is composed from these additional fields according to the particular mode.
We use the :py:data:`radical.pilot.task_description.TASK_FUNCTION` mode,
specifying a *function* field to name a callable in the (global) namespace
accessible by the worker process.
We populate the worker global namespace with imported callables in the module
file from which Raptor imports our :py:class:`radical.pilot.raptor.MPIWorker` subclass,
:py:class:`~scalems.radical.raptor.ScaleMSWorker`.
The callable named by *function* accepts ``*args`` and ``**kwargs``, which are
extracted from fields in the TaskDescription. Since we derive from MPIWorker, an
mpi4py communicator is inserted at ``args[0]``.
Protocol
--------
As of RP 1.18, the :py:mod:`radical.pilot.raptor` interface is not documented.
The following diagrams illustrate the approximate architecture of a Raptor Session.
.. uml::
title RP Raptor client Session
box "RP client"
participant TaskManager
end box
'queue "Taskmanager: Scheduler and Input Queue" as Queue
queue "Scheduler/Input Queue" as Queue
-> TaskManager: submit(master_task_description)
activate TaskManager
'note left
note over TaskManager
TaskDescription uses `mode=rp.RAPTOR_MASTER`.
Task args provide details for the script
so that a Raptor can start appropriate Workers.
end note
TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
-> TaskManager: submit(task_description)
activate TaskManager
'note left
note over TaskManager
TaskDescription names a Raptor uid in *raptor_id* field.
end note
TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
...
TaskManager ->
TaskManager <-- : raptor task results
A "raptor task" is an *executable* task (*mode* = ``rp.RAPTOR_MASTER``)
in which the script named by
:py:data:`radical.pilot.TaskDescription.executable`
manages the life cycle of a :py:class:`radical.pilot.raptor.Master`
(or subclass instance).
As of RP 1.14, the protocol is as follows.
.. uml::
title raptor task lifetime management
participant "raptor task"
-> "raptor task" : stage in
create Raptor as raptor
"raptor task" -> raptor: create Raptor(cfg)
"raptor task" -> raptor: Raptor.submit_workers(descr=descr, count=n_workers)
"raptor task" -> raptor: Raptor.start()
alt optional hook for self-submitting additional tasks
"raptor task" -> raptor: Raptor.submit_tasks(tasks)
end
queue scheduler
scheduler -\\ raptor : request_cb
scheduler -\\ raptor : result_cb
raptor -> raptor: Raptor.stop() (optional)
"raptor task" -> raptor: Raptor.join()
[<-- "raptor task" : stage out
The *cfg* argument to :py:class:`radical.pilot.raptor.Master` does not currently
appear to be required.
*scalems* encodes worker requirements on the client side. The *scalems* raptor
script decodes the client-provided requirements, combines the information with
run time details and workload inspection, and produces the WorkerDescription
with which to call :py:func:`~radical.pilot.raptor.Master.submit_workers()`.
.. uml::
class RaptorWorkerConfig {
count
descr
}
class WorkerDescription
RaptorWorkerConfig *--> "descr" WorkerDescription
namespace scalems.radical {
class RaptorConfiguration
class ClientWorkerRequirements
class ScaleMSRaptor
ScaleMSRaptor --> RaptorConfiguration : launches with
RaptorConfiguration *-- ClientWorkerRequirements
ClientWorkerRequirements -right-> .RaptorWorkerConfig : worker_description()
}
namespace radical.pilot.raptor {
class Master {
submit_workers()
}
Master::submit_workers .> .RaptorWorkerConfig
}
scalems.radical.ScaleMSRaptor -up-|> radical.pilot.raptor.Master
.. uml::
title scalems raptor runtime management
!pragma teoz true
box "client"
box "scalems.radical" #honeydew
participant RPDispatchingExecutor as client_runtime
end box
box "radical.pilot" #White
participant task_manager
end box
end box
queue "RP runtime" as scheduler
-> client_runtime : runtime_startup()
activate client_runtime
client_runtime -> client_runtime : coro_get_scheduler()
activate client_runtime
client_runtime -> client_runtime : raptor_script()
client_runtime -> client_runtime : raptor_input()
activate client_runtime
client_runtime -> client_runtime : worker_requirements()
activate client_runtime
note over client_runtime
TODO: Allocate Worker according to workload
through a separate call to the running Raptor
and remove ClientWorkerRequirements from RaptorConfiguration.
end note
return ClientWorkerRequirements
return RaptorConfiguration
client_runtime -> client_runtime: launch Raptor Task
activate client_runtime
note over client_runtime, task_manager
We currently require a pre-existing venv for Raptor task.
TODO(#90,#141): Allow separate venv for Worker task and Tasks.
end note
client_runtime -> task_manager : submit raptor task
task_manager -\\ scheduler
note over task_manager, scheduler
Raptor cfg is staged with TaskDescription.
end note
deactivate client_runtime
note over client_runtime, task_manager
TODO(#92,#105): Make sure the Worker starts successfully!!!
end note
return raptor task
return dispatching context
TODO
----
Pass input and output objects more efficiently.
Worker processes come and go,
and are not descended from Raptor processes (which may not even be on the same node),
so we can't generally pass objects by reference or even in shared memory segments.
However, we can use optimized ZMQ facilities for converting network messages
directly to/from memory buffers.
In other cases, we can use the scalems typing system to specialize certain types
for optimized (de)serialization from/to file-backed storage or streams.
See https://github.com/SCALE-MS/scale-ms/issues/106
TODO
----
Get a stronger specification of the RP Raptor interface.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
from __future__ import annotations
__all__ = (
"ClientWorkerRequirements",
"RaptorConfiguration",
"raptor_input",
"worker_requirements",
"worker_description",
"SoftwareCompatibilityError",
"ScaleMSWorker",
"ScaleMSRaptor",
"ScalemsRaptorWorkItem",
"WorkerDescription",
"RaptorWorkerConfig",
)
import abc
import argparse
import asyncio
import contextlib
import dataclasses
import functools
import importlib
import importlib.metadata
import json
import os
import pathlib
import sys
import tempfile
import traceback
import typing
import warnings
import zlib
from contextlib import ContextDecorator
from importlib.machinery import ModuleSpec
from importlib.util import find_spec
from collections.abc import Generator
import packaging.version
import scalems.cpi
from scalems.identifiers import EphemeralIdentifier
from scalems.store import FileStore
if typing.TYPE_CHECKING:
try:
from mpi4py.MPI import Comm
except ImportError:
Comm = None
# We import rp before `logging` to avoid warnings when rp monkey-patches the
# logging module. This `try` suite helps prevent auto-code-formatters from
# rearranging the import order of built-in versus third-party modules.
import radical.pilot as rp
import scalems.exceptions
import scalems.messages
import scalems.file as _file
import scalems.store as _store
# QUESTION: How should we approach logging? To what degree can/should we integrate
# with rp logging?
import logging
logger = logging.getLogger(__name__)
# Note that we have not restricted propagation or attached a handler, so the messages
# to `logger` will end up going to the stderr of the process that imported this module
# (e.g. the raptor task or the Worker task) and appearing in the corresponding
# Task.stderr. This is available on the client side in the case of the raptor task,
# but only to the Raptor itself for the worker task.
# We should probably either add a specific LogHandler or integrate with the
# RP logging and Component based log files.
# See also
# * https://github.com/SCALE-MS/scale-ms/discussions/261
# * https://github.com/SCALE-MS/scale-ms/issues/255
@dataclasses.dataclass(frozen=True)
class BackendVersion:
"""Identifying information for the computing backend."""
name: str
"""Identifying name (presumably a module name)."""
version: str
"""Implementation revision identifier as a PEP 440 compatible version string."""
backend_version = BackendVersion(name="scalems.radical.raptor", version="0.0.0")
# TODO: Where does this identifier belong?
api_name = "scalems_v0"
"""Key for dispatching raptor Requests.
We can use this to identify the schema used for SCALE-MS tasks encoded in
arguments to the raptor :py:data:`~radical.pilot.TASK_FUNCTION` mode executor.
"""
CPI_MESSAGE = "scalems.cpi"
"""Flag for scalems messages to be treated as CPI calls.
Used in the :py:attr:`~radical.pilot.TaskDescription.mode` field to indicate that the
object should be handled through the SCALEMS Compute Provider Interface machinery.
"""
_CpiCommandT = typing.TypeVar("_CpiCommandT", bound="CpiCommand")
class CpiCommand(abc.ABC):
_registry = dict[str, typing.Type[_CpiCommandT]]()
@classmethod
@abc.abstractmethod
def launch(cls, manager: ScaleMSRaptor, task: TaskDictionary) -> bool:
"""Process the RP Task as a CPI Command.
Called in ScaleMSRaptor.request_cb().
Returns:
True if `manager.cpi_finalize()` was called, else False.
"""
# Developer note: when overriding, be sure to implement correct return value
# logic, even though it is not initially clear why we would want to return False.
...
@classmethod
@abc.abstractmethod
def result_hook(cls, manager: ScaleMSRaptor, task: TaskDictionary):
"""Called during Master.result_cb.
Triggered by `manager.cpi_finalize()` in the request_cb thread when
launched in response to a client message. In the initial implementation,
we are not aware of a use case in which the CpiCommand will be popped
off of the results queue in the normal result_cb thread.
"""
@typing.final
@classmethod
def get(cls, command: str):
"""Get the class (type object) registered for the named *command*."""
return CpiCommand._registry[command]
@classmethod
@abc.abstractmethod
def _command_name(cls) -> scalems.cpi.CpiOperation:
raise NotImplementedError
def __init_subclass__(cls, **kwargs):
if cls is not CpiCommand:
CpiCommand._registry[cls._command_name()] = cls
super().__init_subclass__(**kwargs)
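`CpiCommand` auto-registers its concrete subclasses through `__init_subclass__`, so `CpiCommand.get()` can dispatch on the CPI operation name without a manual registration step. The pattern in isolation (class names here are illustrative, not part of the scalems API):

```python
import abc

class Command(abc.ABC):
    """Base class that auto-registers concrete subclasses by command name."""

    _registry: dict = {}

    @classmethod
    @abc.abstractmethod
    def _command_name(cls) -> str:
        raise NotImplementedError

    def __init_subclass__(cls, **kwargs):
        # Every subclass definition registers itself under its declared name.
        if cls is not Command:
            Command._registry[cls._command_name()] = cls
        super().__init_subclass__(**kwargs)

    @classmethod
    def get(cls, name: str):
        """Look up the class registered for *name*."""
        return Command._registry[name]

class Hello(Command):
    @classmethod
    def _command_name(cls) -> str:
        return "hello"

# Dispatch by name, as CpiCommand.get() does for CPI operations.
assert Command.get("hello") is Hello
```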
class CpiStop(CpiCommand):
"""Provide implementation for StopCommand in RP Raptor.
When a STOP is received, the Master will stop handling new compute tasks
from the client. Tasks generated by other Tasks already in the queue will
continue to be allowed until/unless a firmer "stop" command is received.
"""
@classmethod
def _command_name(cls) -> scalems.cpi.CpiOperation:
return "stop"
@classmethod
def launch(cls, manager: ScaleMSRaptor, task: TaskDictionary):
manager._stopping = True
logger.debug("CPI STOP issued.")
# TODO: Cleaner shutdown.
# * Mark the Raptor as "shutting down".
# * Wait for pending tasks to complete and shut down Worker(s)
# * Allow override for more aggressive shutdown, such as if additional STOP
# messages arrive while processing this one (already "shutting down").
task["stderr"] = ""
task["stdout"] = "CPI STOP received"
task["exit_code"] = 0
manager.cpi_finalize(task)
return True
@classmethod
def result_hook(cls, manager: ScaleMSRaptor, task: TaskDictionary):
# TODO: Refine the state machine of the CPI Session.
# We probably need to distinguish between a clean stop and a harder stop.
manager.stop()
logger.debug(f"Finalized {str(task)}.")
class CpiHello(CpiCommand):
"""Provide implementation for HelloCommand in RP Raptor."""
@classmethod
def _command_name(cls) -> scalems.cpi.CpiOperation:
return "hello"
@classmethod
def launch(cls, manager: ScaleMSRaptor, task: TaskDictionary):
logger.debug("CPI HELLO in progress.")
task["stderr"] = ""
task["exit_code"] = 0
task["stdout"] = repr(backend_version)
# TODO: scalems object encoding.
task["return_value"] = dataclasses.asdict(backend_version)
logger.debug("Finalizing...")
manager.cpi_finalize(task)
return True
@classmethod
def result_hook(cls, manager: ScaleMSRaptor, task: TaskDictionary):
logger.debug(f"Finalized {str(task)}.")
class CpiAddItem(CpiCommand):
"""Add an item to the managed workflow record.
Descriptions of Work with no dependencies will be immediately scheduled for
execution (converted to RP tasks and submitted).
TBD: Data objects, references to existing data, completed tasks.
"""
@classmethod
def _command_name(cls) -> scalems.cpi.CpiOperation:
return "add_item"
@classmethod
def launch(cls, manager: ScaleMSRaptor, task: TaskDictionary):
"""Repackage a AddItem command as a rp.TaskDescription for submission to the Worker.
Note that we are not describing the exact function call directly,
but an encoded function call to be handled by the `run_in_worker`
dispatching function. The arguments will be serialized
together with the function code object into the
*work_item* key word argument for the RP Task.
The Master.request_cb() submits individual scalems tasks with this function.
More complex work is first deserialized with the `raptor_work_deserializer`
function and then managed by ScaleMSRaptor.
See Also:
`scalems.radical.raptor.ScaleMSWorker.run_in_worker()`
"""
# TODO: If the Raptor is already "shutting down", reject new work from the client,
# but continue to allow dynamically generated Tasks from already-queued work until/unless
# a more aggressive shut down is ordered.
# submit the task and return its task ID right away,
# then allow its result to either just be used to
# * trigger dependent work (TBD),
# * only appear in the Raptor report (TBD), or
# * be available to follow-up LRO status checks (TBD).
# Note that we have not yet implemented the QUERY functionality or tooling
# for managing Long Running Operations.
add_item: scalems.cpi.CpiCall = scalems.cpi.from_raptor_task_metadata(task["description"]["metadata"])
item_dict: ScalemsRaptorWorkItem = add_item["operand"]
# Decouple the serialization schema since it is not strongly specified or robust.
work_item = ScalemsRaptorWorkItem(
func=item_dict["func"],
module=item_dict["module"],
args=item_dict["args"],
kwargs=item_dict["kwargs"],
comm_arg_name=item_dict.get("comm_arg_name", None),
)
# TODO(#277): Convert abstract inputs to concrete values. (More management interface.)
# TODO(#277): Check dependencies and runnability before submitting.
fingerprint = zlib.crc32(json.dumps(work_item, sort_keys=True).encode("utf8"))
# TODO: More robust fingerprint. Ref scalems.serialization and scalems.store
# TODO: Check for duplicates.
scalems_task_id = f"scalems-task-{fingerprint}"
scalems_task_description = rp.TaskDescription(
_RaptorTaskDescription(
uid=scalems_task_id,
# Note that (as of this writing) *executable* is required but unused for Raptor tasks.
executable="scalems",
raptor_id=manager.uid,
# Note we could use metadata to encode additional info for ScaleMSRaptor.request_cb,
# but it is not useful for execution of the rp.TASK_FUNCTION.
metadata=None,
mode=rp.TASK_FUNCTION,
function="run_in_worker",
args=[],
kwargs={"work_item": work_item},
)
)
# Note: There is no `Task` object returned from `Raptor.submit_tasks()`.
# `Task` objects are currently only available on the client side; the agent
# side handles task dicts, which the Raptor receives through *request_cb()*.
manager.submit_tasks(scalems_task_description)
logger.debug(f"Submitted {str(scalems_task_description)} in support of {str(task)}.")
# Record task metadata and track.
task["return_value"] = scalems_task_id
task["stdout"] = scalems_task_id
task["exit_code"] = 0
manager.cpi_finalize(task)
return True
@classmethod
def result_hook(cls, manager: ScaleMSRaptor, task: TaskDictionary):
logger.debug(f"Finalized {str(task)}.")
EncodableAsDict = typing.Mapping[str, "Encodable"]
EncodableAsList = typing.List["Encodable"]
Encodable = typing.Union[str, int, float, bool, None, EncodableAsDict, EncodableAsList]
# def object_decoder(obj: dict):
# """Provide the object_hook callback for a JSONDecoder."""
@functools.singledispatch
def object_encoder(obj) -> Encodable:
"""Provide the *default* callback for JSONEncoder."""
raise TypeError(f"No encoder for {obj.__class__.__qualname__}.")
# def get_decoder() -> json.JSONDecoder:
# """Get a JSONDecoder instance extended for types in this module."""
# decoder = json.JSONDecoder(object_hook=object_decoder)
# return decoder
#
#
# def get_encoder() -> json.JSONEncoder:
# """Get a JSONEncoder instance extended for types in this module."""
# encoder = json.JSONEncoder(default=object_encoder)
# return encoder
# @object_encoder.register
# def _(obj: rp.TaskDescription) -> dict:
# return obj.as_dict()
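`object_encoder` is a `functools.singledispatch` function meant to serve as the *default* hook for `json.JSONEncoder`, as in the commented-out `get_encoder()` above. A self-contained sketch of the pattern (the `Point` type is illustrative, not part of scalems):

```python
import dataclasses
import functools
import json

@functools.singledispatch
def encoder_default(obj):
    # Mirrors object_encoder: unregistered types are an error.
    raise TypeError(f"No encoder for {obj.__class__.__qualname__}.")

@dataclasses.dataclass
class Point:  # illustrative type
    x: int
    y: int

@encoder_default.register
def _(obj: Point) -> dict:
    # singledispatch infers the dispatch type from the annotation.
    return dataclasses.asdict(obj)

# json.dumps consults *default* for types it cannot serialize natively.
text = json.dumps({"origin": Point(0, 0)}, default=encoder_default)
```

New types are supported by registering an overload, with no changes to the call sites that do the encoding.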
@dataclasses.dataclass(frozen=True)
class ClientWorkerRequirements:
"""Client-side details to inform worker provisioning.
This structure is part of the `scalems.radical.raptor.RaptorConfiguration`
provided to the raptor script. The raptor script uses this information
when calling `worker_description()`.
Use the :py:func:`worker_requirements()` creation function for a stable interface.
"""
cores_per_process: int
Number of CPU cores reserved per process (or MPI rank) for threading or child processes.
cpu_processes: int
"""Number of ranks in the Worker MPI context."""
gpus_per_rank: float = 0
"""GPUs per Worker rank."""
pre_exec: tuple[str, ...] = ()
"""Lines of shell expressions to be evaluated before launching the worker process."""
named_env: typing.Optional[str] = None
"""A registered virtual environment known to the raptor manager.
Warnings:
Not tested. Support has been delayed indefinitely.
"""
class _RaptorConfigurationDict(typing.TypedDict):
"""A `RaptorConfiguration` encoded as a `dict`."""
versioned_modules: typing.List[typing.Tuple[str, str]]
@dataclasses.dataclass
class RaptorConfiguration:
"""Input to the script responsible for the RP raptor.
.. todo:: Check the schema for RP.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
Produced on the client side with `raptor_input`. Deserialized in the
raptor task (`raptor`) to get a `RaptorWorkerConfig`.
"""
versioned_modules: typing.List[typing.Tuple[str, str]]
"""List of name, version specifier tuples for required modules."""
@classmethod
def from_dict(cls, obj: _RaptorConfigurationDict):
"""Decode from a native dict.
Support deserialization, such as from a JSON file in the raptor task.
"""
return cls(versioned_modules=list(obj["versioned_modules"]))
@object_encoder.register
def _(obj: RaptorConfiguration) -> dict:
return dataclasses.asdict(obj)
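`RaptorConfiguration` round-trips through JSON: encoded on the client via `dataclasses.asdict` (registered with `object_encoder` above) and decoded in the raptor task with `from_dict`. A self-contained sketch of that round trip, using a stand-in class with the same shape (the tuple restoration is an assumption of this sketch, since JSON has no tuple type):

```python
import dataclasses
import json
import typing

@dataclasses.dataclass
class Config:  # stand-in for RaptorConfiguration
    versioned_modules: typing.List[typing.Tuple[str, str]]

    @classmethod
    def from_dict(cls, obj: dict):
        # JSON turns tuples into lists; restore the (name, version) pairs.
        return cls(versioned_modules=[tuple(item) for item in obj["versioned_modules"]])

original = Config(versioned_modules=[("scalems", "0.0"), ("radical.pilot", "1.18")])
# Client side: dataclass -> dict -> JSON text; raptor side: JSON text -> dict -> dataclass.
restored = Config.from_dict(json.loads(json.dumps(dataclasses.asdict(original))))
```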
def worker_requirements(
*,
pre_exec: typing.Iterable[str],
ranks_per_worker: int,
cores_per_rank: int = 1,
gpus_per_rank: typing.SupportsFloat = 0,
) -> ClientWorkerRequirements:
"""Get the requirements for the work load, as known to the client."""
workload_metadata = ClientWorkerRequirements(
named_env=None,
pre_exec=tuple(pre_exec),
cores_per_process=cores_per_rank,
cpu_processes=ranks_per_worker,
gpus_per_rank=float(gpus_per_rank),
)
return workload_metadata
parser = argparse.ArgumentParser(description="Command line entry point for RP raptor task.")
parser.add_argument("file", type=str, help="Input file (JSON) for configuring ScaleMSRaptor instance.")
class coverage_file(ContextDecorator):
"""Decorator for a function to conditionally record to the named coverage data file."""
_Coverage = None
cov = None
_decorated_function = ""
def __init__(self, filename=".coverage", verbose=True):
self.filename = filename
self.verbose = verbose
if os.getenv("COVERAGE_RUN") is not None or os.getenv("SCALEMS_COVERAGE") is not None:
logger.info("... testing with coverage")
try:
from coverage import Coverage
self._Coverage = Coverage
except ImportError:
warnings.warn("Code coverage reporting is not available.")
else:
logger.info("No coverage testing")
def __call__(self, func):
# Apply the decorator
assert callable(func)
self._decorated_function = func.__qualname__
# Return the decorated function.
return super().__call__(func)
def __enter__(self):
if self._Coverage is not None:
current_pid = os.getpid()
coverage_pid = os.environ.get("SCALEMS_COVERAGE_ACTIVE", None)
if coverage_pid is not None:
if int(coverage_pid) != current_pid:
logger.info(f"PID {current_pid} inheriting coverage from {coverage_pid}.")
else:
logger.info(f"{self._decorated_function}: Coverage API is already active for {current_pid}.")
# warnings.warn(f'Coverage API is already active in {current_pid}. Data race may occur.')
# TODO: Check whether this is a problem since result_cb() may be reentrant.
else:
coverage_pid = current_pid
os.environ["SCALEMS_COVERAGE_ACTIVE"] = str(current_pid)
logger.info(f"Starting coverage for {coverage_pid} in {os.getcwd()}")
filename = self.filename
if not os.path.isabs(filename):
filename = os.path.join(".", "coverage_dir", filename)
self.filename = os.path.abspath(filename)
if not os.path.exists(os.path.dirname(filename)):
os.mkdir(os.path.dirname(filename))
self.cov = self._Coverage(
auto_data=True,
branch=True,
check_preimported=True,
data_file=filename,
data_suffix=self._decorated_function,
messages=self.verbose,
source=("scalems",),
)
self.cov.start()
return self
def __exit__(self, *exc):
if self.cov is not None:
current_pid = os.getpid()
coverage_pid_str = os.environ.get("SCALEMS_COVERAGE_ACTIVE", None)
if coverage_pid_str == str(current_pid):
logger.info(f"Saving coverage data for {self._decorated_function} to {self.filename}.")
self.cov.stop()
self.cov.save()
del os.environ["SCALEMS_COVERAGE_ACTIVE"]
else:
logger.info(
f"Deferring coverage for {self._decorated_function} on {current_pid} to PID {coverage_pid_str}."
)
del self.cov
return False
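`coverage_file` relies on `contextlib.ContextDecorator`, which lets a single class be applied either as a function decorator or as a `with` context, funneling both uses through `__enter__`/`__exit__`. The mechanism in isolation (class and attribute names here are illustrative):

```python
from contextlib import ContextDecorator

class tracked(ContextDecorator):
    """Counts scope entries whether used as a decorator or a context manager."""

    def __init__(self):
        self.entries = 0

    def __enter__(self):
        self.entries += 1
        return self

    def __exit__(self, *exc):
        return False  # do not suppress exceptions

scope = tracked()

@scope
def work():
    # Decorator form: each call is wrapped in __enter__/__exit__.
    return "done"

work()
with scope:  # context-manager form uses the same hooks
    pass
```

This is why `coverage_file.__call__` can record the decorated function's name and then defer to `super().__call__(func)` for the actual wrapping.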
def raptor():
"""Entry point for raptor.Master task.
This function implements the :command:`scalems_raptor` entry point script
called by the RADICAL Pilot executor to provide the raptor task.
During installation, the `scalems` build system creates a :file:`scalems_raptor`
`entry point <https://setuptools.pypa.io/en/latest/pkg_resources.html#entry-points>`__
script that provides command line access to this function. The Python signature
takes no arguments because the function processes command line arguments from `sys.argv`.
.. uml::
title scalems raptor task lifetime
!pragma teoz true
queue "RP runtime" as scheduler
box execution
box "scalems.radical" #honeydew
participant "raptor task"
participant ScaleMSRaptor as raptor
end box
participant worker.py
end box
scheduler -> "raptor task" : stage in
?-> "raptor task" : scalems.radical.raptor.~__main__()
activate "raptor task"
note over "raptor task" : "TODO: Get asyncio event loop?"
note over "raptor task" : "TODO: Initialize scalems FileStore"
"raptor task" -> "raptor task" : from_dict
activate "raptor task"
return RaptorConfiguration
"raptor task" -> raptor **: create ScaleMSRaptor()
"raptor task" -> raptor: start()
...
group CPI [launch workers]
scheduler -\\ raptor : request_cb
activate raptor
raptor -> raptor : with configure_worker()
activate raptor
raptor -> worker.py ** : with _worker_file()
activate raptor
raptor -> raptor : _configure_worker()
activate raptor
raptor -> raptor : worker_description()
activate raptor
return
return WorkerDescription
deactivate raptor
return RaptorWorkerConfig
'raptor --> raptor : RaptorWorkerConfig
'deactivate raptor
raptor -> raptor: submit_workers(**config)
activate raptor
raptor -\\ worker.py
activate worker.py
raptor --> raptor: UIDs
deactivate raptor
raptor -> raptor: wait_workers()
activate raptor
return
return
deactivate raptor
scheduler -\\ raptor : result_cb
end
...
group TODO [stop workers]
"raptor task" -> raptor : ~__exit__
activate raptor
raptor -> raptor : ~__exit__
activate raptor
raptor --> raptor : stop worker (TODO)
raptor -> worker.py !! : delete
deactivate raptor
raptor -> raptor: stop() (TBD)
end
"raptor task" -> raptor: join()
deactivate raptor
deactivate "raptor task"
scheduler <-- "raptor task" : stage out
deactivate "raptor task"
"""
# TODO: Configurable log level?
logger.setLevel(logging.DEBUG)
character_stream = logging.StreamHandler()
character_stream.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
character_stream.setFormatter(formatter)
logger.addHandler(character_stream)
file_logger = logging.FileHandler("scalems.radical.raptor.log")
file_logger.setLevel(logging.DEBUG)
file_logger.setFormatter(formatter)
logging.getLogger("scalems").addHandler(file_logger)
if not os.environ.get("RP_TASK_ID"):
warnings.warn("Attempting to start a Raptor without RP execution environment.")
rp_version = packaging.version.parse(rp.version)
args = parser.parse_args()
if not os.path.exists(args.file):
raise RuntimeError(f"File not found: {args.file}")
with open(args.file, "r") as fh:
configuration = RaptorConfiguration.from_dict(json.load(fh))
logger.info(f"Launching ScaleMSRaptor task in raptor {rp_version} with {repr(dataclasses.asdict(configuration))}.")
_raptor = ScaleMSRaptor(configuration)
logger.debug(f"Created {repr(_raptor)}.")
try:
_raptor.start()
logger.debug(f"Raptor started in PID {os.getpid()}: {' '.join(sys.argv)}.")
# As of RP 1.33, `join` waits on a thread that monitors Worker responsiveness until
# a termination Event is detected.
_raptor.join()
logger.debug("Raptor joined.")
finally:
if sys.exc_info() != (None, None, None):
logger.exception("Raptor task encountered exception.")
logger.debug("Completed raptor task.")
def _configure_worker(*, requirements: ClientWorkerRequirements, filename: str) -> RaptorWorkerConfig:
"""Prepare the arguments for :py:func:`rp.raptor.Master.submit_workers()`.
Provide an abstraction for the submit_workers signature and parameter types,
which may still be evolving.
See also https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
assert os.path.exists(filename)
# TODO(#248): Consider a "debug-mode" option to do a trial import from *filename*
# TODO(#302): Number of workers should be chosen after inspecting the requirements of the work load.
num_workers = 1
descr = worker_description(
named_env=requirements.named_env,
worker_file=filename,
pre_exec=requirements.pre_exec,
cpu_processes=requirements.cpu_processes,
gpus_per_rank=requirements.gpus_per_rank,
)
if os.getenv("COVERAGE_RUN") is not None or os.getenv("SCALEMS_COVERAGE") is not None:
descr.environment = {"SCALEMS_COVERAGE": "TRUE"}
config: RaptorWorkerConfig = [descr] * num_workers
return config
class SoftwareCompatibilityError(RuntimeError):
"""Incompatible package versions or software interfaces."""
def _get_module_version(module: str):
"""Get version metadata for importable module."""
try:
found_version = importlib.metadata.version(module)
except importlib.metadata.PackageNotFoundError:
found_version = None
if found_version is None:
try:
spec: ModuleSpec = find_spec(module)
if spec and hasattr(spec, "parent"):
found_version = importlib.metadata.version(spec.parent)
except Exception as e:
logger.debug(f"Exception when trying to find {module}: ", exc_info=e)
if found_version is not None:
found_version = packaging.version.Version(found_version)
return found_version
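`ScaleMSRaptor.__init__` (below) treats each client-provided version string as a minimum and compares it against the version found by `_get_module_version`. With `packaging.version` set aside, a simplified release-tuple comparison shows the shape of that check; real code should prefer `packaging.version.Version`, which also handles pre-releases and local version segments that this naive parser does not.

```python
def release_tuple(version: str) -> tuple:
    # Naive parse of an "X.Y.Z" release string; no pre-release handling.
    return tuple(int(part) for part in version.split("."))

def check_compatible(found: str, minimum: str) -> bool:
    # Tuple comparison gives the usual lexicographic version ordering.
    return release_tuple(found) >= release_tuple(minimum)
```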
class ScaleMSRaptor(rp.raptor.Master):
"""Manage a RP Raptor *raptor_id* target.
Extends :py:class:`rp.raptor.Master`.
We do not submit scalems tasks directly as RP Tasks from the client.
We encode scalems tasks as instructions to the Raptor task, which will be
decoded and translated to RP Tasks by the `ScaleMSRaptor.request_cb` and
self-submitted to the `ScaleMSWorker`.
Results of such tasks are only available through the :py:func:`result_cb`.
The Raptor can translate results of generated Tasks into results for the Task
carrying the coded instruction, or it can produce data files to stage during or
after the Raptor task. Additionally, the Raptor could respond to other special
instructions (encoded in later client-originated Tasks) to query or retrieve
generated task results.
.. uml::
title Raptor
queue "Queue" as Queue
box "RP Agent"
participant Scheduler
end box
queue "Raptor input queue" as master_queue
box "scalems.radical.raptor"
participant ScaleMSRaptor
participant rpt.Master
end box
queue "ZMQ Raptor work channel" as channel
activate Queue
Scheduler -> Queue: accepts work
activate Scheduler
Scheduler <-- Queue: task dictionary
deactivate Queue
note over Scheduler
Scheduler gets task from queue,
observes `scheduler` field and routes to Raptor.
end note
Scheduler -> master_queue: task dictionary
activate master_queue
deactivate Scheduler
note over ScaleMSRaptor, rpt.Master
Back end RP queue manager
passes messages to Raptor callbacks.
end note
rpt.Master -> master_queue: accept Task
activate rpt.Master
rpt.Master <-- master_queue: cpi_task
deactivate master_queue
rpt.Master -> rpt.Master: _request_cb(cpi_task)
activate rpt.Master
rpt.Master -> ScaleMSRaptor: request_cb(cpi_task)
activate ScaleMSRaptor
ScaleMSRaptor -> ScaleMSRaptor: CpiCommand.launch()
activate ScaleMSRaptor
alt optionally process or update requests
ScaleMSRaptor -> ScaleMSRaptor: submit_tasks(scalems_tasks)
activate ScaleMSRaptor
deactivate ScaleMSRaptor
ScaleMSRaptor -> ScaleMSRaptor: _result_cb(cpi_task)
activate ScaleMSRaptor
deactivate ScaleMSRaptor
else resolve a Control message
ScaleMSRaptor -> ScaleMSRaptor: _result_cb(cpi_task)
activate ScaleMSRaptor
deactivate ScaleMSRaptor
end
return
return filtered tasks
rpt.Master -> rpt.Master: submit_tasks(...)
activate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: send message
deactivate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: accept result
activate rpt.Master
rpt.Master <-- channel: scalems_tasks
deactivate channel
rpt.Master -> rpt.Master: _result_cb(scalems_tasks)
activate rpt.Master
rpt.Master -> ScaleMSRaptor: result_cb(scalems_tasks)
activate ScaleMSRaptor
ScaleMSRaptor -> ScaleMSRaptor: CpiCommand.result_hook()
alt optionally process or update pending requests
ScaleMSRaptor -> ScaleMSRaptor: issue#277
activate ScaleMSRaptor
deactivate ScaleMSRaptor
end
rpt.Master <-- ScaleMSRaptor
deactivate ScaleMSRaptor
rpt.Master -> master_queue: send message
activate master_queue
deactivate rpt.Master
deactivate rpt.Master
"""
def __init__(self, configuration: RaptorConfiguration):
"""Initialize a SCALE-MS Raptor.
Verify the environment. Perform some scalems-specific set up and
initialize the base class details.
"""
# Verify environment.
for module, version in configuration.versioned_modules:
logger.debug(f"Looking for {module} version {version}.")
found_version = _get_module_version(module)
if found_version is None:
raise SoftwareCompatibilityError(f"{module} not found.")
minimum_version = packaging.version.Version(version)
if found_version >= minimum_version:
logger.debug(f"Found {module} version {found_version}: Okay.")
else:
raise SoftwareCompatibilityError(f"{module} version {found_version} not compatible with {version}.")
# The rp.raptor base class will fail to initialize without the environment
# expected for a RP task launched by an RP agent. However, we may still
# want to acquire a partially-initialized instance for testing.
try:
super(ScaleMSRaptor, self).__init__()
except KeyError:
warnings.warn("Raptor incomplete. Could not initialize raptor.Master base class.")
# Initialize internal state.
# TODO: Use a scalems RuntimeManager.
self.__worker_files = {}
self._stopping = False
@contextlib.contextmanager
def _worker_file(self):
"""Scoped temporary module file for raptor worker.
Not thread-safe. (Should it be?)
Write and return the path to a temporary Python module. The module imports
:py:class:`ScaleMSWorker` into its module namespace
so that the file and class can be used in the worker description for
:py:func:`rp.raptor.Master.submit_workers()`
"""
worker = ScaleMSWorker
# Note: there does not appear to be any utility in any lines to evaluate
# beyond the Worker subclass import. This namespace will not be available
# (through `globals()` or `locals()`, at least) to the rp.TASK_FUNCTION
# implementation in rp.raptor.worker_mpi._Worker._dispatch_function.
text = [
f"from {worker.__module__} import {worker.__name__}\n",
# f'from {run_in_worker.__module__} import {run_in_worker.__name__}\n'
]
# TODO: Use the Raptor's FileStore to get an appropriate fast shared filesystem.
with tempfile.TemporaryDirectory() as tmp_dir:
filename = self.__worker_files.get(worker, None)
if filename is None:
filename = next_numbered_file(dir=tmp_dir, name="scalems_worker", suffix=".py")
with open(filename, "w") as fh:
fh.writelines(text)
self.__worker_files[worker] = filename
yield filename
del self.__worker_files[worker]
def request_cb(self, tasks: typing.Sequence[TaskDictionary]) -> typing.Sequence[TaskDictionary]:
"""Allows all incoming requests to be processed by the Raptor.
RADICAL guarantees that :py:func:`~radical.pilot.raptor.Master.request_cb()`
calls are made sequentially in a single thread.
The implementation does not need to be thread-safe or reentrant.
(Ref: https://github.com/radical-cybertools/radical.utils/blob/master/src/radical/utils/zmq/queue.py#L386)
RADICAL does not guarantee that Tasks are processed in the same order in
which they are submitted by the client.
If overridden, request_cb() must return a :py:class:`list`. The returned list
is interpreted to be requests that should be processed normally after the callback.
This allows subclasses of rp.raptor.Master to add or remove requests before they become Tasks.
The returned list is submitted with self.submit_tasks() by the base class after the
callback returns.
A Raptor may call *self.submit_tasks()* to self-submit items (e.g. instead of or in
addition to manipulating the returned list in *request_cb()*).
It is the developer's responsibility to choose unique task IDs (uid) when crafting
items for Raptor.submit_tasks().
"""
try:
# It is convenient to compartmentalize the filtering and dispatching
# of scalems tasks in a generator function, but this callback signature
# requires a `list` to be returned.
remaining_tasks = self._scalems_handle_requests(tasks)
# Let non-scalems tasks percolate to the default machinery.
return list(remaining_tasks)
except Exception as e:
logger.exception("scalems request filter propagated an exception.")
# TODO: Submit a clean-up task.
# Use a thread or asyncio event loop controlled by the main raptor task thread
# to issue a cancel to all outstanding tasks and stop the workers.
# Question: Is there a better / more appropriate way to exit cleanly on errors?
self.stop()
# Note that we are probably being called in a non-root thread.
# TODO: Avoid letting an exception escape unhandled.
# WARNING: RP 1.18 suppresses exceptions from request_cb(), but there is a note
# indicating that exceptions will cause task failure in a future version.
# But the failure modes of the scalems raptor task are not yet well-defined.
# See also
# * https://github.com/SCALE-MS/scale-ms/issues/218 and
# * https://github.com/SCALE-MS/scale-ms/issues/229
raise e
def cpi_finalize(self, task: TaskDictionary):
"""Short-circuit the normal Raptor protocol to finalize a task.
This is an alias for Raptor._result_cb(), but is a public method used
in the :py:meth:`CpiCommand.launch` method for
:py:class:`~scalems.messages.Control` commands, which do not
call :py:func:`ScaleMSRaptor.submit_tasks`.
"""
self._result_cb(task)
def _scalems_handle_requests(self, tasks: typing.Sequence[TaskDictionary]):
for task in tasks:
mode = task["description"]["mode"]
# Allow non-scalems work to be handled normally.
if mode != CPI_MESSAGE:
logger.debug(f"Deferring {str(task)} to regular RP handling.")
yield task
continue
_finalized = False
try:
cpi_call: scalems.cpi.CpiCall = scalems.cpi.from_raptor_task_metadata(task["description"]["metadata"])
if not self._stopping:
logger.debug(f"Received message {cpi_call} in {str(task)}")
impl: typing.Type[CpiCommand] = CpiCommand.get(cpi_call["operation"])
# # TODO(#277)
# # Check work dependencies. Generate necessary data staging.
# self._dependents[task['uid']] = ...
# # Generate sub-tasks and manage dependencies. Prune tasks that
# # are already completed. Produce appropriate error if work cannot
# # be performed. Manage sufficient state that result_cb is able to
# # submit tasks as their dependencies are met.
#
# self._workload.add(task)
# ...
_finalized = impl.launch(self, task)
else:
# Note that, depending on the progress of `self.stop()`, the Task
# may never progress from the client's perspective.
logger.error(f"Ignoring command {cpi_call} since STOP has already been issued.")
# TODO: Encapsulate a response message type and error variant.
task["return_value"] = False
self._result_cb(task)
_finalized = True
except Exception as e:
# Exceptions here are presumably bugs in scalems or RP.
# Almost certainly, we should shut down after cleaning up.
# 1. TODO: Record error in log for ScaleMSRaptor.
# 2. Make sure that task is resolved.
# 3. Trigger clean shut down of raptor task.
if not _finalized:
if not task["exception"]:
task["exception"] = repr(e)
task["exception_detail"] = traceback.format_exc()
# TODO: Mark failed with appropriate CPI response message instance
self._result_cb(task)
raise e
def result_cb(self, tasks: typing.Sequence[TaskDictionary]):
"""SCALE-MS specific handling of completed tasks.
We perform special handling for two types of tasks.
1. Tasks submitted through, and completed by, the collaborating Worker(s).
2. Tasks intercepted and resolved entirely by the Raptor (CPI calls).
"""
# Note: At least as of RP 1.18, exceptions from result_cb() are suppressed.
for task in tasks:
logger.debug(f'Result callback for {task["description"]["mode"]} task {task["uid"]}: {task}')
mode = task["description"]["mode"]
# Allow non-scalems work to be handled normally.
if mode == CPI_MESSAGE:
cpi_call: scalems.cpi.CpiCall = scalems.cpi.from_raptor_task_metadata(task["description"]["metadata"])
# command = scalems.messages.Command.decode(task["description"]["metadata"])
logger.debug(f"Finalizing {cpi_call} in {str(task)}")
impl: typing.Type[CpiCommand] = CpiCommand.get(cpi_call["operation"])
impl.result_hook(self, task)
# # Release dependent tasks.
# # https://github.com/SCALE-MS/randowtal/blob/c58452a3eaf0c058337a85f4fca3f51c5983a539/test_am
# # /brer_master.py#L114
# # TODO: use scalems work graph management.
# if task['uid'] in self._dependents:
# # Warning: The dependent task may have multiple dependencies.
# dep = self._dependents[task['uid']]
# self._log.debug('=== submit dep %s', dep['uid'])
# self.submit_tasks(dep)
#
# mode = task['description']['mode']
#
# # NOTE: `state` will be `AGENT_EXECUTING`
# self._log.debug('=== out: %s', task['stdout'])
# self._log.debug('result_cb %s: %s [%s] [%s]',
# task['uid'],
# task['state'],
# sorted(task['stdout']),
# task['return_value'])
# #TODO(#108)
# # Complete and publish
# # https://github.com/SCALE-MS/randowtal/blob/c58452a3eaf0c058337a85f4fca3f51c5983a539/test_am
# # /brer_master.py#L114
# # Check whether all of the submitted tasks have been completed.
# if self._submitted == self._completed:
# # we are done, not more workloads to wait for - return the tasks
# # and terminate
# # TODO: We can advance the rp task state with a callback registered with the corresponding
# # unwrapped scalems task.
# for req in self._workload:
# # TODO: create raptor method
# req['task']['target_state'] = rp.DONE
# self.advance(req['task'], rp.AGENT_STAGING_OUTPUT_PENDING,
# publish=True, push=True)
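The request-filtering pattern implemented by ``request_cb()`` and ``_scalems_handle_requests()`` can be sketched without RP. In the sketch below, the ``CPI_MESSAGE`` value and the task dictionaries are stand-ins (the real sentinel is defined elsewhere in this package):

```python
# Minimal sketch of the request_cb() filtering pattern (stand-in values).
CPI_MESSAGE = "scalems.cpi"  # placeholder; the real constant comes from scalems


def filter_requests(tasks):
    """Yield tasks that should fall through to default rp.raptor.Master handling."""
    for task in tasks:
        if task["description"]["mode"] != CPI_MESSAGE:
            yield task  # non-scalems work percolates to the base class
        # else: a CPI command would be dispatched and finalized in place


tasks = [
    {"description": {"mode": CPI_MESSAGE}},      # intercepted by the Raptor
    {"description": {"mode": "task.function"}},  # handled normally
]
remaining = list(filter_requests(tasks))
assert len(remaining) == 1
```

As in ``request_cb()``, the generator output is converted to a ``list`` before being returned, because the base class submits the returned list with ``submit_tasks()``.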
class WorkerDescription(rp.TaskDescription):
"""Worker description.
See Also:
* :py:meth:`~radical.pilot.raptor.Master.submit_workers()`
* https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
cores_per_rank: typing.Optional[int]
environment: typing.Optional[dict[str, str]]
gpus_per_rank: typing.Optional[float]
named_env: typing.Optional[str]
pre_exec: typing.Optional[list[str]]
ranks: int
raptor_class: str
raptor_file: str
mode: str # rp.RAPTOR_WORKER
RaptorWorkerConfig = list[WorkerDescription]
"""Client-specified Worker requirements.
Used by the raptor script when calling `worker_description()`.
Created internally with :py:func:`_configure_worker()`.
See Also:
:py:class:`scalems.radical.raptor.ClientWorkerRequirements`
"""
def worker_description(
*,
named_env: typing.Optional[str],
worker_file: str,
cores_per_process: typing.Optional[int] = None,
cpu_processes: typing.Optional[int] = None,
gpus_per_rank: typing.Optional[float] = None,
pre_exec: typing.Iterable[str] = (),
environment: typing.Optional[typing.Mapping[str, str]] = None,
) -> WorkerDescription:
"""Get a worker description for ScaleMSRaptor.submit_workers().
Parameters:
cores_per_process (int, optional): See `radical.pilot.TaskDescription.cores_per_rank`
cpu_processes (int, optional): See `radical.pilot.TaskDescription.ranks`
environment (dict, optional): Environment variables to set in the Worker task.
gpus_per_rank (float, optional): See `radical.pilot.TaskDescription.gpus_per_rank`
named_env (str): Python virtual environment registered with `radical.pilot.Pilot.prepare_env`
(currently ignored. see #90).
pre_exec (list[str]): Shell command lines for preparing the worker environment.
worker_file (str): Standalone module from which to import ScaleMSWorker.
*worker_file* is generated for the raptor script by the caller.
See :py:mod:`scalems.radical.raptor`
The *uid* for the Worker task is defined by ScaleMSRaptor.submit_workers().
"""
descr = WorkerDescription()
if cores_per_process is not None:
descr.cores_per_rank = cores_per_process
if environment is not None:
descr.environment = environment
if gpus_per_rank is not None:
descr.gpus_per_rank = gpus_per_rank
if named_env is not None:
# descr.named_env=None
logger.warning(f"Ignoring named_env={named_env}. Parameter is currently unused.")
descr.pre_exec = list(pre_exec)
if cpu_processes is not None:
descr.ranks = cpu_processes
descr.raptor_class = "ScaleMSWorker"
descr.raptor_file = worker_file
descr.mode = rp.RAPTOR_WORKER
return descr
class RaptorTaskExecutor(typing.Protocol):
"""Represent the signature of executor functions for rp.raptor.MPIWorker."""
def __call__(self, *args, comm: Comm, **kwargs) -> Encodable:
...
class ScaleMSWorker(rp.raptor.MPIWorker):
"""Specialize the Raptor MPI Worker for scalems dispatching of serialised work.
scalems tasks encode the importable function and inputs in the arguments to
the `run_in_worker` dispatching function,
which is available to the `ScaleMSWorker` instance.
.. uml::
title ScaleMSWorker task dispatching
queue "ZMQ Raptor work channel" as channel
box "scalems.radical.raptor.Worker"
participant ScaleMSWorker
participant rpt.Worker
end box
box "usermodule"
participant usermodule
end box
box "target venv"
end box
rpt.Worker -> channel: pop message
activate rpt.Worker
rpt.Worker <-- channel: rp.TASK_FUNCTION mode Task
rpt.Worker -> rpt.Worker: _dispatch_function()
activate rpt.Worker
alt scalems encoded workload
rpt.Worker -> ScaleMSWorker: run_in_worker()
activate ScaleMSWorker
ScaleMSWorker -> ScaleMSWorker: unpack encoded function call
ScaleMSWorker -> ScaleMSWorker: from usermodule import func
ScaleMSWorker -> usermodule: ""func(*args, **kwargs)""
activate usermodule
ScaleMSWorker <-- usermodule
deactivate usermodule
return
else pickled rp.PythonTask workload (not implemented)
rpt.Worker -> rpt.Worker: unpickled scalems handler
activate rpt.Worker
rpt.Worker -> usermodule: ""func(*args, **kwargs)""
activate usermodule
rpt.Worker <-- usermodule
deactivate usermodule
rpt.Worker --> rpt.Worker: {out, err, ret, value}
deactivate rpt.Worker
end
deactivate rpt.Worker
rpt.Worker -> channel: put result
deactivate rpt.Worker
"""
# TODO: How to get the module logger output for the Worker task.
@coverage_file()
def run_in_worker(self, *, work_item: ScalemsRaptorWorkItem, comm=None):
"""Unpack and run a task requested through RP Raptor.
Satisfies RaptorTaskExecutor protocol. Named as the *function* for
rp.TASK_FUNCTION mode tasks generated by scalems.
This function MUST be imported and referentiable by its *__qualname__* from
the scope in which ScaleMSWorker methods execute. (See ScaleMSRaptor._worker_file)
Parameters:
comm (mpi4py.MPI.Comm, optional): MPI communicator to be used for the task.
work_item: dictionary of encoded function call from *kwargs* in the TaskDescription.
See Also:
:py:func:`CpiAddItem.launch()`
"""
module = importlib.import_module(work_item["module"])
func = getattr(module, work_item["func"])
args = list(work_item["args"])
kwargs = work_item["kwargs"].copy()
comm_arg_name = work_item.get("comm_arg_name", None)
if comm_arg_name is not None:
if comm_arg_name:
kwargs[comm_arg_name] = comm
else:
args.append(comm)
self._log.debug(
"Calling %(func)s with args %(args)s and kwargs %(kwargs)s",
{"func": func.__qualname__, "args": repr(args), "kwargs": repr(kwargs)},
)
return func(*args, **kwargs)
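The unpacking performed by `run_in_worker` can be exercised outside of a Worker. The sketch below mirrors the same logic with a plain function and a stdlib callable (``dispatch`` is a hypothetical standalone helper; no MPI communicator is involved):

```python
import importlib


def dispatch(work_item, comm=None):
    # Mirrors run_in_worker(): resolve the callable, then merge in the communicator.
    module = importlib.import_module(work_item["module"])
    func = getattr(module, work_item["func"])
    args = list(work_item["args"])
    kwargs = dict(work_item["kwargs"])
    comm_arg_name = work_item.get("comm_arg_name")
    if comm_arg_name is not None:
        if comm_arg_name:
            kwargs[comm_arg_name] = comm  # pass as a named keyword argument
        else:
            args.append(comm)  # empty string: pass positionally
    return func(*args, **kwargs)


# Dispatch a stdlib function in place of an MPI-enabled user function.
item = {"func": "gcd", "module": "math", "args": [12, 18], "kwargs": {}, "comm_arg_name": None}
result = dispatch(item)  # → 6
```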
def raptor_work_deserializer(*args, **kwargs):
"""Unpack a workflow document from a Task.
When a `ScaleMSRaptor.request_cb()` receives a raptor task, it uses this
function to unpack the instructions and construct a work load.
"""
raise scalems.exceptions.MissingImplementationError
class ScalemsRaptorWorkItem(typing.TypedDict):
"""Encode the function call to implement a task in the workflow.
Parameter type for the `run_in_worker` function *work_item* argument.
This structure must be trivially serializable (by `msgpack`) so that it can
be passed in a TaskDescription.kwargs field. Consistency with higher level
`scalems.serialization` is not essential, but can be pursued in the future.
The essential role is to represent an importable callable and its arguments.
At run time, the dispatching function (`run_in_worker`) conceivably has access
to module scoped state, but does not have direct access to the Worker (or
any resources other than the `mpi4py.MPI.Comm` object). Therefore, abstract
references to input data should be resolved at the Raptor in terms of the
expected Worker environment before preparing and submitting the Task.
TODO: Evolve this to something sufficiently general to be a scalems.WorkItem.
TODO: Clarify the translation from an abstract representation of work in terms
of the workflow record to the concrete representation with literal values and
local filesystem paths.
TODO: Record extra details like implicit filesystem interactions,
environment variables, etc. TBD whether they belong in a separate object.
"""
func: str
"""A callable to be retrieved as an attribute in *module*."""
module: str
"""The qualified name of a module importable by the Worker."""
args: list
"""Positional arguments for *func*."""
kwargs: dict
"""Key word arguments for *func*."""
comm_arg_name: typing.Optional[str]
"""Identify how to provide an MPI communicator to *func*, if at all.
If *comm_arg_name* is not None, the callable will be provided with the
MPI communicator. If *comm_arg_name* is an empty string, the communicator
is appended to the positional arguments (see `run_in_worker`). Otherwise,
*comm_arg_name* must be a valid keyword argument by which to pass the communicator to *func*.
"""
class _RaptorTaskDescription(typing.TypedDict):
"""Describe a Task to be executed through a Raptor Worker.
A specialization of `radical.pilot.TaskDescription`.
Note the distinctive requirements of a TaskDescription that is to be processed by a raptor.Master.
The meaning or utility of some fields is dependent on the values of other fields.
TODO: We need some additional fields, like *environment* and fields related
to launch method and resources reservation. Is the schema for rp.TaskDescription
sufficiently strong and well-documented that we can remove this hinting type?
(Check again at RP >= 1.19)
"""
uid: str
"""Unique identifier for the Task across the Session."""
executable: str
"""Unused by Raptor tasks."""
raptor_id: str
"""The UID of the raptor.Master scheduler task.
This field is relevant to tasks routed from client TaskManagers. It is not
used for tasks originating in raptor tasks.
.. ref https://github.com/SCALE-MS/scale-ms/discussions/258#discussioncomment-4087870
"""
metadata: Encodable
"""An arbitrary user-provided payload.
May be any type that is encodable by :py:mod:`msgpack` (i.e. built-in types).
"""
mode: str
"""The executor mode for the Worker to use.
For ``rp.TASK_FUNCTION`` mode, either *function* or *method* must name a task executor.
Depending on the Worker (sub)class, resources such as an `mpi4py.MPI.Comm`
will be provided as the first positional argument to the executor.
``*args`` and ``**kwargs`` will be provided to the executor from the corresponding
fields.
See Also:
* :py:data:`CPI_MESSAGE`
* :py:class:`RaptorTaskExecutor`
"""
function: str
"""Executor for ``rp.TASK_FUNCTION`` mode.
Names the callable for dispatching.
The callable can either be a function object present in the namespace of the interpreter launched
by the Worker for the task, or a :py:class:`radical.pilot.pytask.PythonTask` pickled function object.
"""
args: list
"""For ``rp.TASK_FUNCTION`` mode, list of positional arguments provided to the executor function."""
kwargs: dict
"""For ``rp.TASK_FUNCTION`` mode, a dictionary of key word arguments provided to the executor function."""
class TaskDictionary(typing.TypedDict):
"""Task representations seen by *request_cb* and *result_cb*.
Other fields may be present, but the objects in the sequences provided to
:py:meth:`scalems.radical.raptor.ScaleMSRaptor.request_cb()` and
:py:meth:`scalems.radical.raptor.ScaleMSRaptor.result_cb()` have the following fields.
Result fields will not be populated until the Task runs.
For the expected fields, see the source code for
:py:meth:`~radical.pilot.Task.as_dict()`:
https://radicalpilot.readthedocs.io/en/stable/_modules/radical/pilot/task.html#Task.as_dict
"""
uid: str
"""Canonical identifier for the Task.
Note that *uid* may be omitted from the original TaskDescription.
"""
description: _RaptorTaskDescription
"""Encoding of the original task description."""
stdout: str
"""Task standard output."""
stderr: str
"""Task standard error."""
exit_code: int
"""Task return code."""
return_value: typing.Any
"""Function return value.
Refer to the :py:class:`RaptorTaskExecutor` Protocol.
"""
exception: str
"""Exception type name and message."""
exception_detail: str
"""Full exception details with stack trace."""
state: str
"""RADICAL Pilot Task state."""
target_state: str
"""State to which the Task should be advanced.
Valid values are string constants from :py:mod:`radical.pilot.states`.
Used internally, such as in Master._result_cb().
"""
def next_numbered_file(*, dir, name, suffix):
"""Get the next available filename with the form {dir}/{name}{i}{suffix}."""
if not os.path.exists(dir):
raise ValueError(f'Directory "{dir}" does not exist.')
filelist = os.listdir(dir)
template = str(name) + "{i}" + str(suffix)
i = 1
filename = template.format(i=i)
while filename in filelist:
i += 1
filename = template.format(i=i)
return os.path.join(dir, filename)
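A quick demonstration of the numbering behavior (the function body is repeated inside the example so that it runs standalone):

```python
import os
import tempfile


def next_numbered_file(*, dir, name, suffix):
    # Same logic as the module function above, repeated for a runnable demo.
    if not os.path.exists(dir):
        raise ValueError(f'Directory "{dir}" does not exist.')
    filelist = os.listdir(dir)
    template = str(name) + "{i}" + str(suffix)
    i = 1
    filename = template.format(i=i)
    while filename in filelist:
        i += 1
        filename = template.format(i=i)
    return os.path.join(dir, filename)


with tempfile.TemporaryDirectory() as tmp:
    first = next_numbered_file(dir=tmp, name="scalems_worker", suffix=".py")
    open(first, "w").close()  # occupy the first slot
    second = next_numbered_file(dir=tmp, name="scalems_worker", suffix=".py")
# first ends with "scalems_worker1.py"; second with "scalems_worker2.py"
```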
async def coro_get_scheduler(
pre_exec: typing.Iterable[str],
pilot: rp.Pilot,
filestore: FileStore,
config_future: asyncio.Future[_file.AbstractFileReference],
) -> rp.raptor_tasks.Raptor:
"""Establish the radical.pilot.raptor.Master task.
Create a raptor rp.Task (running the scalems_rp_master script) with the
provided *name* to be referenced as the *scheduler* for raptor tasks.
Returns the rp.Task for the raptor script once the Master is ready to
receive submissions.
Raises:
DispatchError if the raptor task could not be launched successfully.
Warnings:
Scalability optimizations in RP convey a race condition in which
the Pilot.submit_tasks() may return without error, but the Pilot may terminate
before the submission is successful, and the RP components may not be alive
to mark the Task state as FAILED or CANCELED. Caller should monitor the Pilot
and Session in addition to the state of the returned task.
Currently there is no completion condition for the raptor script unless
a ``stop`` is issued.
Caller is responsible for ending or canceling the Task returned by this function.
"""
# Define a scalems raptor (Master) task and launch it within the pilot.
td = rp.TaskDescription()
# The raptor uid is used as the `scheduler` value for raptor task routing.
# TODO(#108): Use caller-provided *name* for master_identity.
# Master tasks may not appear unique, but must be uniquely identified within the
# scope of a rp.Session for RP bookkeeping. Since there is no other interesting
# information at this time, we can generate a random ID and track it in our metadata.
master_identity = EphemeralIdentifier()
td.uid = "scalems-rp-raptor." + str(master_identity)
td.mode = rp.RAPTOR_MASTER
# scalems_rp_master will write output before it begins handling requests. The
# script may crash even before it can write anything, but if it does write
# anything, we _will_ have the output file locally.
# TODO: Why don't we have the output files? Can we ensure output files for CANCEL?
td.output_staging = [] # TODO(#229) Write and stage output from raptor task.
td.stage_on_error = True
td.pre_exec = list(pre_exec)
# We are not using prepare_env at this point. We use the `venv` configured by the
# caller.
# td.named_env = 'scalems_env'
td.executable = "python3"
td.arguments = []
if os.getenv("PYTHONDEVMODE", None) == "1" or "dev" in getattr(sys, "_xoptions", {}):
td.arguments.extend(("-X", "dev"))
td.environment["RADICAL_LOG_LVL"] = "DEBUG"
if os.getenv("COVERAGE_RUN") is not None or os.getenv("SCALEMS_COVERAGE") is not None:
# TODO: Use the FileStore!
local_coverage_dir = pathlib.Path("scalems-remote-coverage-dir").resolve()
td.arguments.extend(
(
"-m",
"coverage",
"run",
"--parallel-mode",
"--source=scalems",
"--branch",
"--data-file=coverage_dir/.coverage",
)
)
td.environment["SCALEMS_COVERAGE"] = "TRUE"
td.environment["RADICAL_LOG_LVL"] = "DEBUG"
td.pre_exec.append("mkdir -p coverage_dir")
td.output_staging.extend(
(
{
"source": "task:///coverage_dir",
"target": local_coverage_dir.as_uri(),
"action": rp.TRANSFER,
},
)
)
logger.info(f"Coverage data will be staged to {local_coverage_dir}.")
else:
logger.debug("No coverage testing")
td.arguments.extend(("-m", "scalems.radical.raptor"))
config_file = await config_future
# TODO(#75): Automate handling of file staging directives for scalems.file.AbstractFileReference
# e.g. _add_file_dependency(td, config_file)
config_file_name = str(td.uid) + "-config.json"
td.input_staging = [
{
"source": config_file.as_uri(),
"target": f"task:///{config_file_name}",
"action": rp.TRANSFER,
}
]
td.arguments.append(config_file_name)
task_metadata = {"uid": td.uid, "pilot": pilot.uid}
logger.debug(f"Using {filestore}.")
await asyncio.create_task(asyncio.to_thread(filestore.add_task, master_identity, **task_metadata), name="add-task")
logger.debug(f"Launching RP raptor scheduling. Submitting {td}.")
_task = asyncio.create_task(asyncio.to_thread(pilot.submit_tasks, [td]), name="submit-Master")
await _task
raptor: rp.raptor_tasks.Raptor = _task.result()[0]
return raptor
def launch_scheduler(
*,
pre_exec: typing.Iterable[str],
pilot: rp.Pilot,
filestore: FileStore,
config_file: scalems.file.AbstractFileReference,
) -> rp.raptor_tasks.Raptor:
"""Establish the radical.pilot.raptor.Master task.
Create a raptor rp.Task (running the scalems_rp_master script) with the
provided *name* to be referenced as the *scheduler* for raptor tasks.
Returns the rp.Task for the raptor script once the Master is ready to
receive submissions.
Raises:
DispatchError if the raptor task could not be launched successfully.
Note:
Currently there is no completion condition for the raptor script.
Caller is responsible for canceling the Task returned by this function.
"""
# Define a scalems raptor (Master) task and launch it within the pilot.
td = rp.TaskDescription()
# The raptor uid is used as the `scheduler` value for raptor task routing.
# TODO(#108): Use caller-provided *name* for master_identity.
# Master tasks may not appear unique, but must be uniquely identified within the
# scope of a rp.Session for RP bookkeeping. Since there is no other interesting
# information at this time, we can generate a random ID and track it in our metadata.
master_identity = EphemeralIdentifier()
td.uid = "scalems-rp-raptor." + str(master_identity)
td.mode = rp.RAPTOR_MASTER
# scalems_rp_master will write output before it begins handling requests. The
# script may crash even before it can write anything, but if it does write
# anything, we _will_ have the output file locally.
# Note that rp.Task.cancel() sends SIGTERM, which may prevent writing output files.
# However, as of RP 1.33, the "stop" rpc for Raptor should allow the main script
# thread to complete normally, as long as the self.join() is reached.
td.output_staging = [] # TODO(#229) Write and stage output from raptor task.
td.stage_on_error = True
td.pre_exec = list(pre_exec)
# We are not using prepare_env at this point. We use the `venv` configured by the
# caller.
# td.named_env = 'scalems_env'
td.executable = "python3"
td.arguments = []
if os.getenv("PYTHONDEVMODE", None) == "1" or "dev" in getattr(sys, "_xoptions", {}):
td.arguments.extend(("-X", "dev"))
td.environment["RADICAL_LOG_LVL"] = "DEBUG"
if os.getenv("COVERAGE_RUN") is not None or os.getenv("SCALEMS_COVERAGE") is not None:
# TODO: Use the FileStore!
local_coverage_dir = pathlib.Path("scalems-remote-coverage-dir").resolve()
td.arguments.extend(
(
"-m",
"coverage",
"run",
"--parallel-mode",
"--source=scalems",
"--branch",
"--data-file=coverage_dir/.coverage",
)
)
td.environment["SCALEMS_COVERAGE"] = "TRUE"
td.environment["RADICAL_LOG_LVL"] = "DEBUG"
td.pre_exec.append("mkdir -p coverage_dir")
td.output_staging.extend(
(
{
"source": "task:///coverage_dir",
"target": local_coverage_dir.as_uri(),
"action": rp.TRANSFER,
},
)
)
logger.info(f"Coverage data will be staged to {local_coverage_dir}.")
else:
logger.debug("No coverage testing")
td.arguments.extend(("-m", "scalems.radical.raptor"))
logger.debug(f"Using {filestore}.")
# TODO(#75): Automate handling of file staging directives for scalems.file.AbstractFileReference
# e.g. _add_file_dependency(td, config_file)
config_file_name = str(td.uid) + "-config.json"
td.input_staging = [
{
"source": config_file.as_uri(),
"target": f"task:///{config_file_name}",
"action": rp.TRANSFER,
}
]
td.arguments.append(config_file_name)
task_metadata = {"uid": td.uid, "Pilot": pilot.uid}
filestore.add_task(master_identity, **task_metadata)
logger.debug(f"Launching RP raptor scheduling. Submitting {td}.")
return pilot.submit_tasks(descriptions=[td])[0]