"""Support for scalems on radical.pilot.raptor.
Define the connective tissue for SCALE-MS tasks embedded in rp.Task arguments.
Provide the RP Raptor Master and Worker details. Implement the master task
script as a package entry point that we expect to be executable from the
command line in a virtual environment where the scalems package is installed,
without further modification to the PATH.
The client should be reasonably certain that the target environment has a
compatible installation of RP and scalems. A rp.raptor.Master task script is
installed with the scalems package. The script name is provide by the
module function :py:func:`~scalems.radical.raptor.master_script()`, and will
be resolvable on the PATH for a Python interpreter process in an
environment that has the `scalems` package installed.
We should try to keep this module as stable as possible so that the run time
interface provided by scalems to RP is robust, and the entry point scripts
change as little as possible across versions.
scalems.radical runtime details live in runtime.py.
As of :py:mod:`radical.pilot` version 1.18, TaskDescription supports
a *mode* field and several additional fields. The extended schema for the TaskDescription
depends on the value of *mode*. The structured data provided to the executor callable
is composed from these additional fields according to the particular mode.
We use the :py:data:`radical.pilot.task_description.TASK_FUNCTION` mode,
specifying a *function* field to name a callable in the (global) namespace
accessible by the worker process.
We populate the worker global namespace with imported callables in the module
file from which Raptor imports our :py:class:`radical.pilot.raptor.MPIWorker` subclass,
:py:class:`~scalems.radical.raptor.ScaleMSWorker`.
The callable for *call* accepts ``*args`` and ``**kwargs``, which are extracted
from fields in the TaskDescription. Since we derive from MPIWorker, an mpi4py
communicator is inserted at ``args[0]``.
Protocol
--------
As of RP 1.18, the :py:mod:`radical.pilot.raptor` interface is not documented.
The following diagrams illustrate the approximate architecture of a Raptor Session.
.. uml::
title RP Raptor client Session
box "RP client"
participant TaskManager
end box
'queue "Taskmanager: Scheduler and Input Queue" as Queue
queue "Scheduler/Input Queue" as Queue
-> TaskManager: submit(master_task_description)
activate TaskManager
'note left
note over TaskManager
TaskDescription uses `mode=rp.RAPTOR_MASTER`.
Task args provide details for the script
so that a Master can start appropriate Workers.
end note
TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
-> TaskManager: submit(task_description)
activate TaskManager
'note left
note over TaskManager
TaskDescription names a Master uid in *scheduler* field.
end note
TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
...
TaskManager ->
TaskManager <-- : master task results
A "master task" is an *executable* task (*mode* = ``rp.RAPTOR_MASTER``)
in which the script named by
:py:data:`radical.pilot.TaskDescription.executable`
manages the life cycle of a :py:class:`radical.pilot.raptor.Master`
(or subclass instance).
As of RP 1.14, the protocol is as follows.
.. uml::
title raptor master task lifetime management
participant "master task"
-> "master task" : stage in
create Master as master
"master task" -> master: create Master(cfg)
"master task" -> master: Master.submit_workers(descr=descr, count=n_workers)
"master task" -> master: Master.start()
alt optional hook for self-submitting additional tasks
"master task" -> master: Master.submit_tasks(tasks)
end
queue scheduler
scheduler -\\ master : request_cb
scheduler -\\ master : result_cb
master -> master: Master.stop() (optional)
"master task" -> master: Master.join()
[<-- "master task" : stage out
The *cfg* argument to :py:class:`radical.pilot.raptor.Master` does not currently
appear to be required.
*scalems* encodes worker requirements on the client side. The *scalems* master
script decodes the client-provided requirements, combines the information with
run time details and work load inspection, and produces the WorkerDescription
with which to :py:func:`~radical.pilot.raptor.Master.submit_workers()`.
.. uml::
class RaptorWorkerConfig {
count
descr
}
class WorkerDescription
RaptorWorkerConfig *--> "descr" WorkerDescription
namespace scalems.radical {
class MasterTaskConfiguration
class ClientWorkerRequirements
class ScaleMSMaster
ScaleMSMaster --> MasterTaskConfiguration : launches with
MasterTaskConfiguration *-- ClientWorkerRequirements
ClientWorkerRequirements -right-> .RaptorWorkerConfig : worker_description()
}
namespace radical.pilot.raptor {
class Master {
submit_workers()
}
Master::submit_workers .> .RaptorWorkerConfig
}
scalems.radical.ScaleMSMaster -up-|> radical.pilot.raptor.Master
.. warning:: The data structure for *descr* may still be evolving.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
.. uml::
title scalems raptor runtime management
!pragma teoz true
box "client"
box "scalems.radical" #honeydew
participant RPDispatchingExecutor as client_runtime
end box
box "radical.pilot" #White
participant task_manager
end box
end box
queue "RP runtime" as scheduler
-> client_runtime : runtime_startup()
activate client_runtime
client_runtime -> client_runtime : _get_scheduler()
activate client_runtime
client_runtime -> client_runtime : master_script()
client_runtime -> client_runtime : master_input()
activate client_runtime
client_runtime -> client_runtime : worker_requirements()
activate client_runtime
note over client_runtime
TODO: Allocate Worker according to workload
through a separate call to the running Master.
end note
return ClientWorkerRequirements
return MasterTaskConfiguration
client_runtime -> client_runtime: launch Master Task
activate client_runtime
note over client_runtime, task_manager
We currently require a pre-existing venv for Master task.
TODO(#90,#141): Allow separate venv for Worker task and Tasks.
end note
client_runtime -> task_manager : submit master task
task_manager -\\ scheduler
note over task_manager, scheduler
Master cfg is staged with TaskDescription.
end note
deactivate client_runtime
note over client_runtime, task_manager
TODO(#92,#105): Make sure the Worker starts successfully!!!
end note
return master task
return dispatching context
TODO
----
Pass input and output objects more efficiently.
Worker processes come and go,
and are not descended from Master processes (which may not even be on the same node),
so we can't generally pass objects by reference or even in shared memory segments.
However, we can use optimized ZMQ facilities for converting network messages
directly to/from memory buffers.
In other cases, we can use the scalems typing system to specialize certain types
for optimized (de)serialization from/to file-backed storage or streams.
See https://github.com/SCALE-MS/scale-ms/issues/106
TODO
----
Get a stronger specification of the RP Raptor interface.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
from __future__ import annotations
__all__ = (
"ClientWorkerRequirements",
"MasterTaskConfiguration",
"master_script",
"master_input",
"master",
"worker_requirements",
"worker_description",
"SoftwareCompatibilityError",
"ScaleMSWorker",
"ScaleMSMaster",
"ScalemsRaptorWorkItem",
"WorkerDescription",
"RaptorWorkerConfig",
)
import abc
import argparse
import asyncio
import contextlib
import dataclasses
import functools
import importlib
import importlib.metadata
import json
import os
import sys
import tempfile
import typing
import warnings
import weakref
import zlib
from importlib.machinery import ModuleSpec
from importlib.util import find_spec
from collections.abc import Generator
import packaging.version
if typing.TYPE_CHECKING:
try:
from mpi4py.MPI import Comm
except ImportError:
Comm = None
# We import rp before `logging` to avoid warnings when rp monkey-patches the
# logging module. This `try` suite helps prevent auto-code-formatters from
# rearranging the import order of built-in versus third-party modules.
try:
import radical.pilot as rp
except (ImportError,):
warnings.warn("RADICAL Pilot installation not found.")
import scalems.exceptions
import scalems.radical
import scalems.messages
import scalems.file as _file
import scalems.store as _store
# QUESTION: How should we approach logging? To what degree can/should we integrate
# with rp logging?
import logging
logger = logging.getLogger(__name__)
# Note that we have not restricted propagation or attached a handler, so the messages
# to `logger` will end up going to the stderr of the process that imported this module
# (e.g. the master task or the Worker task) and appearing in the corresponding
# Task.stderr. This is available on the client side in the case of the master task,
# but only to the Master itself for the worker task.
# We should probably either add a specific LogHandler or integrate with the
# RP logging and Component based log files.
# See also
# * https://github.com/SCALE-MS/scale-ms/discussions/261
# * https://github.com/SCALE-MS/scale-ms/issues/255
@dataclasses.dataclass(frozen=True)
class BackendVersion:
"""Identifying information for the computing backend."""
name: str
"""Identifying name (presumably a module name)."""
version: str
"""Implementation revision identifier as a PEP 440 compatible version string."""
backend_version = BackendVersion(name="scalems.radical.raptor", version="0.0.0")
# TODO: Where does this identifier belong?
api_name = "scalems_v0"
"""Key for dispatching raptor Requests.
We can use this to identify the schema used for SCALE-MS tasks encoded in
arguments to raptor :py:data:`~radical.pilot.TASK_FUNCTION` mode executor.
"""
CPI_MESSAGE = "scalems.cpi"
"""Flag for scalems messages to be treated as CPI calls.
Used in the :py:attr:`~radical.pilot.TaskDescription.mode` field to indicate that the
object should be handled through the SCALEMS Compute Provider Interface machinery.
"""
class CpiCommand(abc.ABC):
_registry: typing.MutableMapping[str, typing.Type["CpiCommand"]] = weakref.WeakValueDictionary()
@classmethod
@abc.abstractmethod
def launch(cls, manager: ScaleMSMaster, task: TaskDictionary):
"""Process the RP Task as a CPI Command.
Called in ScaleMSMaster.request_cb().
"""
...
@classmethod
@abc.abstractmethod
def result_hook(cls, manager: ScaleMSMaster, task: TaskDictionary):
"""Called during Master.result_cb."""
@typing.final
@classmethod
def get(cls, command: scalems.messages.Command):
return CpiCommand._registry[command.__class__.__qualname__]
@classmethod
@abc.abstractmethod
def command_class(cls) -> str:
"""The qualified name of the associated `scalems.messages` Command class."""
...
def __init_subclass__(cls, **kwargs):
if cls is not CpiCommand:
CpiCommand._registry[cls.command_class()] = cls
super().__init_subclass__(**kwargs)
class CpiStop(CpiCommand):
"""Provide implementation for StopCommand in RP Raptor."""
@classmethod
def command_class(cls) -> str:
return scalems.messages.StopCommand.__qualname__
@classmethod
def launch(cls, manager: ScaleMSMaster, task: TaskDictionary):
logger.debug("CPI STOP issued.")
# TODO: Wait for pending tasks to complete and shut down Worker(s)
task["stderr"] = ""
task["stdout"] = ""
task["exit_code"] = 0
manager.cpi_finalize(task)
@classmethod
def result_hook(cls, manager: ScaleMSMaster, task: TaskDictionary):
manager.stop()
logger.debug(f"Finalized {str(task)}.")
class CpiHello(CpiCommand):
"""Provide implementation for HelloCommand in RP Raptor."""
@classmethod
def launch(cls, manager: ScaleMSMaster, task: TaskDictionary):
logger.debug("CPI HELLO in progress.")
task["stderr"] = ""
task["exit_code"] = 0
task["stdout"] = repr(backend_version)
# TODO: scalems object encoding.
task["return_value"] = dataclasses.asdict(backend_version)
logger.debug("Finalizing...")
manager.cpi_finalize(task)
@classmethod
def result_hook(cls, manager: ScaleMSMaster, task: TaskDictionary):
logger.debug(f"Finalized {str(task)}.")
@classmethod
def command_class(cls) -> str:
return scalems.messages.HelloCommand.__qualname__
class CpiAddItem(CpiCommand):
"""Add an item to the managed workflow record.
Descriptions of Work with no dependencies will be immediately scheduled for
execution (converted to RP tasks and submitted).
TBD: Data objects, references to existing data, completed tasks.
"""
def __init__(self, add_item: scalems.messages.AddItem):
encoded_item = add_item.encoded_item
item_dict = json.loads(encoded_item)
self.work_item = ScalemsRaptorWorkItem(
func=item_dict["func"],
module=item_dict["module"],
args=item_dict["args"],
kwargs=item_dict["kwargs"],
comm_arg_name=item_dict.get("comm_arg_name", None),
)
@classmethod
def launch(cls, manager: ScaleMSMaster, task: TaskDictionary):
"""Repackage a AddItem command as a rp.TaskDescription for submission to the Worker.
Note that we are not describing the exact function call directly,
but an encoded function call to be handled by the `run_in_worker`
dispatching function. The arguments will be serialized
together with the function code object into the
*work_item* key word argument for the RP Task.
The Master.request_cb() submits individual scalems tasks with this function.
More complex work is deserialized and managed by ScaleMSMaster using
the `raptor_work_deserializer` function, first.
See Also:
`scalems.radical.raptor.ScaleMSWorker.run_in_worker()`
"""
# submit the task and return its task ID right away,
# then allow its result to either just be used to
# * trigger dependent work,
# * only appear in the Master report, or
# * be available to follow-up LRO status checks.
add_item = typing.cast(
scalems.messages.AddItem, scalems.messages.Command.decode(task["description"]["metadata"])
)
encoded_item = add_item.encoded_item
# We do not yet use a strongly specified object schema. Just accept a dict.
item_dict = json.loads(encoded_item)
# Decouple the serialization schema since it is not strongly specified or robust.
work_item = ScalemsRaptorWorkItem(
func=item_dict["func"],
module=item_dict["module"],
args=item_dict["args"],
kwargs=item_dict["kwargs"],
comm_arg_name=item_dict.get("comm_arg_name", None),
)
# TODO(#277): Convert abstract inputs to concrete values. (More management interface.)
# TODO(#277): Check dependencies and runnability before submitting.
fingerprint = zlib.crc32(json.dumps(work_item, sort_keys=True).encode("utf8"))
# TODO: More robust fingerprint. Ref scalems.serialization and scalems.store
# TODO: Check for duplicates.
scalems_task_id = f"scalems-task-{fingerprint}"
scalems_task_description = rp.TaskDescription(
_RaptorTaskDescription(
uid=scalems_task_id,
# Note that (as of this writing) *executable* is required but unused for Raptor tasks.
executable="scalems",
scheduler=manager.uid,
# Note we could use metadata to encode additional info for ScaleMSMaster.request_cb,
# but it is not useful for execution of the rp.TASK_FUNCTION.
metadata=None,
mode=rp.TASK_FUNCTION,
function="run_in_worker",
args=[],
kwargs={"work_item": work_item},
)
)
# Note: There is no `Task` object returned from `Master.submit_tasks()`
# `Task` objects are currently only available on the client side;
# the agent side handles task dicts---which the master will receive through the *request_cb()*.
manager.submit_tasks(scalems_task_description)
logger.debug(f"Submitted {str(scalems_task_description)} in support of {str(task)}.")
# Record task metadata and track.
task["return_value"] = scalems_task_id
task["stdout"] = scalems_task_id
task["exit_code"] = 0
manager.cpi_finalize(task)
@classmethod
def result_hook(cls, manager: ScaleMSMaster, task: TaskDictionary):
logger.debug(f"Finalized {str(task)}.")
@classmethod
def command_class(cls) -> str:
return scalems.messages.AddItem.__qualname__
EncodableAsDict = typing.Mapping[str, "Encodable"]
EncodableAsList = typing.List["Encodable"]
Encodable = typing.Union[str, int, float, bool, None, EncodableAsDict, EncodableAsList]
# def object_decoder(obj: dict):
# """Provide the object_hook callback for a JSONDecoder."""
@functools.singledispatch
def object_encoder(obj) -> Encodable:
"""Provide the *default* callback for JSONEncoder."""
raise TypeError(f"No decoder for {obj.__class__.__qualname__}.")
# def get_decoder() -> json.JSONDecoder:
# """Get a JSONDecoder instance extended for types in this module."""
# decoder = json.JSONDecoder(object_hook=object_decoder)
# return decoder
#
#
# def get_encoder() -> json.JSONEncoder:
# """Get a JSONEncoder instance extended for types in this module."""
# encoder = json.JSONEncoder(default=object_encoder)
# return encoder
# @object_encoder.register
# def _(obj: rp.TaskDescription) -> dict:
# return obj.as_dict()
[docs]@dataclasses.dataclass(frozen=True)
class ClientWorkerRequirements:
"""Client-side details to inform worker provisioning.
This structure is part of the `scalems.radical.raptor.MasterTaskConfiguration`
provided to the master script. The master script uses this information
when calling `worker_description()`.
Use the :py:func:`worker_requirements()` creation function for a stable interface.
"""
cpu_processes: int
"""Number of ranks in the Worker MPI context.
TODO: We need to account for cores-per-process.
"""
gpus_per_process: int = 0
"""GPUs per Worker rank.
TODO: Reconcile with upcoming support for fractional ratios.
"""
pre_exec: tuple[str] = ()
"""Lines of shell expressions to be evaluated before launching the worker process."""
named_env: str = None
"""A registered virtual environment known to the raptor manager.
Warnings:
Not tested. Support has been delayed indefinitely.
"""
class _MasterTaskConfigurationDict(typing.TypedDict):
"""A `MasterTaskConfiguration` encoded as a `dict`."""
worker: dict # dict-encoded ClientWorkerRequirements.
versioned_modules: typing.List[typing.Tuple[str, str]]
[docs]@dataclasses.dataclass
class MasterTaskConfiguration:
"""Input to the script responsible for the RP raptor Master.
.. todo:: Check the schema for RP.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
Produced on the client side with `master_input`. Deserialized in the
master task (`master`) to get a `RaptorWorkerConfig`.
"""
worker: ClientWorkerRequirements
"""Client-side details to inform worker provisioning.
Generated by `worker_requirements()`.
Used to prepare input for `worker_description()`.
"""
versioned_modules: typing.List[typing.Tuple[str, str]]
"""List of name, version specifier tuples for required modules."""
[docs] @classmethod
def from_dict(cls, obj: _MasterTaskConfigurationDict):
"""Decode from a native dict.
Support deserialization, such as from a JSON file in the master task.
"""
return cls(worker=ClientWorkerRequirements(**obj["worker"]), versioned_modules=list(obj["versioned_modules"]))
@object_encoder.register
def _(obj: MasterTaskConfiguration) -> dict:
return dataclasses.asdict(obj)
[docs]@functools.cache
def master_script() -> str:
"""Get the name of the RP raptor master script.
The script to run a RP Task based on a rp.raptor.Master is installed
with :py:mod`scalems`. Installation configures an "entry point" script
named ``scalems_rp_master``, but for generality this function should
be used to get the entry point name.
Before returning, this function confirms the availability of the entry point
script in the current Python environment. A client should arrange for
the script to be called in the execution environment and to confirm
that the (potentially remote) entry point matches the expected API.
Returns:
str: Installed name of the entry point wrapper for :py:func:`~scalems.radical.raptor.master()`
"""
try:
# TODO(Pyhon 3.10): Use importlib.metadata.entry_points(group='console_scripts')?
import pkg_resources
except ImportError:
pkg_resources = None
_master_script = "scalems_rp_master"
if pkg_resources is not None:
# It is not hugely important if we cannot perform this test.
# In reality, this should be performed at the execution site, and we can/should
# remove the check here once we have effective API compatibility checking.
# See https://github.com/SCALE-MS/scale-ms/issues/100
assert pkg_resources.get_entry_info("scalems", "console_scripts", "scalems_rp_master").name == _master_script
return _master_script
[docs]def worker_requirements(*, pre_exec: typing.Iterable[str], worker_venv: str) -> ClientWorkerRequirements:
"""Get the requirements for the work load, as known to the client.
TODO: Inspect workflow to optimize reusability of the initial Worker submission.
"""
# TODO: calculate cores.
# rp_config = scalems.radical.configuration()
# pilot_description = rp_config.rp_resource_params["PilotDescription"]
# available_cores = pilot_description.get("cores")
# if not available_cores:
# cores_per_node =
# ...
cores_per_worker = 1
gpus_per_worker = 0
workload_metadata = ClientWorkerRequirements(
named_env=worker_venv,
pre_exec=tuple(pre_exec),
cpu_processes=cores_per_worker,
gpus_per_process=gpus_per_worker,
)
return workload_metadata
parser = argparse.ArgumentParser(description="Command line entry point for RP Raptor master task.")
parser.add_argument("file", type=str, help="Input file (JSON) for configuring ScaleMSMaster instance.")
[docs]def master():
"""Entry point for raptor.Master task.
This function implements the :command:`scalems_rp_master` entry point script
called by the RADICAL Pilot executor to provide the raptor master task.
During installation, the `scalems` build system creates a :file:`scalems_rp_master`
`entry point <https://setuptools.pypa.io/en/latest/pkg_resources.html#entry-points>`__
script that provides command line access to this function. The Python signature
takes no arguments because the function processes command line arguments from `sys.argv`
.. uml::
title scalems raptor master task lifetime
!pragma teoz true
queue "RP runtime" as scheduler
box execution
box "scalems.radical" #honeydew
participant "master task"
participant ScaleMSMaster as master
end box
participant worker.py
end box
scheduler -> "master task" : stage in
?-> "master task" : scalems_rp_master
activate "master task"
note over "master task" : "TODO: Get asyncio event loop?"
note over "master task" : "TODO: Initialize scalems FileStore"
"master task" -> "master task" : from_dict
activate "master task"
return MasterTaskConfiguration
"master task" -> master **: create ScaleMSMaster()
"master task" -> "master task" : with configure_worker()
activate "master task"
"master task" -> master : configure_worker()
activate master
master -> worker.py ** : with _worker_file()
activate master
master -> master : _configure_worker()
activate master
master -> master : worker_description()
activate master
return
return WorkerDescription
deactivate master
return RaptorWorkerConfig
'master --> "master task" : RaptorWorkerConfig
'deactivate master
"master task" -> master: submit_workers(**config)
"master task" -> master: start()
note over "master task" : "TODO: wait for worker"
...
scheduler -\\ master : request_cb
scheduler -\\ master : result_cb
...
"master task" -> master : ~__exit__
activate master
master -> master : ~__exit__
activate master
master --> master : stop worker (TODO)
master -> worker.py !! : delete
deactivate master
master -> master: stop() (TBD)
"master task" -> master: join()
deactivate master
deactivate "master task"
scheduler <-- "master task" : stage out
deactivate "master task"
"""
# TODO: Configurable log level?
logger.setLevel(logging.DEBUG)
character_stream = logging.StreamHandler()
character_stream.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
character_stream.setFormatter(formatter)
logger.addHandler(character_stream)
file_logger = logging.FileHandler("scalems.radical.raptor.log")
file_logger.setLevel(logging.DEBUG)
file_logger.setFormatter(formatter)
logging.getLogger("scalems").addHandler(file_logger)
if not os.environ["RP_TASK_ID"]:
warnings.warn("Attempting to start a Raptor Master without RP execution environment.")
rp_version = packaging.version.parse(rp.version)
args = parser.parse_args()
if not os.path.exists(args.file):
raise RuntimeError(f"File not found: {args.file}")
with open(args.file, "r") as fh:
configuration = MasterTaskConfiguration.from_dict(json.load(fh))
logger.info(f"Launching ScaleMSMaster task in raptor {rp_version} with {repr(dataclasses.asdict(configuration))}.")
_master = ScaleMSMaster(configuration)
logger.debug(f"Created {repr(_master)}.")
requirements = configuration.worker
try:
with _master.configure_worker(requirements=requirements) as configs:
assert len(configs) == 1
worker_submission: rp.TaskDescription = configs[0]
assert os.path.isabs(worker_submission.raptor_file)
assert os.path.exists(worker_submission.raptor_file)
logger.debug(f"Submitting {len(configs)} {worker_submission.as_dict()}")
_master.submit_workers(descriptions=configs)
message = "Submitted workers: "
message += ", ".join(_master.workers.keys())
logger.info(message)
_master.start()
logger.debug("Master started.")
# Make sure at least one worker comes online.
_master.wait_workers(count=1)
logger.debug("Ready to submit raptor tasks.")
# Confirm all workers start successfully or produce useful error
# (then release temporary file).
_master.wait_workers()
for uid, worker in _master.workers.items():
logger.info(f"Worker {uid} in state {worker['status']}.")
_master.join()
logger.debug("Master joined.")
finally:
if sys.exc_info() != (None, None, None):
logger.exception("Master task encountered exception.")
logger.debug("Completed master task.")
def _configure_worker(*, requirements: ClientWorkerRequirements, filename: str) -> RaptorWorkerConfig:
"""Prepare the arguments for :py:func:`rp.raptor.Master.submit_workers()`.
Provide an abstraction for the submit_workers signature and parameter types,
which may still be evolving.
See also https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
assert os.path.exists(filename)
# TODO(#248): Consider a "debug-mode" option to do a trial import from *filename*
# TODO(#302): Number of workers should be chosen after inspecting the requirements of the work load.
num_workers = 1
descr = worker_description(
named_env=requirements.named_env,
worker_file=filename,
pre_exec=requirements.pre_exec,
cpu_processes=requirements.cpu_processes,
gpus_per_process=requirements.gpus_per_process,
)
config: RaptorWorkerConfig = [descr] * num_workers
return config
class SoftwareCompatibilityError(RuntimeError):
"""Incompatible package versions or software interfaces."""
def _get_module_version(module: str):
"""Get version metadata for importable module."""
try:
found_version = importlib.metadata.version(module)
except importlib.metadata.PackageNotFoundError:
found_version = None
if found_version is None:
try:
spec: ModuleSpec = find_spec(module)
if spec and hasattr(spec, "parent"):
found_version = importlib.metadata.version(spec.parent)
except Exception as e:
logger.debug(f"Exception when trying to find {module}: ", exc_info=e)
if found_version is not None:
found_version = packaging.version.Version(found_version)
return found_version
[docs]class ScaleMSMaster(rp.raptor.Master):
"""Manage a RP Raptor scheduler target.
Extends :py:class:`rp.raptor.Master`.
We do not submit scalems tasks directly as RP Tasks from the client.
We encode scalems tasks as instructions to the Master task, which will be
decoded and translated to RP Tasks by the `ScaleMSMaster.request_cb` and
self-submitted to the `ScaleMSWorker`.
Results of such tasks are only available through to the :py:func:`result_cb`.
The Master can translate results of generated Tasks into results for the Task
carrying the coded instruction, or it can produce data files to stage during or
after the Master task. Additionally, the Master could respond to other special
instructions (encoded in later client-originated Tasks) to query or retrieve
generated task results.
.. uml::
title raptor Master
queue "Queue" as Queue
box "RP Agent"
participant Scheduler
end box
queue "Master input queue" as master_queue
box "scalems.radical.raptor"
participant ScaleMSMaster
participant rpt.Master
end box
queue "ZMQ Raptor work channel" as channel
activate Queue
Scheduler -> Queue: accepts work
activate Scheduler
Scheduler <-- Queue: task dictionary
deactivate Queue
note over Scheduler
Scheduler gets task from queue,
observes `scheduler` field and routes to Master.
end note
Scheduler -> master_queue: task dictionary
activate master_queue
deactivate Scheduler
note over ScaleMSMaster, rpt.Master
Back end RP queue manager
passes messages to Master callbacks.
end note
rpt.Master -> master_queue: accept Task
activate rpt.Master
rpt.Master <-- master_queue: cpi_task
deactivate master_queue
rpt.Master -> rpt.Master: _request_cb(cpi_task)
activate rpt.Master
rpt.Master -> ScaleMSMaster: request_cb(cpi_task)
activate ScaleMSMaster
ScaleMSMaster -> ScaleMSMaster: CpiCommand.launch()
activate ScaleMSMaster
alt optionally process or update requests
ScaleMSMaster -> ScaleMSMaster: submit_tasks(scalems_tasks)
activate ScaleMSMaster
deactivate ScaleMSMaster
ScaleMSMaster -> ScaleMSMaster: _result_cb(cpi_task)
activate ScaleMSMaster
deactivate ScaleMSMaster
else resolve a Control message
ScaleMSMaster -> ScaleMSMaster: _result_cb(cpi_task)
activate ScaleMSMaster
deactivate ScaleMSMaster
end
return
return filtered tasks
rpt.Master -> rpt.Master: submit_tasks(...)
activate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: send message
deactivate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: accept result
activate rpt.Master
rpt.Master <-- channel: scalems_tasks
deactivate channel
rpt.Master -> rpt.Master: _result_cb(scalems_tasks))
activate rpt.Master
rpt.Master -> ScaleMSMaster: result_cb(scalems_tasks)
activate ScaleMSMaster
ScaleMSMaster -> ScaleMSMaster: CpiCommand.result_hook()
alt optionally process or update pending requests
ScaleMSMaster -> ScaleMSMaster: issue#277
activate ScaleMSMaster
deactivate ScaleMSMaster
end
rpt.Master <-- ScaleMSMaster
deactivate ScaleMSMaster
rpt.Master -> master_queue: send message
activate master_queue
deactivate rpt.Master
deactivate rpt.Master
"""
[docs] def __init__(self, configuration: MasterTaskConfiguration):
"""Initialize a SCALE-MS Raptor Master.
Verify the environment. Perform some scalems-specific set up and
initialize the base class details.
"""
# Verify environment.
for module, version in configuration.versioned_modules:
logger.debug(f"Looking for {module} version {version}.")
found_version = _get_module_version(module)
if found_version is None:
raise SoftwareCompatibilityError(f"{module} not found.")
minimum_version = packaging.version.Version(version)
if found_version >= minimum_version:
logger.debug(f"Found {module} version {found_version}: Okay.")
else:
raise SoftwareCompatibilityError(f"{module} version {found_version} not compatible with {version}.")
# The rp.raptor base class will fail to initialize without the environment
# expected for a RP task launched by an RP agent. However, we may still
# want to acquire a partially-initialized instance for testing.
try:
super(ScaleMSMaster, self).__init__()
except KeyError:
warnings.warn("Master incomplete. Could not initialize raptor.Master base class.")
# Initialize internal state.
# TODO: Use a scalems RuntimeManager.
self.__worker_files = {}
@contextlib.contextmanager
def _worker_file(self):
"""Scoped temporary module file for raptor worker.
Not thread-safe. (Should it be?)
Write and return the path to a temporary Python module. The module imports
:py:class:`ScaleMSWorker` into its module namespace
so that the file and class can be used in the worker description for
:py:func:`rp.raptor.Master.submit_workers()`
"""
worker = ScaleMSWorker
# Note: there does not appear to be any utility in any lines to evaluate
# beyond the Worker subclass import. This namespace will not be available
# (through `globals()` or `locals()`, at least) to the rp.TASK_FUNCTION
# implementation in rp.raptor.worker_mpi._Worker._dispatch_function.
text = [
f"from {worker.__module__} import {worker.__name__}\n",
# f'from {run_in_worker.__module__} import {run_in_worker.__name__}\n'
]
# TODO: Use the Master's FileStore to get an appropriate fast shared filesystem.
with tempfile.TemporaryDirectory() as tmp_dir:
filename = self.__worker_files.get(worker, None)
if filename is None:
filename = next_numbered_file(dir=tmp_dir, name="scalems_worker", suffix=".py")
with open(filename, "w") as fh:
fh.writelines(text)
self.__worker_files[worker] = filename
yield filename
del self.__worker_files[worker]
[docs] def request_cb(self, tasks: typing.Sequence[TaskDictionary]) -> typing.Sequence[TaskDictionary]:
"""Allows all incoming requests to be processed by the Master.
RADICAL guarantees that :py:func:`~radical.pilot.raptor.Master.request_cb()`
calls are made sequentially in a single thread.
The implementation does not need to be thread-safe or reentrant.
(Ref: https://github.com/radical-cybertools/radical.utils/blob/master/src/radical/utils/zmq/queue.py#L386)
RADICAL does not guarantee that Tasks are processed in the same order in
which they are submitted by the client.
If overridden, request_cb() must return a :py:class:`list`. The returned list
is interpreted to be requests that should be processed normally after the callback.
This allows subclasses of rp.raptor.Master to add or remove requests before they become Tasks.
The returned list is submitted with self.submit_tasks() by the base class after the
callback returns.
A Master may call *self.submit_tasks()* to self-submit items (e.g. instead of or in
addition to manipulating the returned list in *request_cb()*).
It is the developer's responsibility to choose unique task IDs (uid) when crafting
items for Master.submit_tasks().
"""
try:
# It is convenient to compartmentalize the filtering and dispatching
# of scalems tasks in a generator function, but this callback signature
# requires a `list` to be returned.
remaining_tasks = self._scalems_handle_requests(tasks)
# Let non-scalems tasks percolate to the default machinery.
return list(remaining_tasks)
except Exception as e:
logger.exception("scalems request filter propagated an exception.")
# TODO: Submit a clean-up task.
# Use a thread or asyncio event loop controlled by the main master task thread
# to issue a cancel to all outstanding tasks and stop the workers.
# Question: Is there a better / more appropriate way to exit cleanly on errors?
self.stop()
# Note that we are probably being called in a non-root thread.
# TODO: Avoid letting an exception escape unhandled.
# WARNING: RP 1.18 suppresses exceptions from request_cb(), but there is a note
# indicating that exceptions will cause task failure in a future version.
# But the failure modes of the scalems master task are not yet well-defined.
# See also
# * https://github.com/SCALE-MS/scale-ms/issues/218 and
# * https://github.com/SCALE-MS/scale-ms/issues/229
raise e
[docs] def cpi_finalize(self, task: TaskDictionary):
"""Short-circuit the normal Raptor protocol to finalize a task.
This is an alias for Master._result_cb(), but is a public method used
in the :py:class:`CpiCommand.launch` method for
:py:class:`~scalems.messages.Control` commands, which do not
call :py:func:`Master.submit_tasks`.
"""
self._result_cb(task)
def _scalems_handle_requests(self, tasks: typing.Sequence[TaskDictionary]):
for task in tasks:
_finalized = False
try:
mode = task["description"]["mode"]
# Allow non-scalems work to be handled normally.
if mode != CPI_MESSAGE:
logger.debug(f"Deferring {str(task)} to regular RP handling.")
yield task
continue
else:
command = scalems.messages.Command.decode(task["description"]["metadata"])
logger.debug(f"Received message {command} in {str(task)}")
impl: typing.Type[CpiCommand] = CpiCommand.get(command)
# # TODO(#277)
# # Check work dependencies. Generate necessary data staging.
# self._dependents[task['uid']] = ...
# # Generate sub-tasks and manage dependencies. Prune tasks that
# # are already completed. Produce appropriate error if work cannot
# # be performed. Manage sufficient state that result_cb is able to
# # submit tasks as their dependencies are met.
#
# self._workload.add(task)
# ...
impl.launch(self, task)
# Note: this _finalized flag is immediately useless for non-Control commands.
# TODO: We need more task state maintenance, probably with thread-safety.
_finalized = True
except Exception as e:
# Exceptions here are presumably bugs in scalems or RP.
# Almost certainly, we should shut down after cleaning up.
# 1. TODO: Record error in log for ScaleMSMaster.
# 2. Make sure that task is resolved.
# 3. Trigger clean shut down of master task.
if not _finalized:
# TODO: Mark failed, or note exception
self._result_cb(task)
raise e
[docs] def result_cb(self, tasks: typing.Sequence[TaskDictionary]):
"""SCALE-MS specific handling of completed tasks.
We perform special handling for two types of tasks.
1. Tasks submitted throughcompleted by the collaborating Worker(s).
2. Tasks intercepted and resolved entirely by the Master (CPI calls).
"""
# Note: At least as of RP 1.18, exceptions from result_cb() are suppressed.
for task in tasks:
logger.debug(f'Result callback for {task["description"]["mode"]} task {task["uid"]}: {task}')
mode = task["description"]["mode"]
# Allow non-scalems work to be handled normally.
if mode == CPI_MESSAGE:
command = scalems.messages.Command.decode(task["description"]["metadata"])
logger.debug(f"Finalizing {command} in {str(task)}")
impl: typing.Type[CpiCommand] = CpiCommand.get(command)
impl.result_hook(self, task)
# # Release dependent tasks.
# # https://github.com/SCALE-MS/randowtal/blob/c58452a3eaf0c058337a85f4fca3f51c5983a539/test_am
# # /brer_master.py#L114
# # TODO: use scalems work graph management.
# if task['uid'] in self._dependents:
# # Warning: The dependent task may have multiple dependencies.
# dep = self._dependents[task['uid']]
# self._log.debug('=== submit dep %s', dep['uid'])
# self.submit_tasks(dep)
#
# mode = task['description']['mode']
#
# # NOTE: `state` will be `AGENT_EXECUTING`
# self._log.debug('=== out: %s', task['stdout'])
# self._log.debug('result_cb %s: %s [%s] [%s]',
# task['uid'],
# task['state'],
# sorted(task['stdout']),
# task['return_value'])
# #TODO(#108)
# # Complete and publish
# # https://github.com/SCALE-MS/randowtal/blob/c58452a3eaf0c058337a85f4fca3f51c5983a539/test_am
# # /brer_master.py#L114
# # Check whether all of the submitted tasks have been completed.
# if self._submitted == self._completed:
# # we are done, not more workloads to wait for - return the tasks
# # and terminate
# # TODO: We can advance the rp task state with a callback registered with the corresponding
# # unwrapped scalems task.
# for req in self._workload:
# # TODO: create raptor method
# req['task']['target_state'] = rp.DONE
# self.advance(req['task'], rp.AGENT_STAGING_OUTPUT_PENDING,
# publish=True, push=True)
[docs]class WorkerDescription(rp.TaskDescription):
"""Worker description.
See Also:
* :py:meth:`~radical.pilot.raptor.Master.submit_workers()`
* https://github.com/radical-cybertools/radical.pilot/issues/2731
"""
cores_per_rank: typing.Optional[int]
environment: typing.Optional[dict[str, str]]
gpus_per_rank: typing.Optional[int]
named_env: typing.Optional[str]
pre_exec: typing.Optional[list[str]]
ranks: int
raptor_class: str
raptor_file: str
mode: str # rp.RAPTOR_WORKER
RaptorWorkerConfig = list[WorkerDescription]
"""Client-specified Worker requirements.
Used by master script when calling `worker_description()`.
Created internally with :py:func:`_configure_worker()`.
See Also:
:py:mod:`scalems.radical.raptor.ClientWorkerRequirements`
"""
[docs]def worker_description(
*,
named_env: str,
worker_file: str,
cores_per_process: int = None,
cpu_processes: int = None,
gpus_per_process: int = None,
pre_exec: typing.Iterable[str] = (),
environment: typing.Optional[typing.Mapping[str, str]] = None,
) -> WorkerDescription:
"""Get a worker description for Master.submit_workers().
Parameters:
cores_per_process (int, optional): See `radical.pilot.TaskDescription.cores_per_rank`
cpu_processes (int, optional): See `radical.pilot.TaskDescription.ranks`
environment (dict, optional): Environment variables to set in the Worker task.
gpus_per_process (int, optional): See `radical.pilot.TaskDescription.gpus_per_rank`
named_env (str): Python virtual environment registered with `radical.pilot.Pilot.prepare_env`
(currently ignored. see #90).
pre_exec (list[str]): Shell command lines for preparing the worker environment.
worker_file (str): Standalone module from which to import ScaleMSWorker.
*worker_file* is generated for the master script by the caller.
See :py:mod:`scalems.radical.raptor`
The *uid* for the Worker task is defined by the Master.submit_workers().
"""
descr = WorkerDescription()
if cores_per_process is not None:
descr.cores_per_rank = cores_per_process
if environment is not None:
descr.environment = environment
if gpus_per_process is not None:
descr.gpus_per_rank = gpus_per_process
if named_env is not None:
# descr.named_env=None
logger.debug(f"Ignoring named_env={named_env}. Parameter is currently unused.")
descr.pre_exec = list(pre_exec)
if cpu_processes is not None:
descr.ranks = cpu_processes
descr.raptor_class = "ScaleMSWorker"
descr.raptor_file = worker_file
descr.mode = rp.RAPTOR_WORKER
return descr
class RaptorTaskExecutor(typing.Protocol):
"""Represent the signature of executor functions for rp.raptor.MPIWorker."""
def __call__(self, *args, comm: Comm, **kwargs) -> Encodable:
...
[docs]class ScaleMSWorker(rp.raptor.MPIWorker):
"""Specialize the Raptor MPI Worker for scalems dispatching of serialised work.
scalems tasks encode the importable function and inputs in the arguments to
the `run_in_worker` dispatching function,
which is available to the `ScaleMSWorker` instance.
.. uml::
title ScaleMSWorker task dispatching
queue "ZMQ Raptor work channel" as channel
box "scalems.radical.raptor.Worker"
participant ScaleMSWorker
participant rpt.Worker
end box
box "usermodule"
participant usermodule
end box
box "target venv"
end box
rpt.Worker -> channel: pop message
activate rpt.Worker
rpt.Worker <-- channel: rp.TASK_FUNCTION mode Task
rpt.Worker -> rpt.Worker: _dispatch_function()
activate rpt.Worker
alt scalems encoded workload
rpt.Worker -> ScaleMSWorker: run_in_worker()
activate ScaleMSWorker
ScaleMSWorker -> ScaleMSWorker: unpack encoded function call
ScaleMSWorker -> ScaleMSWorker: from usermodule import func
ScaleMSWorker -> usermodule: ""func(*args, **kwargs)""
activate usermodule
ScaleMSWorker <-- usermodule
deactivate usermodule
return
else pickled rp.PythonTask workload (not implemented)
rpt.Worker -> rpt.Worker: unpickled scalems handler
activate rpt.Worker
rpt.Worker -> usermodule: ""func(*args, **kwargs)""
activate usermodule
rpt.Worker <-- usermodule
deactivate usermodule
rpt.Worker --> rpt.Worker: {out, err, ret, value}
deactivate rpt.Worker
end
deactivate rpt.Worker
rpt.Worker -> channel: put result
deactivate rpt.Worker
"""
[docs] def run_in_worker(self, *, work_item: ScalemsRaptorWorkItem, comm=None):
"""Unpack and run a task requested through RP Raptor.
Satisfies RaptorTaskExecutor protocol. Named as the *function* for
rp.TASK_FUNCTION mode tasks generated by scalems.
This function MUST be imported and referentiable by its *__qualname__* from
the scope in which ScaleMSWorker methods execute. (See ScaleMSMaster._worker_file)
Parameters:
comm (mpi4py.MPI.Comm, optional): MPI communicator to be used for the task.
work_item: dictionary of encoded function call from *kwargs* in the TaskDescription.
See Also:
:py:func:`CpiAddItem.launch()`
"""
module = importlib.import_module(work_item["module"])
func = getattr(module, work_item["func"])
args = list(work_item["args"])
kwargs = work_item["kwargs"].copy()
comm_arg_name = work_item.get("comm_arg_name", None)
if comm_arg_name is not None:
if comm_arg_name:
kwargs[comm_arg_name] = comm
else:
args.append(comm)
logger.debug(
"Calling {func} with args {args} and kwargs {kwargs}",
{"func": func.__qualname__, "args": repr(args), "kwargs": repr(kwargs)},
)
return func(*args, **kwargs)
def raptor_work_deserializer(*args, **kwargs):
"""Unpack a workflow document from a Task.
When a `ScaleMSMaster.request_cb()` receives a raptor task, it uses this
function to unpack the instructions and construct a work load.
"""
raise scalems.exceptions.MissingImplementationError
[docs]class ScalemsRaptorWorkItem(typing.TypedDict):
"""Encode the function call to implement a task in the workflow.
Parameter type for the `run_in_worker` function *work_item* argument.
This structure must be trivially serializable (by `msgpack`) so that it can
be passed in a TaskDescription.kwargs field. Consistency with higher level
`scalems.serialization` is not essential, but can be pursued in the future.
The essential role is to represent an importable callable and its arguments.
At run time, the dispatching function (`run_in_worker`) conceivably has access
to module scoped state, but does not have direct access to the Worker (or
any resources other than the `mpi4py.MPI.Comm` object). Therefore, abstract
references to input data should be resolved at the Master in terms of the
expected Worker environment before preparing and submitting the Task.
TODO: Evolve this to something sufficiently general to be a scalems.WorkItem.
TODO: Clarify the translation from an abstract representation of work in terms
of the workflow record to the concrete representation with literal values and
local filesystem paths.
TODO: Record extra details like implicit filesystem interactions,
environment variables, etc. TBD whether they belong in a separate object.
"""
func: str
"""A callable to be retrieved as an attribute in *module*."""
module: str
"""The qualified name of a module importable by the Worker."""
args: list
"""Positional arguments for *func*."""
kwargs: dict
"""Key word arguments for *func*."""
comm_arg_name: typing.Optional[str]
"""Identify how to provide an MPI communicator to *func*, if at all.
If *comm_arg_name* is not None, the callable will be provided with the
MPI communicator. If *comm_arg_name* is an empty string, the communicator
is provided as the first positional argument. Otherwise, *comm_arg_name*
must be a valid key word argument by which to pass the communicator to *func*.
"""
[docs]class _RaptorTaskDescription(typing.TypedDict):
"""Describe a Task to be executed through a Raptor Worker.
A specialization of `radical.pilot.TaskDescription`.
Note the distinctions of a TaskDescription to be processed by a raptor.Master.
The meaning or utility of some fields is dependent on the values of other fields.
TODO: We need some additional fields, like *environment* and fields related
to launch method and resources reservation. Is the schema for rp.TaskDescription
sufficiently strong and well-documented that we can remove this hinting type?
(Check again at RP >= 1.19)
"""
uid: str
"""Unique identifier for the Task across the Session."""
executable: str
"""Unused by Raptor tasks."""
scheduler: str
"""The UID of the raptor.Master scheduler task.
This field is relevant to tasks routed from client TaskManagers. It is not
used for tasks originating in master tasks.
.. ref https://github.com/SCALE-MS/scale-ms/discussions/258#discussioncomment-4087870
"""
metadata: Encodable
"""An arbitrary user-provided payload.
May be any type that is encodable by :py:mod:`msgpack` (i.e. built-in types).
"""
mode: str
"""The executor mode for the Worker to use.
For ``rp.TASK_FUNCTION`` mode, either *function* or *method* must name a task executor.
Depending on the Worker (sub)class, resources such as an `mpi4py.MPI.Comm`
will be provided as the first positional argument to the executor.
``*args`` and ``**kwargs`` will be provided to the executor from the corresponding
fields.
See Also:
* :py:data:`CPI_MESSAGE`
* :py:class:`RaptorTaskExecutor`
"""
function: str
"""Executor for ``rp.TASK_FUNCTION`` mode.
Names the callable for dispatching.
The callable can either be a function object present in the namespace of the interpreter launched
by the Worker for the task, or a :py:class:`radical.pilot.pytask.PythonTask` pickled function object.
"""
args: list
"""For ``rp.TASK_FUNCTION`` mode, list of positional arguments provided to the executor function."""
kwargs: dict
"""For ``rp.TASK_FUNCTION`` mode, a dictionary of key word arguments provided to the executor function."""
[docs]class TaskDictionary(typing.TypedDict):
"""Task representations seen by *request_cb* and *result_cb*.
Other fields may be present, but the objects in the sequences provided to
:py:meth:`scalems.radical.raptor.ScaleMSMaster.request_cb()` and
:py:meth:`scalems.radical.raptor.ScaleMSMaster.result_cb()` have the following fields.
Result fields will not be populated until the Task runs.
For the expected fields, see the source code for
:py:meth:`~radical.pilot.Task.as_dict()`:
https://radicalpilot.readthedocs.io/en/stable/_modules/radical/pilot/task.html#Task.as_dict
"""
uid: str
"""Canonical identifier for the Task.
Note that *uid* may be omitted from the original TaskDescription.
"""
description: _RaptorTaskDescription
"""Encoding of the original task description."""
stdout: str
"""Task standard output."""
stderr: str
"""Task standard error."""
exit_code: int
"""Task return code."""
return_value: typing.Any
"""Function return value.
Refer to the :py:class:`RaptorTaskExecutor` Protocol.
"""
exception: str
"""Exception type name and message."""
exception_detail: str
"""Full exception details with stack trace."""
state: str
"""RADICAL Pilot Task state."""
target_state: str
"""State to which the Task should be advanced.
Valid values are string constants from :py:mod:`radical.pilot.states`.
Used internally, such as in Master._result_cb().
"""
def next_numbered_file(*, dir, name, suffix):
"""Get the next available filename with the form {dir}/{name}{i}{suffix}."""
if not os.path.exists(dir):
raise ValueError(f'Directory "{dir}" does not exist.')
filelist = os.listdir(dir)
template = str(name) + "{i}" + str(suffix)
i = 1
filename = template.format(i=i)
while filename in filelist:
i += 1
filename = template.format(i=i)
return os.path.join(dir, filename)