scalems.radical execution
User interface is documented at scalems.radical.
execution module interface
- scalems.radical.configuration(*args, **kwargs)[source]
Get a RADICAL runtime configuration.
Accepts a single argparse.Namespace argument (see parser()) or keyword arguments. With no arguments, the command line parser is invoked to try to build a new configuration. If keyword arguments are provided, try to construct a scalems.radical.runtime.RuntimeConfiguration.
See also
current_configuration() retrieves the configuration for an active RuntimeManager, if any.
- Return type:
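The Namespace-or-keyword-arguments dispatch described above can be sketched as follows. This is an illustrative stand-in, not the real implementation: the helper name make_config and its return value are hypothetical (the real function builds a scalems.radical.runtime.RuntimeConfiguration and may invoke the command line parser).

```python
import argparse

def make_config(*args, **kwargs):
    # Hypothetical sketch: accept either a single argparse.Namespace,
    # or keyword arguments, but never both.
    if args:
        if len(args) != 1 or not isinstance(args[0], argparse.Namespace) or kwargs:
            raise TypeError("Provide a single Namespace or only keyword arguments.")
        return vars(args[0])
    # With keyword arguments (or nothing), build a configuration mapping.
    return dict(kwargs)

ns = argparse.Namespace(execution_target="local.localhost", enable_raptor=False)
assert make_config(ns) == {"execution_target": "local.localhost", "enable_raptor": False}
assert make_config(execution_target="local.localhost") == {"execution_target": "local.localhost"}
```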
- scalems.radical.workflow_manager(loop, directory=None)[source]
Manage a workflow context for RADICAL Pilot workloads.
The rp.Session is created when the Python Context Manager is “entered”, so the asyncio event loop must be running before then.
To help enforce this, we use an async Context Manager, at least in the initial implementation. However, the implementation is not thread-safe. It is not reentrant, but this is not checked. We probably do want to constrain ourselves to zero or one Sessions per environment, but we do need to support multiple Pilots and task submission scopes (resource requirement groups). Further discussion is welcome.
Warning
The importer of this module should be sure to import radical.pilot before importing the built-in logging module to avoid spurious warnings.
- Parameters:
loop (AbstractEventLoop) –
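The constraint that the event loop must be running before the manager is "entered" can be illustrated with a stand-in async context manager. DummyWorkflowManager below is hypothetical and not part of scalems; it only mirrors the requirement that entry happens inside a running loop (as rp.Session creation does).

```python
import asyncio

class DummyWorkflowManager:
    # Hypothetical stand-in for the scalems.radical workflow context.
    async def __aenter__(self):
        # get_running_loop() raises RuntimeError if no loop is running,
        # mirroring the "loop must be running before entry" constraint.
        self.loop = asyncio.get_running_loop()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        return False  # do not suppress exceptions

async def main():
    async with DummyWorkflowManager() as mgr:
        return mgr.loop.is_running()

assert asyncio.run(main()) is True
```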
scalems.radical support module
Manage the RADICAL Pilot start-up and shut-down.
The provided RuntimeSession class encapsulates stateful resources that, once acquired, should be shut down explicitly. RuntimeSession instances may be used as context managers to ensure the proper protocol is followed, or the caller can take responsibility for calling RuntimeSession.close() to shut down. TODO: Fix!
Note: Consider whether RuntimeSession context manager is reentrant or multi-use.
The RuntimeSession state encapsulates several nested states. A Session, TaskManager, and PilotManager must be created for the RuntimeSession to be usable. Additionally, Pilots and scheduler tasks may be added or removed during the RuntimeSession lifetime. To better support the alternate scenarios when a RuntimeSession instance may be provided to a scalems.radical component in an arbitrary state, consider making the with block scoped, such that it only returns the RuntimeSession instance to its initial state when exiting, rather than shutting down completely.
See also https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
- Deferred:
RuntimeSession can avoid providing direct access to RP interface, and instead run an entire RP Session state machine in a thread (separate from the asyncio event loop thread), relaying RP scripting commands through queues, in order to completely prevent misuse and to insulate the asyncio event loop from blocking RP commands. We need to get a better sense of the RP flow combinatorics before we can reasonably pursue this.
- scalems.radical.runtime.executor_factory(manager, params=None)[source]
- Parameters:
manager (WorkflowManager) –
params (RuntimeConfiguration) –
- class scalems.radical.runtime.RPDispatchingExecutor[source]
Client side manager for work dispatched through RADICAL Pilot.
Extends
scalems.execution.RuntimeManager
Configuration points:
resource config
pilot config
session config?
We try to wrap rp UI calls in separate threads. Note, though, that:
- The rp.Session needs to be created in the root thread to be able to correctly manage signal handlers and subprocesses, and
- We need to be able to schedule RP Task callbacks in the same process as the asyncio event loop in order to handle Futures for RP tasks.
See https://github.com/SCALE-MS/randowtal/issues/2
- runtime: RuntimeSession
- __init__(*, editor_factory=None, datastore=None, loop, configuration)[source]
Create a client side execution manager.
Warning
The creation method does not fully initialize the instance.
Initialization and de-initialization occurs through the Python (async) context manager protocol.
- Parameters:
datastore (FileStore) –
loop (AbstractEventLoop) –
configuration (RuntimeConfiguration) –
- runtime_configuration()[source]
Provide scoped Configuration.
Merge the runtime manager’s configuration with the global configuration, update the global configuration, and yield the configuration for a with block. Restores the previous global configuration when exiting the with block.
Warning
Not thread-safe.
We use the ContextVar to check for (and disallow) re-entrance. This contextmanager is not async, but it could be (and is) used within an asynchronous context manager. If used from multiple threads, we would not have anything structurally prohibiting reentrant calls.
Design notes
Instance configuration is coupled to module state except as allowed through contextvars.copy().
- Return type:
- async runtime_startup()[source]
Establish the RP Session.
Acquire a maximally re-usable set of RP resources. The scope established by this function is as broad as it can be within the life of the workflow manager.
Once instance.runtime_startup() succeeds, instance.runtime_shutdown() must be called to clean up resources. Use the async context manager behavior of the instance to automatically follow this protocol. I.e. instead of calling instance.runtime_startup(); ...; instance.runtime_shutdown(), use: async with instance: ...
- Raises:
DispatchError – if task dispatching could not be set up.
asyncio.CancelledError – if parent
asyncio.Task
is cancelled while executing.
- Return type:
Task
Note
Signal handling
RP is known to use IPC signals in several cases. We believe that the client environment should only experience an internally triggered SIGINT in a single code path, and the behavior can be suppressed by setting the Pilot’s exit_on_error attribute to False.
We could use loop.add_signal_handler() to convert to an exception that we can raise in an appropriate task, but this is probably unnecessary. Moreover, with Python 3.11, we get a sensible signal handling behavior (for SIGINT) with asyncio.Runner. Per https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption, we can just make sure that our run time resources will be properly shut down in the event of an asyncio.CancelledError, including sending appropriate calls to the radical.pilot framework.
- TODO: More concurrency.
The rp.Pilot and raptor task can be separately awaited, and we should allow input data staging to begin as soon as we have enough run time details to do it. We need to clarify which tasks should be in which state to consider the asynchronous context manager to have been successfully “entered”. I expect that success includes a Pilot in state PMGR_ACTIVE, and a raptor Task in state AGENT_EXECUTING, and asyncio.Task handles available for other aspects, like synchronization of metadata and initiation of input data staging. However, we may prefer that the workflow script can continue evaluation on the client side while waiting for the Pilot job, and that we avoid blocking until we absolutely have to (presumably when exiting the dispatching context).
- runtime_shutdown(runtime)[source]
Manage tear down of the RADICAL Pilot Session and resources.
Several aspects of the RP runtime interface use blocking calls. This method should be run in a non-root thread (concurrent.futures.Future) that the event loop can manage as an asyncio-wrapped task.
Overrides
scalems.execution.RuntimeManager
- Parameters:
runtime (RuntimeSession) –
- class scalems.radical.runtime.RuntimeConfiguration[source]
Module configuration information.
See also
scalems.radical.runtime_configuration.configuration()
scalems.radical.runtime.parser
scalems.radical.runtime.RuntimeSession
- __init__(execution_target='local.localhost', rp_resource_params=<factory>, target_venv=None, enable_raptor=False)
- execution_target: str = 'local.localhost'
Platform identifier for the RADICAL Pilot execution resource.
- rp_resource_params: RPResourceParams
Schema for this member container may not be stable.
- class scalems.radical.session.RuntimeSession[source]
Container for scalems.radical runtime state data.
Use a creation function to provide RuntimeSession with an asyncio event loop. Interact with the RuntimeSession in the main thread whenever possible. Let the RuntimeSession dispatch slow rp UI calls to other threads as needed and appropriate.
Note
There is very little automated error recovery. For examples of expansive checking and re-launching of runtime resources, refer to the fixtures in conftest.py at or before revision 41b965a27c5af9abc115677b738085c35766b5b6.
- __init__(session, *, loop, configuration)[source]
- Parameters:
session (radical.pilot.Session) –
loop (AbstractEventLoop) –
configuration (RuntimeConfiguration) –
- close()[source]
Direct the runtime to shut down and release resources.
Warning
This function may return before resources have been finalized. Follow a call to close() with wait_closed() to give the event loop a chance to cycle.
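The two-step shutdown protocol can be mimicked with plain asyncio primitives. MiniSession below is an illustrative stand-in for the close()/wait_closed() contract, not the real RuntimeSession.

```python
import asyncio

class MiniSession:
    # Illustrative stand-in for RuntimeSession's shutdown protocol.
    def __init__(self):
        self._closed = asyncio.Event()

    def close(self):
        # Schedule finalization; may return before resources are released.
        asyncio.get_running_loop().call_soon(self._closed.set)

    async def wait_closed(self):
        await self._closed.wait()

async def main():
    session = MiniSession()
    session.close()
    assert not session._closed.is_set()  # close() returned before finalization
    await session.wait_closed()          # let the event loop cycle
    return session._closed.is_set()

assert asyncio.run(main()) is True
```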
- pilot()[source]
Get active Pilot.
Allows lazy initialization of the Pilot resource.
- Returns:
The current Pilot instance, if available and valid, or a new Pilot instance in the configured PilotManager.
- Return type:
- Raises:
APIError – for invalid RP Session configuration.
- pilot_manager() PilotManager | None [source]
- pilot_manager(pilot_manager: str) PilotManager
- pilot_manager(pilot_manager: PilotManager) PilotManager
Get (optionally set) the current PilotManager.
- Parameters:
pilot_manager (optional, radical.pilot.PilotManager, str) – Set to RP PilotManager instance or identifier, if provided.
- Returns:
The PilotManager instance, if set, else None.
- Return type:
- Raises:
ValueError – for invalid identifier.
APIError – for invalid RP Session configuration.
- task_manager() TaskManager | None [source]
- task_manager(task_manager: str) TaskManager
- task_manager(task_manager: TaskManager) TaskManager
Get (optionally set) the current TaskManager.
- Parameters:
task_manager (optional, radical.pilot.TaskManager, str) – Set to RP TaskManager instance or identifier, if provided.
- Returns:
The TaskManager instance, if set, else None.
- Return type:
- Raises:
ValueError – for invalid identifier.
APIError – for invalid RP Session configuration.
- async wait_closed()[source]
Wait for a closing session to be closed.
Use with close() to allow the asyncio event loop to resolve outstanding tasks.
TODO: Do we need this? This method may not be necessary. Or it may be more necessary in the future.
- resources: Task[dict] | None = None
The active Pilot resources, if any.
The runtime_startup routine schedules a Task to get a copy of the Pilot.resource_details['rm_info'] dictionary, once the Pilot reaches state PMGR_ACTIVE.
- property session: radical.pilot.Session
The current radical.pilot.Session (may already be closed).
- async scalems.radical.task.rp_task(rptask)[source]
Mediate between a radical.pilot.Task and an asyncio.Future.
Schedule an asyncio Task to receive the result of the RP Task. The asyncio Task must also make sure that asyncio cancellation propagates to the rp.Task.cancel, and vice versa.
This function should be awaited immediately to make sure the necessary call-backs get registered. The result will be an asyncio.Task, which should be awaited separately.
Internally, this function provides a call-back to the rp.Task. The call-back provided to RP cannot directly call asyncio.Future methods (such as set_result() or set_exception()) because RP will be making the call from another thread without mediation by the asyncio event loop.
As such, we provide a thread-safe event handler to propagate the RP Task call-back to this asyncio.Task result. (See _rp_callback() and RPFinalTaskState.)
Canceling the returned task will cause rptask to be canceled. Canceling rptask will cause this task to be canceled.
- Parameters:
rptask (radical.pilot.Task) – RADICAL Pilot Task that has already been submitted.
- Returns:
A Task that, when awaited, returns the rp.Task instance in its final state.
- Return type:
Task[radical.pilot.Task]
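The reason the RP callback cannot touch the asyncio.Future directly is that it runs in a foreign thread. The relay pattern can be sketched with loop.call_soon_threadsafe; the names below (watch, fake_rp_task) are illustrative stand-ins, and the real implementation uses _rp_callback and RPFinalTaskState.

```python
import asyncio
import threading

async def watch(simulated_rp_task):
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def callback(final_state):
        # Runs in a non-asyncio thread, as RP callbacks do, so it must
        # relay through the event loop rather than call set_result() directly.
        loop.call_soon_threadsafe(future.set_result, final_state)

    simulated_rp_task(callback)  # register the callback with the "RP task"
    return await future

def fake_rp_task(callback):
    # Simulate RP invoking the registered callback from its own thread.
    threading.Thread(target=callback, args=("DONE",)).start()

assert asyncio.run(watch(fake_rp_task)) == "DONE"
```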
- scalems.radical.task.scalems_callback(fut, *, item)[source]
Propagate the results of a Future to the subscribed item.
Partially bind item to use this as the argument to fut.add_done_callback().
Warning
In the long run, we should not extend the life of the reference returned by edit_item, and we need to consider the robust way to publish item results.
Note
This is not currently RP-specific and we should look at how best to factor results publishing for workflow items. It may be that this function is the appropriate place to convert RP Task results to scalems results.
- Parameters:
fut (Future) –
item (Task) –
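The partial-binding pattern described above looks roughly like this with the standard library. The item object is a hypothetical stand-in (a plain list) for the subscribed workflow item.

```python
import functools
from concurrent.futures import Future

results = []

def scalems_style_callback(fut, *, item):
    # Propagate the Future result to the subscribed item (stand-in list here;
    # the real callback publishes with item.set_result()).
    item.append(fut.result())

fut = Future()
# Partially bind *item* so the callable matches add_done_callback's signature.
fut.add_done_callback(functools.partial(scalems_style_callback, item=results))
fut.set_result(42)

assert results == [42]
```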
- async scalems.radical.task.submit(*, item, task_manager, pre_exec, raptor_id=None)[source]
Dispatch a WorkflowItem to be handled by RADICAL Pilot.
Submits an rp.Task and returns an asyncio.Task watcher for the submitted task.
Creates a Future, registering a done_callback to publish the task result with item.set_result().
A callback is registered with the rp.Task to set an Event on completion. An asyncio.Task watcher task monitors the Event(s) and gets a reference to an asyncio.Future through which results can be published to the scalems workflow item. (Currently the Future is created in this function, but should probably be acquired directly from the item itself.) The watcher task waits for the rp.Task finalization event or for the Future to be cancelled. Periodically, the watcher task “wakes up” to check if something has gone wrong, such as the rp.Task completing without setting the finalization event.
Caveats
There is an unavoidable race condition in the check performed by the watcher task. We don’t know how long after an rp.Task completes before its callbacks will run and we can’t check whether the callback has been scheduled. The watcher task cannot be allowed to complete successfully until we know that the callback has either run or will never run.
The delay between the rp.Task state change and the callback execution should be less than a second. We can allow the watcher to wake up occasionally (on the order of minutes), so we can assume that it will never take more than a full iteration of the waiting loop for the callback to propagate, unless there is a bug in RP. For simplicity, we can just note whether rptask.state in rp.FINAL before the watcher goes to sleep and raise an error if the callback is not triggered in an iteration where we have set such a flag.
If our watcher sends a cancellation to the rp.Task, there is no need to continue to monitor the rp.Task state and the watcher may exit.
- Parameters:
item (Task) – The workflow item to be submitted.
task_manager (radical.pilot.TaskManager) – A radical.pilot.TaskManager instance through which the task should be submitted.
pre_exec (list) – radical.pilot.Task.pre_exec prototype.
raptor_id (str) – The string name of the “scheduler,” corresponding to the UID of a Task running a rp.raptor.Master (if Raptor is enabled).
- Returns:
a “Future[rp.Task]” for a rp.Task in its final state.
- Return type:
The caller must await the result of the coroutine to obtain an asyncio.Task that can be cancelled or awaited as a proxy to direct RP task management. The Task will hold a coroutine that is guaranteed to already be running, failed, or canceled. The caller should check the status of the task immediately before making assumptions about whether a Future has been successfully bound to the managed workflow item.
The returned asyncio.Task can be used to cancel the rp.Task (and the Future) or to await the RP.Task cleanup.
To submit tasks as a batch, await an array of submit() results in the same dispatching context. (TBD)
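Awaiting an array of submit() results could look like the following gather-based sketch. submit_stub is a hypothetical stand-in for scalems.radical.task.submit(); only the shape (a coroutine that yields an awaitable watcher Task) is taken from the text above.

```python
import asyncio

async def submit_stub(item):
    # Hypothetical stand-in for submit(): returns an asyncio.Task that
    # resolves to the "final state" of the submitted work item.
    async def watcher():
        await asyncio.sleep(0)  # pretend to wait on an rp.Task
        return f"{item}:DONE"
    return asyncio.create_task(watcher())

async def main():
    # Submit a batch, then await all of the returned watcher tasks together.
    watchers = await asyncio.gather(*(submit_stub(i) for i in ("a", "b")))
    return await asyncio.gather(*watchers)

assert asyncio.run(main()) == ["a:DONE", "b:DONE"]
```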
scalems.radical.raptor
Support for scalems on radical.pilot.raptor.
Define the connective tissue for SCALE-MS tasks embedded in rp.Task arguments.
Provide the RP Raptor and Worker details. Implement the raptor task script as a package entry point that we expect to be executable from the command line in a virtual environment where the scalems package is installed, without further modification to the PATH.
The client should be reasonably certain that the target environment has a compatible installation of RP and scalems. A rp.raptor.Master task script is installed with the scalems package. The script name is provided by the module function raptor_script(), and will be resolvable on the PATH for a Python interpreter process in an environment that has the scalems package installed.
We should try to keep this module as stable as possible so that the run time interface provided by scalems to RP is robust, and the entry point scripts change as little as possible across versions. scalems.radical runtime details live in runtime.py.
As of radical.pilot version 1.18, TaskDescription supports a mode field and several additional fields. The extended schema for the TaskDescription depends on the value of mode. The structured data provided to the executor callable is composed from these additional fields according to the particular mode.
We use the radical.pilot.task_description.TASK_FUNCTION mode, specifying a function field to name a callable in the (global) namespace accessible by the worker process.
We populate the worker global namespace with imported callables in the module file from which Raptor imports our radical.pilot.raptor.MPIWorker subclass, ScaleMSWorker.
The callable accepts *args and **kwargs, which are extracted from fields in the TaskDescription. Since we derive from MPIWorker, an mpi4py communicator is inserted at args[0].
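The composition of the executor call from TaskDescription fields can be sketched in pure Python. The field names mirror the description above; the namespace, communicator, and mode string are stand-ins (the real constant is rp.TASK_FUNCTION and the communicator is an mpi4py.MPI.Comm).

```python
# Sketch of TASK_FUNCTION-style dispatch: look up *function* by name in a
# worker namespace, then call it with the communicator inserted at args[0].
def report(comm, x, *, scale=1):
    return (comm, x * scale)

worker_namespace = {"report": report}  # stand-in for the worker's globals

task_description = {
    "mode": "TASK_FUNCTION",  # placeholder for rp.TASK_FUNCTION
    "function": "report",
    "args": [3],
    "kwargs": {"scale": 2},
}

comm = "MPI_COMM_WORLD"  # stand-in for an mpi4py communicator
func = worker_namespace[task_description["function"]]
result = func(comm, *task_description["args"], **task_description["kwargs"])

assert result == ("MPI_COMM_WORLD", 6)
```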
Protocol
As of RP 1.18, the radical.pilot.raptor interface is not documented. The following diagrams illustrate the approximate architecture of a Raptor Session.
A “raptor task” is an executable task (mode = rp.RAPTOR_MASTER) in which the script named by radical.pilot.TaskDescription.executable manages the life cycle of a radical.pilot.raptor.Master (or subclass instance).
As of RP 1.14, the protocol is as follows. The cfg argument to radical.pilot.raptor.Master does not currently appear to be required.
scalems encodes worker requirements on the client side. The scalems raptor script decodes the client-provided requirements, combines the information with run time details and work load inspection, and produces the WorkerDescription with which to submit_workers().
TODO
Pass input and output objects more efficiently.
Worker processes come and go, and are not descended from Raptor processes (which may not even be on the same node), so we can’t generally pass objects by reference or even in shared memory segments. However, we can use optimized ZMQ facilities for converting network messages directly to/from memory buffers.
In other cases, we can use the scalems typing system to specialize certain types for optimized (de)serialization from/to file-backed storage or streams.
TODO
Get a stronger specification of the RP Raptor interface.
See https://github.com/radical-cybertools/radical.pilot/issues/2731
raptor task
scalems specialization of the “raptor” component in the radical.pilot.raptor federated scheduling protocol.
- scalems.radical.raptor.raptor()[source]
Entry point for raptor.Master task.
This function implements the scalems_raptor entry point script called by the RADICAL Pilot executor to provide the raptor task.
During installation, the scalems build system creates a scalems_raptor entry point script that provides command line access to this function. The Python signature takes no arguments because the function processes command line arguments from sys.argv.
- async scalems.radical.raptor.raptor_input(*, filestore)[source]
Provide the input file for a SCALE-MS Raptor script.
The resulting configuration file is staged with the Raptor scheduler task and provided as a command line argument. Most information can be provided after the Raptor starts (through CPI commands), moving forward, but we retain this input mechanism for now.
Produces a file containing a serialized RaptorConfiguration.
- Parameters:
filestore (FileStore) – (local) FileStore that will manage the generated AbstractFileReference.
- Return type:
- scalems.radical.raptor.worker_requirements(*, pre_exec, ranks_per_worker, cores_per_rank=1, gpus_per_rank=0)[source]
Get the requirements for the workload, as known to the client.
- Parameters:
ranks_per_worker (int) –
cores_per_rank (int) –
gpus_per_rank (SupportsFloat) –
- Return type:
- class scalems.radical.raptor.RaptorConfiguration[source]
Input to the script responsible for the RP raptor.
Produced on the client side with raptor_input. Deserialized in the raptor task (raptor) to get a RaptorWorkerConfig.
- __init__(versioned_modules)
- class scalems.radical.raptor.ClientWorkerRequirements[source]
Client-side details to inform worker provisioning.
This structure is part of the scalems.radical.raptor.RaptorConfiguration provided to the raptor script. The raptor script uses this information when calling worker_description().
Use the worker_requirements() creation function for a stable interface.
- __init__(cores_per_process, cpu_processes, gpus_per_rank=0, pre_exec=(), named_env=None)
- cores_per_process: int
Number of CPU cores reserved per process (or MPI rank) for threading or child processes.
- class scalems.radical.raptor.ScaleMSRaptor[source]
Manage a RP Raptor raptor_id target.
Extends rp.raptor.Master.
We do not submit scalems tasks directly as RP Tasks from the client. We encode scalems tasks as instructions to the Raptor task, which will be decoded and translated to RP Tasks by the ScaleMSRaptor.request_cb and self-submitted to the ScaleMSWorker.
Results of such tasks are only available through the result_cb(). The Raptor can translate results of generated Tasks into results for the Task carrying the coded instruction, or it can produce data files to stage during or after the Raptor task. Additionally, the Raptor could respond to other special instructions (encoded in later client-originated Tasks) to query or retrieve generated task results.
- __init__(configuration)[source]
Initialize a SCALE-MS Raptor.
Verify the environment. Perform some scalems-specific set up and initialize the base class details.
- Parameters:
configuration (RaptorConfiguration) –
- configure_worker(requirements)[source]
Scoped temporary module file for raptor worker.
Write and return the path to a temporary Python module. The module imports ScaleMSWorker into its module namespace so that the file and class can be used in the worker description for rp.raptor.Master.submit_workers().
- Parameters:
requirements (ClientWorkerRequirements) –
- Return type:
Generator[list[scalems.radical.raptor.WorkerDescription], None, None]
- cpi_finalize(task)[source]
Short-circuit the normal Raptor protocol to finalize a task.
This is an alias for Raptor._result_cb(), but is a public method used in the CpiCommand.launch method for Control commands, which do not call ScaleMSRaptor.submit_tasks().
- Parameters:
task (TaskDictionary) –
- request_cb(tasks)[source]
Allows all incoming requests to be processed by the Raptor.
RADICAL guarantees that request_cb() calls are made sequentially in a single thread. The implementation does not need to be thread-safe or reentrant. (Ref: https://github.com/radical-cybertools/radical.utils/blob/master/src/radical/utils/zmq/queue.py#L386)
RADICAL does not guarantee that Tasks are processed in the same order in which they are submitted by the client.
If overridden, request_cb() must return a list. The returned list is interpreted to be requests that should be processed normally after the callback. This allows subclasses of rp.raptor.Master to add or remove requests before they become Tasks. The returned list is submitted with self.submit_tasks() by the base class after the callback returns.
A Raptor may call self.submit_tasks() to self-submit items (e.g. instead of or in addition to manipulating the returned list in request_cb()).
It is the developer’s responsibility to choose unique task IDs (uid) when crafting items for Raptor.submit_tasks().
- Parameters:
tasks (Sequence[TaskDictionary]) –
- Return type:
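An override following the request_cb() contract might intercept some requests, self-submit a generated item with a unique uid, and return the remainder for normal processing. DummyRaptor and BaseRaptor below are stand-ins, not the rp.raptor.Master API.

```python
import itertools

_uid_counter = itertools.count()

class BaseRaptor:
    # Stand-in for rp.raptor.Master: collects whatever gets submitted.
    def __init__(self):
        self.submitted = []

    def submit_tasks(self, tasks):
        self.submitted.extend(tasks)

class DummyRaptor(BaseRaptor):
    def request_cb(self, tasks):
        # Intercept "control" items; return the rest for normal handling
        # (the base class submits the returned list after the callback).
        passthrough = [t for t in tasks if t.get("mode") != "control"]
        # Self-submit a generated task with a unique uid: choosing unique
        # task IDs is the developer's responsibility.
        self.submit_tasks([{"uid": f"gen.{next(_uid_counter)}"}])
        return passthrough

raptor = DummyRaptor()
remaining = raptor.request_cb([{"uid": "t0", "mode": "control"}, {"uid": "t1"}])
assert [t["uid"] for t in remaining] == ["t1"]
assert raptor.submitted[0]["uid"] == "gen.0"
```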
- result_cb(tasks)[source]
SCALE-MS specific handling of completed tasks.
We perform special handling for two types of tasks:
1. Tasks submitted through the Raptor and completed by the collaborating Worker(s).
2. Tasks intercepted and resolved entirely by the Raptor (CPI calls).
- Parameters:
tasks (Sequence[TaskDictionary]) –
worker task
scalems specialization of the “worker” component in the radical.pilot.raptor federated scheduling protocol.
- scalems.radical.raptor.worker_description(*, named_env, worker_file, cores_per_process=None, cpu_processes=None, gpus_per_rank=None, pre_exec=(), environment=None)[source]
Get a worker description for ScaleMSRaptor.submit_workers().
- Parameters:
cores_per_process (int, optional) – See radical.pilot.TaskDescription.cores_per_rank
cpu_processes (int, optional) – See radical.pilot.TaskDescription.ranks
environment (dict, optional) – Environment variables to set in the Worker task.
gpus_per_rank (float, optional) – See radical.pilot.TaskDescription.gpus_per_rank
named_env (str) – Python virtual environment registered with radical.pilot.Pilot.prepare_env (currently ignored; see #90).
pre_exec (list[str]) – Shell command lines for preparing the worker environment.
worker_file (str) – Standalone module from which to import ScaleMSWorker.
- Return type:
worker_file is generated for the raptor script by the caller. See scalems.radical.raptor.
The uid for the Worker task is defined by ScaleMSRaptor.submit_workers().
- class scalems.radical.raptor.ScaleMSWorker[source]
Specialize the Raptor MPI Worker for scalems dispatching of serialised work.
scalems tasks encode the importable function and inputs in the arguments to the run_in_worker dispatching function, which is available to the ScaleMSWorker instance.
- run_in_worker(*, work_item, comm=None)[source]
Unpack and run a task requested through RP Raptor.
Satisfies RaptorTaskExecutor protocol. Named as the function for rp.TASK_FUNCTION mode tasks generated by scalems.
This function MUST be imported and referentiable by its __qualname__ from the scope in which ScaleMSWorker methods execute. (See ScaleMSRaptor._worker_file)
- Parameters:
comm (mpi4py.MPI.Comm, optional) – MPI communicator to be used for the task.
work_item (ScalemsRaptorWorkItem) – dictionary of encoded function call from kwargs in the TaskDescription.
See also
CpiAddItem.launch()
task handling
scalems instructions are embedded in TaskDescriptions with the scalems.radical.raptor.CPI_MESSAGE mode, using the metadata field for the payload.
Executable work is re-encoded by ScaleMSRaptor for ScaleMSWorker (using a ScalemsRaptorWorkItem in the work_item field of the kwargs of a TASK_FUNCTION mode Task) to be dispatched through scalems machinery in the Worker process.
- scalems.radical.raptor.api_name
Key for dispatching raptor Requests.
We can use this to identify the schema used for SCALE-MS tasks encoded in arguments to the raptor TASK_FUNCTION mode executor.
- scalems.radical.raptor.CPI_MESSAGE
Flag for scalems messages to be treated as CPI calls.
Used in the mode field to indicate that the object should be handled through the SCALEMS Compute Provider Interface machinery.
- class scalems.radical.raptor.ScalemsRaptorWorkItem[source]
Encode the function call to implement a task in the workflow.
Parameter type for the run_in_worker function work_item argument.
This structure must be trivially serializable (by msgpack) so that it can be passed in a TaskDescription.kwargs field. Consistency with higher level scalems.serialization is not essential, but can be pursued in the future.
The essential role is to represent an importable callable and its arguments. At run time, the dispatching function (run_in_worker) conceivably has access to module scoped state, but does not have direct access to the Worker (or any resources other than the mpi4py.MPI.Comm object). Therefore, abstract references to input data should be resolved at the Raptor in terms of the expected Worker environment before preparing and submitting the Task.
TODO: Evolve this to something sufficiently general to be a scalems.WorkItem.
- TODO: Clarify the translation from an abstract representation of work in terms of the workflow record to the concrete representation with literal values and local filesystem paths.
- TODO: Record extra details like implicit filesystem interactions, environment variables, etc. TBD whether they belong in a separate object.
- comm_arg_name: str | None
Identify how to provide an MPI communicator to func, if at all.
If comm_arg_name is not None, the callable will be provided with the MPI communicator. If comm_arg_name is an empty string, the communicator is provided as the first positional argument. Otherwise, comm_arg_name must be a valid key word argument by which to pass the communicator to func.
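The three comm_arg_name cases can be expressed directly. The dispatch helper below is illustrative (the real logic lives in run_in_worker), and the communicator is a plain string stand-in for an mpi4py.MPI.Comm.

```python
def call_with_comm(func, comm, comm_arg_name, args=(), kwargs=None):
    # Illustrative dispatch for the comm_arg_name contract:
    #   None -> no communicator; "" -> first positional; name -> keyword.
    kwargs = dict(kwargs or {})
    if comm_arg_name is None:
        return func(*args, **kwargs)
    if comm_arg_name == "":
        return func(comm, *args, **kwargs)
    kwargs[comm_arg_name] = comm
    return func(*args, **kwargs)

def f_positional(comm, x):
    return (comm, x)

def f_keyword(x, *, communicator=None):
    return (communicator, x)

assert call_with_comm(f_positional, "COMM", "", args=(1,)) == ("COMM", 1)
assert call_with_comm(f_keyword, "COMM", "communicator", args=(2,)) == ("COMM", 2)
assert call_with_comm(lambda x: x, "COMM", None, args=(3,)) == 3
```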
compatibility helpers
These classes are not formal types, but are used to represent (untyped) interfaces in radical.pilot.raptor.
- scalems.radical.raptor.RaptorWorkerConfig
Client-specified Worker requirements.
Used by the raptor script when calling worker_description(). Created internally with _configure_worker().
- class scalems.radical.raptor.WorkerDescription[source]
Worker description.
See also
submit_workers()
https://github.com/radical-cybertools/radical.pilot/issues/2731
- class scalems.radical.raptor.TaskDictionary[source]
Task representations seen by request_cb and result_cb.
Other fields may be present, but the objects in the sequences provided to scalems.radical.raptor.ScaleMSRaptor.request_cb() and scalems.radical.raptor.ScaleMSRaptor.result_cb() have the following fields. Result fields will not be populated until the Task runs.
For the expected fields, see the source code for as_dict(): https://radicalpilot.readthedocs.io/en/stable/_modules/radical/pilot/task.html#Task.as_dict
- description: _RaptorTaskDescription
Encoding of the original task description.
- class scalems.radical.raptor._RaptorTaskDescription[source]
Describe a Task to be executed through a Raptor Worker.
A specialization of radical.pilot.TaskDescription.
Note the distinctions of a TaskDescription to be processed by a raptor.Master.
The meaning or utility of some fields is dependent on the values of other fields.
- TODO: We need some additional fields, like environment and fields related to launch method and resources reservation. Is the schema for rp.TaskDescription sufficiently strong and well-documented that we can remove this hinting type? (Check again at RP >= 1.19)
- args: list
For rp.TASK_FUNCTION mode, list of positional arguments provided to the executor function.
- function: str
Executor for rp.TASK_FUNCTION mode. Names the callable for dispatching.
The callable can either be a function object present in the namespace of the interpreter launched by the Worker for the task, or a radical.pilot.pytask.PythonTask pickled function object.
- kwargs: dict
For rp.TASK_FUNCTION mode, a dictionary of keyword arguments provided to the executor function.
- metadata: str | int | float | bool | None | Mapping[str, Encodable] | List[Encodable]
An arbitrary user-provided payload.
May be any type that is encodable by msgpack (i.e. built-in types).
- mode: str
The executor mode for the Worker to use.
For rp.TASK_FUNCTION mode, either function or method must name a task executor. Depending on the Worker (sub)class, resources such as an mpi4py.MPI.Comm will be provided as the first positional argument to the executor. *args and **kwargs will be provided to the executor from the corresponding fields.
See also
RaptorTaskExecutor