scalems.radical execution

User interface is documented at scalems.radical.

execution module interface

scalems.radical.configuration(*args, **kwargs)

Get (and optionally set) the RADICAL runtime configuration.

With no arguments, returns the current configuration. If a configuration has not yet been set, the command line parser is invoked to try to build a new configuration.

If arguments are provided, try to construct a scalems.radical.runtime.Configuration and use it to initialize the module.

It is an error to try to initialize the module more than once.

Return type:

Configuration
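The get-or-set behavior described above can be pictured as a module-level variable that is assigned at most once. This is a minimal sketch, not the scalems implementation. `ConfigSketch`, `_current`, and `_from_command_line()` are hypothetical stand-ins, and the command line fallback is reduced to returning defaults.

```python
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class ConfigSketch:
    """Hypothetical stand-in for scalems.radical.runtime.Configuration."""
    execution_target: str = "local.localhost"
    enable_raptor: bool = False


# Module state: None until the module has been initialized.
_current: Optional[ConfigSketch] = None


def _from_command_line() -> ConfigSketch:
    # Hypothetical placeholder for parsing sys.argv; here it just uses defaults.
    return ConfigSketch()


def configuration(*args, **kwargs) -> ConfigSketch:
    """Get (and optionally set) the module configuration."""
    global _current
    if args or kwargs:
        if _current is not None:
            raise RuntimeError("Module is already initialized.")
        _current = ConfigSketch(*args, **kwargs)
    elif _current is None:
        # No configuration yet: fall back to the (simplified) parser.
        _current = _from_command_line()
    return _current
```

Here a second attempt to set the configuration raises, while repeated no-argument calls return the same object.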

scalems.radical.workflow_manager(loop, directory=None)

Manage a workflow context for RADICAL Pilot work loads.

The rp.Session is created when the Python Context Manager is “entered”, so the asyncio event loop must be running before then.

To help enforce this, we use an async Context Manager, at least in the initial implementation. Note, however, that the implementation is not thread-safe. It is also not reentrant, although re-entrance is not checked. We probably do want to constrain ourselves to zero or one Sessions per environment, but we do need to support multiple Pilots and task submission scopes (resource requirement groups). Further discussion is welcome.

Warning

The importer of this module should be sure to import radical.pilot before importing the built-in logging module to avoid spurious warnings.

Parameters:

loop (AbstractEventLoop) –
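Because an async context manager can only be entered with `async with` inside a coroutine, the event loop is necessarily running when the session is created. A minimal sketch of that arrangement, assuming a hypothetical `FakeSession` in place of `rp.Session` and a hypothetical check that the caller-provided loop is the running one:

```python
import asyncio
import contextlib


class FakeSession:
    """Hypothetical stand-in for radical.pilot.Session."""

    def __init__(self):
        self.closed = False

    def close(self):
        self.closed = True


@contextlib.asynccontextmanager
async def workflow_manager_sketch(loop: asyncio.AbstractEventLoop):
    # Entering requires "async with", so a loop is guaranteed to be running here.
    if asyncio.get_running_loop() is not loop:
        raise RuntimeError("Context must be entered in the provided event loop.")
    session = FakeSession()  # created on entry, not at construction time
    try:
        yield session
    finally:
        session.close()


async def main():
    async with workflow_manager_sketch(asyncio.get_running_loop()) as session:
        assert not session.closed
    return session
```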

scalems.radical.runtime support module

Manage the RADICAL Pilot start-up and shut-down.

The provided Runtime class encapsulates stateful resources that, once acquired, should be shut down explicitly. Runtime instances may be used as context managers to ensure the proper protocol is followed, or the caller can take responsibility for calling Runtime.close() to shut down.

Note: Consider whether the Runtime context manager is reentrant or multi-use.

The Runtime state encapsulates several nested states. A Session, TaskManager, and PilotManager must be created for the Runtime to be usable. Additionally, Pilots and scheduler tasks may be added or removed during the Runtime lifetime. To better support alternative scenarios in which a Runtime instance may be provided to a scalems.radical component in an arbitrary state, consider making the with block scoped, such that exiting only returns the Runtime instance to its initial state, rather than shutting it down completely. See also https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
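One way to realize the scoped with block suggested above is to register an undo action on a contextlib.ExitStack for each resource added inside the block, so that exiting reverts only those additions. This is a sketch under assumptions: `RuntimeSketch` and its `pilots` list are hypothetical, standing in for Pilots or scheduler tasks added during the block.

```python
import contextlib


class RuntimeSketch:
    """Hypothetical runtime whose with block restores its entry state on exit."""

    def __init__(self):
        self.pilots = []  # state that may already exist before the with block
        self._stack = None

    def __enter__(self):
        self._stack = contextlib.ExitStack()
        return self

    def add_pilot(self, name):
        # Must be called inside the with block.
        self.pilots.append(name)
        # Register the matching undo action; it runs when the block exits.
        self._stack.callback(self.pilots.remove, name)

    def __exit__(self, exc_type, exc, tb):
        stack, self._stack = self._stack, None
        stack.close()  # undo additions in reverse order
        return False
```

Pre-existing state is left alone, so a component handed an already-configured Runtime does not tear down resources it did not create.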

Deferred:

A Runtime could avoid providing direct access to the RP interface and instead run an entire RP Session state machine in a thread (separate from the asyncio event loop thread), relaying RP scripting commands through queues. This would completely prevent misuse and insulate the asyncio event loop from blocking RP commands. We need a better sense of the RP flow combinatorics before we can reasonably pursue this.

title scalems on radical.pilot run time

box "SCALE-MS framework" #honeydew
participant WorkflowManager as client_workflowmanager
participant RuntimeManager
participant "runner task" as scalems.execution
end box
box "SCALE-MS RP adapter" #linen
participant Runtime as client_runtime
participant Executor as client_executor
end box

autoactivate on

client_workflowmanager -> client_executor: async with executor
activate client_workflowmanager
client_executor -> RuntimeManager: ~__aenter__()
RuntimeManager -> client_executor: runtime_configuration()
RuntimeManager -> client_executor: runtime_startup()

client_executor -> : rp.Session()
return
client_executor -> client_runtime **: Session
activate client_runtime
client_executor -> : rp.PilotManager()
return
client_executor -> client_runtime: pilot_manager()
return
client_executor -> : rp.TaskManager()
return
client_executor -> client_runtime: task_manager()
return
client_executor -> : pilot_manager.submit_pilots()
return
client_executor -> client_runtime: pilot()
note left
Pilot venv is determined by resource definition (JSON file).
end note
return

group ref [scalems.radical.raptor]
client_executor -> client_executor: get_scheduler()
return
end

client_executor -> client_runtime: set scheduler
return

client_executor -> scalems.execution **: create_task(manage_execution)
client_executor -> scalems.execution: await runner_started
RuntimeManager <-- client_executor: set runner_task
deactivate client_executor
deactivate RuntimeManager

client_workflowmanager -> client_workflowmanager #gray: async with dispatcher

...Raptor workload handling...

return leave dispatcher context

== Shut down runtime ==

client_executor -> RuntimeManager: ~__aexit__()
RuntimeManager -> RuntimeManager: await runner_task
RuntimeManager <-- scalems.execution
deactivate RuntimeManager
RuntimeManager -> client_executor: runtime_shutdown()

client_runtime <- client_executor
return session
client_runtime <- client_executor
return scheduler
group Cancel Master task [if scheduler is not None]
client_runtime <- client_executor
return task_manager
client_executor -> : task_manager.cancel_tasks()
return
client_runtime <- client_executor
return scheduler
client_executor -> : runtime.scheduler.wait()
return
end
client_executor -> : session.close()
return

client_executor -> client_runtime !!
deactivate client_executor
deactivate RuntimeManager

client_workflowmanager <-- client_executor: leave executor context
deactivate client_workflowmanager

scalems.radical.runtime.executor_factory(manager, params=None)
Parameters:
class scalems.radical.runtime.RPDispatchingExecutor

Client side manager for work dispatched through RADICAL Pilot.

Configuration points:

  • resource config

  • pilot config

  • session config?

Extends scalems.execution.RuntimeManager

runtime: scalems.radical.runtime.Runtime

See scalems.execution.RuntimeManager.runtime

__init__(*, editor_factory=None, datastore=None, loop, configuration, dispatcher_lock=None)

Create a client side execution manager.

Warning

The creation method does not fully initialize the instance.

Initialization and de-initialization occur through the Python (async) context manager protocol.

Parameters:
runtime_configuration()

Provide scoped Configuration.

Merge the runtime manager’s configuration with the global configuration, update the global configuration, and yield the configuration for a with block.

Restores the previous global configuration when exiting the with block.

Warning

Not thread-safe.

Re-entrance is not checked, and re-entrant use will cause race conditions with respect to which Context state is restored! Moreover, the Configuration object is not currently hashable and does not have an equality test defined.

This context manager is not async, but it can be (and is) used within an asynchronous context manager, so nothing structurally prohibits reentrant calls, even without multithreading.

async runtime_startup()

Establish the RP Session.

Acquire a maximally re-usable set of RP resources. The scope established by this function is as broad as it can be within the life of the workflow manager.

Once instance.runtime_startup() succeeds, instance.runtime_shutdown() must be called to clean up resources. Use the async context manager behavior of the instance to follow this protocol automatically. That is, instead of calling instance.runtime_startup(); ...; instance.runtime_shutdown(), use:

async with instance:
    ...
Raises:
Return type:

Task

TODO: More concurrency.

The rp.Pilot and raptor task can be awaited separately, and we should allow input data staging to begin as soon as we have enough run time details to do it. We need to clarify which tasks should be in which states for the asynchronous context manager to be considered successfully “entered”. I expect that success includes a Pilot in state PMGR_ACTIVE, a raptor Task in state AGENT_EXECUTING, and asyncio.Task handles available for other aspects, such as synchronization of metadata and initiation of input data staging. However, we may prefer that the workflow script can continue evaluation on the client side while waiting for the Pilot job, and that we avoid blocking until we absolutely have to (presumably when exiting the dispatching context).

static runtime_shutdown(runtime)

Manage tear down of the RADICAL Pilot Session and resources.

Several aspects of the RP runtime interface use blocking calls. This method should therefore be run in a thread other than the event loop thread (for example, through a concurrent.futures executor), so that the event loop can manage it as an asyncio-wrapped task.

Overrides scalems.execution.RuntimeManager

Parameters:

runtime (Runtime) –
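The startup and shutdown protocol can be sketched as an async context manager whose __aexit__ hands the blocking tear-down to a worker thread with asyncio.to_thread() (Python 3.9+), so the event loop keeps running meanwhile. `ManagerSketch`, its stub `runtime_startup()`, and the time.sleep() that stands in for blocking RP calls are all hypothetical.

```python
import asyncio
import contextlib
import time


class ManagerSketch:
    """Hypothetical manager following the startup/shutdown protocol."""

    def __init__(self):
        self.runtime = None

    async def runtime_startup(self):
        self.runtime = {"session": "open"}  # stand-in for acquired RP resources

    @staticmethod
    def runtime_shutdown(runtime):
        time.sleep(0.1)  # stands in for blocking RP calls such as session.close()
        runtime["session"] = "closed"

    async def __aenter__(self):
        await self.runtime_startup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Startup succeeded, so shutdown must run; do it in a worker thread.
        await asyncio.to_thread(self.runtime_shutdown, self.runtime)
        return False  # do not suppress exceptions from the with block


async def main():
    ticks = 0

    async def ticker():
        # Keeps running while shutdown blocks, showing the loop is not stalled.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.01)
            ticks += 1

    tick_task = asyncio.create_task(ticker())
    async with ManagerSketch() as manager:
        pass
    tick_task.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await tick_task
    return manager.runtime, ticks
```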

class scalems.radical.runtime.Configuration

Module configuration information.

See also

__init__(datastore=None, execution_target='local.localhost', rp_resource_params=<factory>, target_venv=None, enable_raptor=False)
Parameters:
  • datastore (FileStore) –

  • execution_target (str) –

  • rp_resource_params (dict) –

  • target_venv (str) –

  • enable_raptor (bool) –

Return type:

None

datastore: FileStore = None
enable_raptor: bool = False
execution_target: str = 'local.localhost'
rp_resource_params: dict
target_venv: str = None
class scalems.radical.runtime.Runtime

Container for scalems.radical runtime state data.

Note

This class is almost exclusively a container. Lifetime management is assumed to be handled externally.

__init__(session)
Parameters:

session (radical.pilot.Session) –

pilot() → Pilot | None
pilot(pilot: str) → Pilot
pilot(pilot: Pilot) → Pilot

Get (optionally set) the current Pilot.

Parameters:

pilot (radical.pilot.Pilot, str, None) – Set to RP Pilot instance or identifier, if provided.

Returns:

instance, if set, else None

Return type:

radical.pilot.Pilot

Raises:
  • ValueError – for invalid identifier.

  • APIError – for invalid RP Session configuration.
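The get-or-set accessors on Runtime accept an instance, an identifier, or nothing. A minimal sketch of that pattern using typing.overload for the three call forms; `Pilot`, `RuntimeSketch`, and the `_known` dictionary that resolves identifiers are hypothetical stand-ins for the RP Session lookup. The same shape applies to pilot_manager() and task_manager().

```python
from typing import Iterable, Optional, Union, overload


class Pilot:
    """Hypothetical stand-in for radical.pilot.Pilot."""

    def __init__(self, uid: str):
        self.uid = uid


class RuntimeSketch:
    def __init__(self, known: Iterable[Pilot] = ()):
        # Stand-in for resolving identifiers through the RP Session.
        self._known = {p.uid: p for p in known}
        self._pilot: Optional[Pilot] = None

    @overload
    def pilot(self) -> Optional[Pilot]: ...

    @overload
    def pilot(self, pilot: str) -> Pilot: ...

    @overload
    def pilot(self, pilot: Pilot) -> Pilot: ...

    def pilot(self, pilot: Union[Pilot, str, None] = None) -> Optional[Pilot]:
        if pilot is None:
            return self._pilot  # getter form: may be None
        if isinstance(pilot, str):
            if pilot not in self._known:
                raise ValueError(f"Unknown Pilot identifier: {pilot!r}")
            pilot = self._known[pilot]
        self._pilot = pilot  # setter form
        return pilot
```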

pilot_manager() → PilotManager | None
pilot_manager(pilot_manager: str) → PilotManager
pilot_manager(pilot_manager: PilotManager) → PilotManager

Get (optionally set) the current PilotManager.

Parameters:

pilot_manager (optional, radical.pilot.PilotManager, str) – Set to RP PilotManager instance or identifier, if provided.

Returns:

instance, if set, else None.

Return type:

radical.pilot.PilotManager

Raises:
  • ValueError – for invalid identifier.

  • APIError – for invalid RP Session configuration.

reset(session)

Reset the runtime state.

Close any existing resources and revert to a new Runtime state containing only the provided session.

Parameters:

session (radical.pilot.Session) –

task_manager() → TaskManager | None
task_manager(task_manager: str) → TaskManager
task_manager(task_manager: TaskManager) → TaskManager

Get (optionally set) the current TaskManager.

Parameters:

task_manager (optional, radical.pilot.TaskManager, str) – Set to RP TaskManager instance or identifier, if provided.

Returns:

instance, if set, else None.

Return type:

radical.pilot.TaskManager

Raises:
  • ValueError – for invalid identifier.

  • APIError – for invalid RP Session configuration.

resources: Task[dict] | None = None

The active Pilot resources, if any.

The runtime_startup routine schedules a Task to get a copy of the Pilot.resource_details['rm_info'] dictionary, once the Pilot reaches state PMGR_ACTIVE.

scheduler: Task | None = None

The active raptor scheduler task, if any.

property session: radical.pilot.Session

The current radical.pilot.Session (may already be closed).

async scalems.radical.runtime.rp_task(rptask)

Mediate between a radical.pilot.Task and an asyncio.Future.

Schedule an asyncio Task to receive the result of the RP Task. The asyncio Task must also make sure that asyncio cancellation propagates to the rp.Task.cancel, and vice versa.

This function should be awaited immediately to make sure the necessary call-backs get registered. The result will be an asyncio.Task, which should be awaited separately.

Internally, this function provides a call-back to the rp.Task. The call-back provided to RP cannot directly call asyncio.Future methods (such as set_result() or set_exception()) because RP will be making the call from another thread without mediation by the asyncio event loop.

As such, we provide a thread-safe event handler to propagate the RP Task call-back to this asyncio.Task result. (See _rp_callback() and RPFinalTaskState.)

Canceling the returned task will cause rptask to be canceled. Canceling rptask will cause this task to be canceled.

Parameters:

rptask (radical.pilot.Task) – RADICAL Pilot Task that has already been submitted.

Returns:

A Task that, when awaited, returns the rp.Task instance in its final state.

Return type:

Task[radical.pilot.Task]
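The thread hand-off described above can be sketched with loop.call_soon_threadsafe(): the call-back registered on the RP-like task runs in a foreign thread and only schedules the Future update on the event loop thread, and cancellation is forwarded in both directions. `FakeRPTask`, its `register_callback()` and `finish()` methods, and the state strings are hypothetical stand-ins; the real rp.Task call-back signature differs, and this sketch returns a bare Future rather than an asyncio.Task for brevity.

```python
import asyncio
import threading

FINAL_STATES = ("DONE", "FAILED", "CANCELED")  # stand-in for rp.FINAL


class FakeRPTask:
    """Hypothetical stand-in for radical.pilot.Task."""

    def __init__(self):
        self.state = "EXECUTING"
        self._callbacks = []

    def register_callback(self, cb):
        self._callbacks.append(cb)

    def cancel(self):
        self.finish("CANCELED")

    def finish(self, state):
        # Like RP, call-backs run in whichever thread changes the state.
        self.state = state
        for cb in self._callbacks:
            cb(self, state)


async def rp_task_sketch(rptask: FakeRPTask) -> asyncio.Future:
    loop = asyncio.get_running_loop()
    future = loop.create_future()

    def _resolve(task, state):
        # Runs in the event loop thread.
        if future.done():
            return  # e.g. already cancelled from the asyncio side
        if state == "CANCELED":
            future.cancel()  # RP-side cancellation cancels the Future
        else:
            future.set_result(task)

    def _callback(task, state):
        # Runs in a foreign thread: only schedule work on the loop.
        if state in FINAL_STATES:
            loop.call_soon_threadsafe(_resolve, task, state)

    def _propagate_cancel(fut):
        # asyncio-side cancellation is forwarded to the RP-like task.
        if fut.cancelled() and rptask.state not in FINAL_STATES:
            rptask.cancel()

    rptask.register_callback(_callback)
    future.add_done_callback(_propagate_cancel)
    return future


async def main():
    rptask = FakeRPTask()
    future = await rp_task_sketch(rptask)
    worker = threading.Thread(target=rptask.finish, args=("DONE",))
    worker.start()
    result = await future
    worker.join()
    return result
```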

scalems.radical.runtime.scalems_callback(fut, *, item)

Propagate the results of a Future to the subscribed item.

Partially bind item to use this as the argument to fut.add_done_callback().

Warning

In the long run, we should not extend the life of the reference returned by edit_item, and we need to consider the robust way to publish item results.

Note

This is not currently RP-specific and we should look at how best to factor results publishing for workflow items. It may be that this function is the appropriate place to convert RP Task results to scalems results.

Parameters:
  • fut (Future) –

  • item (Task) –
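Partial binding lets one call-back serve any item, because Future.add_done_callback() passes only the Future to the call-back. A minimal sketch; `ItemSketch` with its `set_result()` is a hypothetical stand-in for the workflow item, and error and cancellation handling are deliberately simplified.

```python
import asyncio
import functools


class ItemSketch:
    """Hypothetical stand-in for a scalems workflow item."""

    def __init__(self):
        self.result = None

    def set_result(self, value):
        self.result = value


def scalems_callback(fut: asyncio.Future, *, item: ItemSketch):
    # Publish only successful results; other outcomes are left to the caller here.
    if not fut.cancelled() and fut.exception() is None:
        item.set_result(fut.result())


async def main():
    item = ItemSketch()
    fut = asyncio.get_running_loop().create_future()
    fut.add_done_callback(functools.partial(scalems_callback, item=item))
    fut.set_result(42)
    await asyncio.sleep(0)  # let the loop run the scheduled done-callback
    return item.result
```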

async scalems.radical.runtime.submit(*, item, task_manager, pre_exec, scheduler=None)

Dispatch a WorkflowItem to be handled by RADICAL Pilot.

Submits an rp.Task and returns an asyncio.Task watcher for the submitted task.

Creates a Future, registering a done_callback to publish the task result with item.set_result().

A callback is registered with the rp.Task to set an Event on completion. An asyncio.Task watcher task monitors the Event(s) and gets a reference to an asyncio.Future through which results can be published to the scalems workflow item. (Currently the Future is created in this function, but should probably be acquired directly from the item itself.) The watcher task waits for the rp.Task finalization event or for the Future to be cancelled. Periodically, the watcher task “wakes up” to check if something has gone wrong, such as the rp.Task completing without setting the finalization event.

Caveats

There is an unavoidable race condition in the check performed by the watcher task. We don’t know how long after an rp.Task completes before its callbacks will run and we can’t check whether the callback has been scheduled. The watcher task cannot be allowed to complete successfully until we know that the callback has either run or will never run.

The delay between the rp.Task state change and the callback execution should be less than a second. We can allow the watcher to wake up occasionally (on the order of minutes), so we can assume that it will never take more than a full iteration of the waiting loop for the callback to propagate, unless there is a bug in RP. For simplicity, we can just note whether rptask.state is in rp.FINAL before the watcher goes to sleep, and raise an error if the callback is not triggered in an iteration where we have set such a flag.

If our watcher sends a cancellation to the rp.Task, there is no need to continue to monitor the rp.Task state and the watcher may exit.
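The flag-based check from the caveats can be sketched as a loop that waits on the finalization Event with a timeout, noting before each wait whether the task is already final. `watch()`, `TaskStub`, and the `FINAL` set are hypothetical stand-ins for the watcher, rp.Task, and rp.FINAL; the short `interval` used in testing contrasts with the minutes-scale interval suggested above.

```python
import asyncio

FINAL = frozenset({"DONE", "FAILED", "CANCELED"})  # stand-in for rp.FINAL


class TaskStub:
    """Hypothetical object exposing an RP-like state attribute."""

    def __init__(self, state: str = "EXECUTING"):
        self.state = state


async def watch(task: TaskStub, finalized: asyncio.Event, interval: float) -> TaskStub:
    """Return when the finalization Event is set; fail if the call-back seems lost."""
    while True:
        # Note whether the task is already final before going to sleep.
        was_final = task.state in FINAL
        try:
            await asyncio.wait_for(finalized.wait(), timeout=interval)
            return task
        except asyncio.TimeoutError:
            if was_final:
                # Final for a whole iteration, but the call-back never fired.
                raise RuntimeError("Task reached a final state without its call-back.")
```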

Parameters:
  • item (Task) – The workflow item to be submitted

  • task_manager (radical.pilot.TaskManager) – A radical.pilot.TaskManager instance through which the task should be submitted.

  • pre_exec (list) – radical.pilot.Task.pre_exec prototype.

  • scheduler (str) – The string name of the “scheduler,” corresponding to the UID of a Task running a rp.raptor.Master (if Raptor is enabled).

Returns:

a “Future[rp.Task]” for an rp.Task in its final state.

Return type:

asyncio.Task

The caller must await the result of the coroutine to obtain an asyncio.Task that can be cancelled or awaited as a proxy to direct RP task management. The Task will hold a coroutine that is guaranteed to already be running, failed, or canceled. The caller should check the status of the task immediately before making assumptions about whether a Future has been successfully bound to the managed workflow item.

The returned asyncio.Task can be used to cancel the rp.Task (and the Future) or to await the rp.Task cleanup.

To submit tasks as a batch, await an array of submit() results in the same dispatching context. (TBD)

scalems.radical.raptor

Support for scalems on radical.pilot.raptor.

Define the connective tissue for SCALE-MS tasks embedded in rp.Task arguments.

Provide the RP Raptor Master and Worker details. Implement the master task script as a package entry point that we expect to be executable from the command line in a virtual environment where the scalems package is installed, without further modification to the PATH.

The client should be reasonably certain that the target environment has a compatible installation of RP and scalems. A rp.raptor.Master task script is installed with the scalems package. The script name is provided by the module function master_script(), and will be resolvable on the PATH for a Python interpreter process in an environment that has the scalems package installed.

We should try to keep this module as stable as possible so that the run time interface provided by scalems to RP is robust, and the entry point scripts change as little as possible across versions. scalems.radical runtime details live in runtime.py.

As of radical.pilot version 1.18, TaskDescription supports a mode field and several additional fields. The extended schema for the TaskDescription depends on the value of mode. The structured data provided to the executor callable is composed from these additional fields according to the particular mode.

We use the radical.pilot.task_description.TASK_FUNCTION mode, specifying a function field to name a callable in the (global) namespace accessible by the worker process.

We populate the worker global namespace with imported callables in the module file from which Raptor imports our radical.pilot.raptor.MPIWorker subclass, ScaleMSWorker.

The callable named by the function field accepts *args and **kwargs, which are extracted from fields in the TaskDescription. Since we derive from MPIWorker, an mpi4py communicator is inserted at args[0].
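The name-based dispatch described above can be sketched as a lookup in the worker module's global namespace, followed by a call with a communicator inserted at args[0]. `dispatch_function()`, the description keys `function`, `args`, and `kwargs`, the example callable `add()`, and the `FakeComm` placeholder are hypothetical; Raptor's internal dispatching and a real mpi4py communicator differ in detail.

```python
from typing import Any, Dict


class FakeComm:
    """Hypothetical placeholder for an mpi4py communicator."""

    def Get_rank(self) -> int:
        return 0


def add(comm, a, b, *, scale=1):
    # Example user callable, imported into the worker module namespace.
    return comm.Get_rank() + (a + b) * scale


def dispatch_function(description: Dict[str, Any], comm) -> Any:
    name = description["function"]
    func = globals().get(name)
    if not callable(func):
        raise NameError(f"No callable named {name!r} in the worker namespace.")
    args = description.get("args", [])
    kwargs = description.get("kwargs", {})
    # As with an MPIWorker, the communicator is inserted at args[0].
    return func(comm, *args, **kwargs)
```

This is why the callables must be imported into the module from which Raptor loads the worker class: only names visible in that namespace can be resolved.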

Protocol

As of RP 1.18, the radical.pilot.raptor interface is not documented. The following diagrams illustrate the approximate architecture of a Raptor Session.

title RP Raptor client Session

box "RP client"
participant TaskManager
end box

'queue "Taskmanager: Scheduler and Input Queue" as Queue
queue "Scheduler/Input Queue" as Queue

-> TaskManager: submit(master_task_description)
activate TaskManager

'note left
note over TaskManager
 TaskDescription uses `mode=rp.RAPTOR_MASTER`.
 Task args provide details for the script
 so that a Master can start appropriate Workers.
end note

TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue

-> TaskManager: submit(task_description)
activate TaskManager

'note left
note over TaskManager
 TaskDescription names a Master uid in *scheduler* field.
end note

TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
...
TaskManager ->
TaskManager <-- : master task results

A “master task” is an executable task (mode = rp.RAPTOR_MASTER) in which the script named by radical.pilot.TaskDescription.executable manages the life cycle of a radical.pilot.raptor.Master (or subclass) instance. As of RP 1.14, the protocol is as follows.

title raptor master task lifetime management

participant "master task"

-> "master task" : stage in

create Master as master
"master task" -> master: create Master(cfg)
"master task" -> master: Master.submit_workers(descr=descr, count=n_workers)
"master task" -> master: Master.start()
alt optional hook for self-submitting additional tasks
"master task" -> master: Master.submit_tasks(tasks)
end
queue scheduler
scheduler -\ master : request_cb
scheduler -\ master : result_cb
master -> master: Master.stop() (optional)
"master task" -> master: Master.join()

[<-- "master task" : stage out

The cfg argument to radical.pilot.raptor.Master does not currently appear to be required.

scalems encodes worker requirements on the client side. The scalems master script decodes the client-provided requirements, combines the information with run time details and work load inspection, and produces the WorkerDescription with which to call submit_workers().

class RaptorWorkerConfig {
 count
 descr
}

class WorkerDescription

RaptorWorkerConfig *--> "descr" WorkerDescription

namespace scalems.radical {
 class MasterTaskConfiguration

 class ClientWorkerRequirements

 class ScaleMSMaster

 ScaleMSMaster --> MasterTaskConfiguration : launches with

 MasterTaskConfiguration *-- ClientWorkerRequirements

 ClientWorkerRequirements -right-> .RaptorWorkerConfig : worker_description()
}

namespace radical.pilot.raptor {
 class Master {
  submit_workers()
 }
 Master::submit_workers .> .RaptorWorkerConfig
}

scalems.radical.ScaleMSMaster -up-|> radical.pilot.raptor.Master

Warning

The data structure for descr may still be evolving. See https://github.com/radical-cybertools/radical.pilot/issues/2731

TODO

Pass input and output objects more efficiently.

Worker processes come and go, and are not descended from Master processes (which may not even be on the same node), so we can’t generally pass objects by reference or even in shared memory segments. However, we can use optimized ZMQ facilities for converting network messages directly to/from memory buffers.

In other cases, we can use the scalems typing system to specialize certain types for optimized (de)serialization from/to file-backed storage or streams.

See https://github.com/SCALE-MS/scale-ms/issues/106

TODO

Get a stronger specification of the RP Raptor interface.

See https://github.com/radical-cybertools/radical.pilot/issues/2731

master task

scalems specialization of the “master” component in the radical.pilot.raptor federated scheduling protocol.

scalems.radical.raptor.master()

Entry point for raptor.Master task.

This function implements the scalems_rp_master entry point script called by the RADICAL Pilot executor to provide the raptor master task.

During installation, the scalems build system creates a scalems_rp_master entry point script that provides command line access to this function. The Python signature takes no arguments because the function processes command line arguments from sys.argv.

scalems.radical.raptor.master_script()

Get the name of the RP raptor master script.

The script to run an RP Task based on a rp.raptor.Master is installed with scalems. Installation configures an “entry point” script named scalems_rp_master, but for generality this function should be used to get the entry point name.

Before returning, this function confirms the availability of the entry point script in the current Python environment. A client should arrange for the script to be called in the execution environment and to confirm that the (potentially remote) entry point matches the expected API.

Returns:

Installed name of the entry point wrapper for master()

Return type:

str
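The availability check described above can be sketched with shutil.which(), which searches PATH the way a shell would. The function name, the default entry point name, the optional `path` override, and the choice of RuntimeError are assumptions for illustration; this is not the scalems implementation.

```python
import shutil
from typing import Optional


def master_script_sketch(name: str = "scalems_rp_master", path: Optional[str] = None) -> str:
    """Return the entry point name after confirming that it resolves to an executable."""
    # shutil.which() searches os.environ["PATH"] unless an explicit path is given.
    if shutil.which(name, path=path) is None:
        raise RuntimeError(f"Entry point script {name!r} was not found.")
    return name
```

As the text notes, a local check like this says nothing about the (potentially remote) execution environment, which still needs its own confirmation.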

async scalems.radical.raptor.master_input(*, filestore, worker_pre_exec, worker_venv)

Provide the input file for a SCALE-MS Raptor Master script.

Parameters:
  • worker_venv (str) – Directory path for the Python virtual environment in which to execute the worker.

  • worker_pre_exec (list[str]) – List of shell command lines to execute in the worker environment before its launch script.

  • filestore (FileStore) – (local) FileStore that will manage the generated AbstractFileReference.

Return type:

AbstractFileReference

scalems.radical.raptor.worker_requirements(*, pre_exec, worker_venv)

Get the requirements for the work load, as known to the client.

TODO: Inspect workflow to optimize reusability of the initial Worker submission.

Parameters:
Return type:

ClientWorkerRequirements

class scalems.radical.raptor.MasterTaskConfiguration

Input to the script responsible for the RP raptor Master.

Produced on the client side with master_input. Deserialized in the master task (master) to get a RaptorWorkerConfig.

__init__(worker, versioned_modules)
Parameters:
Return type:

None

classmethod from_dict(obj)

Decode from a native dict.

Support deserialization, such as from a JSON file in the master task.

Parameters:

obj (_MasterTaskConfigurationDict) –
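A minimal sketch of the client-to-master round trip: the client encodes the configuration as JSON, and the master task decodes it with a from_dict() classmethod. The two classes mirror the documented fields, but their names, the use of dataclasses.asdict() for encoding, the conversion of JSON lists back to tuples, and the example version specifier are assumptions.

```python
import dataclasses
import json
from typing import List, Optional, Tuple


@dataclasses.dataclass(frozen=True)
class WorkerRequirementsSketch:
    cpu_processes: int
    gpus_per_process: int = 0
    pre_exec: Tuple[str, ...] = ()
    named_env: Optional[str] = None


@dataclasses.dataclass(frozen=True)
class MasterConfigSketch:
    worker: WorkerRequirementsSketch
    versioned_modules: List[Tuple[str, str]]

    @classmethod
    def from_dict(cls, obj: dict) -> "MasterConfigSketch":
        worker = dict(obj["worker"])
        # JSON has no tuples; restore the declared tuple types.
        worker["pre_exec"] = tuple(worker.get("pre_exec", ()))
        return cls(
            worker=WorkerRequirementsSketch(**worker),
            versioned_modules=[tuple(pair) for pair in obj["versioned_modules"]],
        )


# Client side: encode to JSON text (for example, the contents of an input file).
config = MasterConfigSketch(
    worker=WorkerRequirementsSketch(cpu_processes=2, pre_exec=("module load gcc",)),
    versioned_modules=[("scalems", ">=0.0.1")],
)
encoded = json.dumps(dataclasses.asdict(config))

# Master task side: decode.
decoded = MasterConfigSketch.from_dict(json.loads(encoded))
```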

versioned_modules: List[Tuple[str, str]]

List of name, version specifier tuples for required modules.

worker: ClientWorkerRequirements

Client-side details to inform worker provisioning.

Generated by worker_requirements(). Used to prepare input for worker_description().

class scalems.radical.raptor.ClientWorkerRequirements[source]

Client-side details to inform worker provisioning.

This structure is part of the scalems.radical.raptor.MasterTaskConfiguration provided to the master script. The master script uses this information when calling worker_description().

Use the worker_requirements() creation function for a stable interface.

__init__(cpu_processes, gpus_per_process=0, pre_exec=(), named_env=None)
Parameters:
  • cpu_processes (int) –

  • gpus_per_process (int) –

  • pre_exec (tuple[str]) –

  • named_env (str) –

Return type:

None

cpu_processes: int

Number of ranks in the Worker MPI context.

TODO: We need to account for cores-per-process.

gpus_per_process: int = 0

GPUs per Worker rank.

TODO: Reconcile with upcoming support for fractional ratios.

named_env: str = None

A registered virtual environment known to the raptor manager.

Warning

Not tested. Support has been delayed indefinitely.

pre_exec: tuple[str] = ()

Lines of shell expressions to be evaluated before launching the worker process.

class scalems.radical.raptor.ScaleMSMaster

Manage a RP Raptor scheduler target.

Extends rp.raptor.Master.

We do not submit scalems tasks directly as RP Tasks from the client. We encode scalems tasks as instructions to the Master task, which will be decoded and translated to RP Tasks by the ScaleMSMaster.request_cb and self-submitted to the ScaleMSWorker.

Results of such tasks are only available through result_cb(). The Master can translate results of generated Tasks into results for the Task carrying the coded instruction, or it can produce data files to stage during or after the Master task. Additionally, the Master could respond to other special instructions (encoded in later client-originated Tasks) to query or retrieve generated task results.

title raptor Master

queue "Queue" as Queue

box "RP Agent"
participant Scheduler
end box

queue "Master input queue" as master_queue

box "scalems.radical.raptor"
participant ScaleMSMaster
participant rpt.Master
end box

queue "ZMQ Raptor work channel" as channel

activate Queue

Scheduler -> Queue: accepts work
activate Scheduler
Scheduler <-- Queue: task dictionary
deactivate Queue

note over Scheduler
Scheduler gets task from queue,
observes `scheduler` field and routes to Master.
end note

Scheduler -> master_queue: task dictionary
activate master_queue
deactivate Scheduler

note over ScaleMSMaster, rpt.Master
Back end RP queue manager
passes messages to Master callbacks.
end note

rpt.Master -> master_queue: accept Task
activate rpt.Master
rpt.Master <-- master_queue: cpi_task
deactivate master_queue

rpt.Master -> rpt.Master: _request_cb(cpi_task)
activate rpt.Master
rpt.Master -> ScaleMSMaster: request_cb(cpi_task)
activate ScaleMSMaster

ScaleMSMaster -> ScaleMSMaster: CpiCommand.launch()
activate ScaleMSMaster
alt optionally process or update requests
  ScaleMSMaster -> ScaleMSMaster: submit_tasks(scalems_tasks)
  activate ScaleMSMaster
  deactivate ScaleMSMaster
  ScaleMSMaster -> ScaleMSMaster: _result_cb(cpi_task)
  activate ScaleMSMaster
  deactivate ScaleMSMaster
else resolve a Control message
  ScaleMSMaster -> ScaleMSMaster: _result_cb(cpi_task)
  activate ScaleMSMaster
  deactivate ScaleMSMaster
end
return

return filtered tasks

rpt.Master -> rpt.Master: submit_tasks(...)
activate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: send message
deactivate rpt.Master
deactivate rpt.Master

rpt.Master -> channel: accept result
activate rpt.Master
rpt.Master <-- channel: scalems_tasks
deactivate channel

rpt.Master -> rpt.Master: _result_cb(scalems_tasks)
activate rpt.Master
rpt.Master -> ScaleMSMaster: result_cb(scalems_tasks)
activate ScaleMSMaster

ScaleMSMaster -> ScaleMSMaster: CpiCommand.result_hook()

alt optionally process or update pending requests
ScaleMSMaster -> ScaleMSMaster: issue#277
activate ScaleMSMaster
deactivate ScaleMSMaster
end

rpt.Master <-- ScaleMSMaster
deactivate ScaleMSMaster

rpt.Master -> master_queue: send message
activate master_queue
deactivate rpt.Master
deactivate rpt.Master

__init__(configuration)

Initialize a SCALE-MS Raptor Master.

Verify the environment. Perform some scalems-specific set up and initialize the base class details.

Parameters:

configuration (MasterTaskConfiguration) –

configure_worker(requirements)

Scoped temporary module file for raptor worker.

Write and return the path to a temporary Python module. The module imports ScaleMSWorker into its module namespace so that the file and class can be used in the worker description for rp.raptor.Master.submit_workers().

Parameters:

requirements (ClientWorkerRequirements) –

Return type:

Generator[list[scalems.radical.raptor.WorkerDescription], None, None]
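The scoped temporary module can be sketched with contextlib.contextmanager and tempfile. This is an illustration under assumptions: the generated source imports an arbitrary `module` and `name` (in scalems this would be ScaleMSWorker), and the hypothetical `worker_module_file()` yields only the file path rather than WorkerDescription objects.

```python
import contextlib
import os
import tempfile
from typing import Iterator


@contextlib.contextmanager
def worker_module_file(module: str, name: str) -> Iterator[str]:
    """Write a temporary module that imports `name` from `module`; remove it on exit."""
    fd, path = tempfile.mkstemp(suffix=".py", text=True)
    try:
        with os.fdopen(fd, "w") as f:
            f.write(f"from {module} import {name}\n")
        yield path  # valid only inside the with block
    finally:
        os.unlink(path)
```

Tying the file's lifetime to a with block keeps it available while workers are being submitted and guarantees clean-up when the Master scope ends.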

cpi_finalize(task)

Short-circuit the normal Raptor protocol to finalize a task.

This is an alias for Master._result_cb(), but is a public method used in the CpiCommand.launch method for Control commands, which do not call Master.submit_tasks().

Parameters:

task (TaskDictionary) –

request_cb(tasks)

Allows all incoming requests to be processed by the Master.

RADICAL guarantees that request_cb() calls are made sequentially in a single thread. The implementation does not need to be thread-safe or reentrant. (Ref: https://github.com/radical-cybertools/radical.utils/blob/master/src/radical/utils/zmq/queue.py#L386)

RADICAL does not guarantee that Tasks are processed in the same order in which they are submitted by the client.

If overridden, request_cb() must return a list. The returned list is interpreted to be requests that should be processed normally after the callback. This allows subclasses of rp.raptor.Master to add or remove requests before they become Tasks. The returned list is submitted with self.submit_tasks() by the base class after the callback returns.

A Master may call self.submit_tasks() to self-submit items (e.g. instead of or in addition to manipulating the returned list in request_cb()).

It is the developer’s responsibility to choose unique task IDs (uid) when crafting items for Master.submit_tasks().

Parameters:

tasks (Sequence[TaskDictionary]) –

Return type:

Sequence[TaskDictionary]
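The request_cb() contract above (inspect incoming requests, resolve some directly, return the rest for normal submission) can be sketched with a stand-in base class. `MasterBase`, its method bodies, and the `"control"` key used to recognize control requests are hypothetical; only the shape of the contract follows the text.

```python
from typing import Dict, List, Sequence

TaskDictionary = Dict[str, object]


class MasterBase:
    """Hypothetical stand-in for the relevant behavior of rp.raptor.Master."""

    def __init__(self):
        self.submitted: List[TaskDictionary] = []
        self.finalized: List[TaskDictionary] = []

    def request_cb(self, tasks: Sequence[TaskDictionary]) -> Sequence[TaskDictionary]:
        return tasks

    def submit_tasks(self, tasks: Sequence[TaskDictionary]) -> None:
        self.submitted.extend(tasks)

    def _result_cb(self, task: TaskDictionary) -> None:
        self.finalized.append(task)

    def _request_cb(self, tasks: Sequence[TaskDictionary]) -> None:
        # The base class submits whatever request_cb() returns.
        self.submit_tasks(self.request_cb(tasks))


class FilteringMaster(MasterBase):
    def request_cb(self, tasks):
        remaining = []
        for task in tasks:
            if task.get("control"):
                # Resolve control requests here, without submitting them.
                self._result_cb(task)
            else:
                remaining.append(task)
        return remaining  # must be a list
```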

result_cb(tasks)

SCALE-MS specific handling of completed tasks.

We perform special handling for two types of tasks:
1. Tasks submitted through the Master and completed by the collaborating Worker(s).
2. Tasks intercepted and resolved entirely by the Master (CPI calls).

Parameters:

tasks (Sequence[TaskDictionary]) –

worker task

scalems specialization of the “worker” component in the radical.pilot.raptor federated scheduling protocol.

scalems.radical.raptor.worker_description(*, named_env, worker_file, cores_per_process=None, cpu_processes=None, gpus_per_process=None, pre_exec=(), environment=None)

Get a worker description for Master.submit_workers().

Parameters:
Return type:

WorkerDescription

worker_file is generated for the master script by the caller. See scalems.radical.raptor.

The uid for the Worker task is defined by the Master.submit_workers().

class scalems.radical.raptor.ScaleMSWorker

Specialize the Raptor MPI Worker for scalems dispatching of serialised work.

scalems tasks encode the importable function and inputs in the arguments to the run_in_worker dispatching function, which is available to the ScaleMSWorker instance.

title ScaleMSWorker task dispatching

queue "ZMQ Raptor work channel" as channel

box "scalems.radical.raptor.Worker"
participant ScaleMSWorker
participant rpt.Worker
end box

box "usermodule"
participant usermodule
end box

box "target venv"
end box

rpt.Worker -> channel: pop message
activate rpt.Worker
rpt.Worker <-- channel: rp.TASK_FUNCTION mode Task
rpt.Worker -> rpt.Worker: _dispatch_function()
activate rpt.Worker

alt scalems encoded workload
  rpt.Worker -> ScaleMSWorker: run_in_worker()
  activate ScaleMSWorker
  ScaleMSWorker -> ScaleMSWorker: unpack encoded function call
  ScaleMSWorker -> ScaleMSWorker: from usermodule import func
  ScaleMSWorker -> usermodule: ""func(*args, **kwargs)""
  activate usermodule
  ScaleMSWorker <-- usermodule
  deactivate usermodule
  return
else pickled rp.PythonTask workload (not implemented)
  rpt.Worker -> rpt.Worker: unpickled scalems handler
  activate rpt.Worker
  rpt.Worker -> usermodule: ""func(*args, **kwargs)""
  activate usermodule
  rpt.Worker <-- usermodule
  deactivate usermodule
  rpt.Worker --> rpt.Worker: {out, err, ret, value}
  deactivate rpt.Worker
end

deactivate rpt.Worker
rpt.Worker -> channel: put result
deactivate rpt.Worker

run_in_worker(*, work_item, comm=None)

Unpack and run a task requested through RP Raptor.

Satisfies the RaptorTaskExecutor protocol. Named as the function for rp.TASK_FUNCTION mode tasks generated by scalems.

This function MUST be imported and referenceable by its __qualname__ from the scope in which ScaleMSWorker methods execute. (See ScaleMSMaster._worker_file.)

Parameters:
  • comm (mpi4py.MPI.Comm, optional) – MPI communicator to be used for the task.

  • work_item (ScalemsRaptorWorkItem) – dictionary encoding the function call, taken from kwargs in the TaskDescription.

See also

CpiAddItem.launch()

task handling

scalems instructions are embedded in TaskDescriptions with the scalems.radical.raptor.CPI_MESSAGE mode, using the metadata field for the payload. Executable work is re-encoded by ScaleMSMaster for ScaleMSWorker (using a ScalemsRaptorWorkItem in the work_item field of the kwargs of a TASK_FUNCTION mode Task) to be dispatched through scalems machinery in the Worker process.
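The re-encoding step might look roughly like the following sketch. It wraps a ScalemsRaptorWorkItem-style dictionary in a TASK_FUNCTION mode description whose function field names run_in_worker and whose kwargs carry the work_item. reencode_for_worker() and its uid scheme are hypothetical, and TASK_FUNCTION is a placeholder string standing in for radical.pilot.TASK_FUNCTION. The counter reflects the requirement that the developer choose unique uids for tasks the Master submits itself.

```python
import itertools
from typing import Any, Dict

# Hypothetical placeholder; in practice use radical.pilot.TASK_FUNCTION.
TASK_FUNCTION = "task.function"

# Hypothetical uid source; any scheme that is unique across the Session works.
_uid_counter = itertools.count()


def reencode_for_worker(work_item: Dict[str, Any], prefix: str = "scalems-task") -> Dict[str, Any]:
    """Hypothetical sketch: wrap a work item in a TASK_FUNCTION mode description."""
    return {
        # Unique uid chosen by the developer for a self-submitted task.
        "uid": f"{prefix}.{next(_uid_counter):06d}",
        "mode": TASK_FUNCTION,
        # Named by __qualname__; must be importable where ScaleMSWorker runs.
        "function": "run_in_worker",
        "args": [],
        # The encoded function call travels as the work_item keyword argument.
        "kwargs": {"work_item": work_item},
    }
```

Keeping the payload in kwargs, rather than pickling a callable, keeps the description msgpack-serializable.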

scalems.radical.raptor.api_name

Key for dispatching raptor Requests.

We can use this to identify the schema used for SCALE-MS tasks encoded in arguments to the raptor TASK_FUNCTION mode executor.

scalems.radical.raptor.CPI_MESSAGE

Flag for scalems messages to be treated as CPI calls.

Used in the mode field to indicate that the object should be handled through the SCALE-MS Compute Provider Interface machinery.

class scalems.radical.raptor.ScalemsRaptorWorkItem

Encode the function call to implement a task in the workflow.

Parameter type for the run_in_worker function work_item argument.

This structure must be trivially serializable (by msgpack) so that it can be passed in a TaskDescription.kwargs field. Consistency with higher level scalems.serialization is not essential, but can be pursued in the future.

The essential role is to represent an importable callable and its arguments. At run time, the dispatching function (run_in_worker) conceivably has access to module scoped state, but does not have direct access to the Worker (or any resources other than the mpi4py.MPI.Comm object). Therefore, abstract references to input data should be resolved at the Master in terms of the expected Worker environment before preparing and submitting the Task.

TODO: Evolve this to something sufficiently general to be a scalems.WorkItem.

TODO: Clarify the translation from an abstract representation of work in terms of the workflow record to the concrete representation with literal values and local filesystem paths.

TODO: Record extra details like implicit filesystem interactions, environment variables, etc. TBD whether they belong in a separate object.

args: list

Positional arguments for func.

comm_arg_name: str | None

Identify how to provide an MPI communicator to func, if at all.

If comm_arg_name is not None, the callable will be provided with the MPI communicator. If comm_arg_name is an empty string, the communicator is provided as the first positional argument. Otherwise, comm_arg_name must be a valid keyword argument by which to pass the communicator to func.

func: str

The name of a callable to be retrieved as an attribute of module.

kwargs: dict

Keyword arguments for func.

module: str

The qualified name of a module importable by the Worker.
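The comm_arg_name rules can be made concrete with a sketch of how a dispatching function might apply a work item once the module is imported. dispatch() and WorkItemSketch are hypothetical names; the real logic lives in run_in_worker and may differ. In the examples, a list and the built-in abs stand in for an mpi4py.MPI.Comm purely to show where the communicator ends up.

```python
import importlib
from typing import Any, Optional, TypedDict


class WorkItemSketch(TypedDict):
    # Mirrors the documented ScalemsRaptorWorkItem fields.
    module: str
    func: str
    args: list
    kwargs: dict
    comm_arg_name: Optional[str]


def dispatch(work_item: WorkItemSketch, comm: Any = None) -> Any:
    """Hypothetical sketch: import module, look up func, and call it."""
    module = importlib.import_module(work_item["module"])
    func = getattr(module, work_item["func"])
    args = list(work_item["args"])
    kwargs = dict(work_item["kwargs"])
    name = work_item["comm_arg_name"]
    if name is None:
        pass  # func does not receive the communicator.
    elif name == "":
        args.insert(0, comm)  # communicator as the first positional argument
    else:
        kwargs[name] = comm  # communicator passed by keyword
    return func(*args, **kwargs)


# No communicator: operator.add(2, 3)
print(dispatch({"module": "operator", "func": "add", "args": [2, 3],
                "kwargs": {}, "comm_arg_name": None}))  # 5
# Empty string: the stand-in "communicator" becomes the first argument.
print(dispatch({"module": "operator", "func": "getitem", "args": [1],
                "kwargs": {}, "comm_arg_name": ""}, comm=[10, 20]))  # 20
```

Resolving func by name at run time is what lets the Master ship only strings and literals to the Worker.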

compatibility helpers

These classes are not formal types, but are used to represent (untyped) interfaces in radical.pilot.raptor.

scalems.radical.raptor.RaptorWorkerConfig

Client-specified Worker requirements.

Used by the master script when calling worker_description(). Created internally with _configure_worker().

class scalems.radical.raptor.WorkerDescription

Worker description.

class scalems.radical.raptor.TaskDictionary

Task representations seen by request_cb and result_cb.

Other fields may be present, but the objects in the sequences provided to scalems.radical.raptor.ScaleMSMaster.request_cb() and scalems.radical.raptor.ScaleMSMaster.result_cb() have the following fields. Result fields will not be populated until the Task runs.

For the expected fields, see the source code for as_dict(): https://radicalpilot.readthedocs.io/en/stable/_modules/radical/pilot/task.html#Task.as_dict

description: _RaptorTaskDescription

Encoding of the original task description.

exception: str

Exception type name and message.

exception_detail: str

Full exception details with stack trace.

exit_code: int

Task return code.

return_value: Any

Function return value.

Refer to the RaptorTaskExecutor Protocol.

state: str

RADICAL Pilot Task state.

stderr: str

Task standard error.

stdout: str

Task standard output.

target_state: str

State to which the Task should be advanced.

Valid values are string constants from radical.pilot.states.

Used internally, such as in Master._result_cb().

uid: str

Canonical identifier for the Task.

Note that uid may be omitted from the original TaskDescription.

class scalems.radical.raptor._RaptorTaskDescription

Describe a Task to be executed through a Raptor Worker.

A specialization of radical.pilot.TaskDescription.

Note how a TaskDescription to be processed by a raptor.Master differs from an ordinary TaskDescription.

The meaning or utility of some fields is dependent on the values of other fields.

TODO: We need some additional fields, like environment and fields related to launch method and resource reservation. Is the schema for rp.TaskDescription sufficiently strong and well-documented that we can remove this hinting type? (Check again at RP >= 1.19)

args: list

For rp.TASK_FUNCTION mode, list of positional arguments provided to the executor function.

executable: str

Unused by Raptor tasks.

function: str

Executor for rp.TASK_FUNCTION mode.

Names the callable for dispatching.

The callable can either be a function object present in the namespace of the interpreter launched by the Worker for the task, or a radical.pilot.pytask.PythonTask pickled function object.

kwargs: dict

For rp.TASK_FUNCTION mode, a dictionary of keyword arguments provided to the executor function.

metadata: str | int | float | bool | None | Mapping[str, Encodable] | List[Encodable]

An arbitrary user-provided payload.

May be any type that is encodable by msgpack (i.e. built-in types).
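The Encodable union can be checked recursively before a payload is placed in metadata. is_encodable() is a hypothetical helper. It follows the type annotation rather than msgpack's own packing rules, so it may reject some values (for example tuples) that msgpack could still pack.

```python
from collections.abc import Mapping
from typing import Any


def is_encodable(obj: Any) -> bool:
    """Hypothetical check against the documented Encodable union.

    Allowed: str, int, float, bool, None, lists of Encodable values, and
    mappings with str keys and Encodable values.
    """
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return True
    if isinstance(obj, list):
        return all(is_encodable(item) for item in obj)
    if isinstance(obj, Mapping):
        return all(
            isinstance(key, str) and is_encodable(value)
            for key, value in obj.items()
        )
    # Anything else (objects, tuples, sets, bytes, ...) is rejected.
    return False
```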

mode: str

The executor mode for the Worker to use.

For rp.TASK_FUNCTION mode, either function or method must name a task executor. Depending on the Worker (sub)class, resources such as an mpi4py.MPI.Comm will be provided as the first positional argument to the executor. *args and **kwargs will be provided to the executor from the corresponding fields.
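The argument convention for rp.TASK_FUNCTION mode can be sketched as follows. call_executor() and its pass_comm switch are hypothetical. They model the statement that, depending on the Worker (sub)class, a resource such as an mpi4py.MPI.Comm is prepended to the positional arguments, while the remaining positional and keyword arguments come from the args and kwargs fields.

```python
from typing import Any, Callable, Dict, Optional


def call_executor(
    executor: Callable[..., Any],
    description: Dict[str, Any],
    comm: Optional[Any] = None,
    pass_comm: bool = True,
) -> Any:
    """Hypothetical sketch of TASK_FUNCTION mode argument handling."""
    args = list(description.get("args", []))
    kwargs = dict(description.get("kwargs", {}))
    if pass_comm:
        # Some Worker (sub)classes supply a resource as the first argument.
        args.insert(0, comm)
    return executor(*args, **kwargs)
```

Whether pass_comm applies is a property of the Worker, not of the task, which is why the mode field alone does not determine the executor signature.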

See also

scheduler: str

The UID of the raptor.Master scheduler task.

This field is relevant to tasks routed from client TaskManagers. It is not used for tasks originating in master tasks.

uid: str

Unique identifier for the Task across the Session.