scalems.radical execution

User interface is documented at scalems.radical.

execution module interface

scalems.radical.configuration(*args, **kwargs)

Get a RADICAL runtime configuration.

Accepts a single argparse.Namespace argument (see parser()) or keyword arguments. With no arguments, the command line parser is invoked to try to build a new configuration.

If keyword arguments are provided, try to construct a scalems.radical.runtime.RuntimeConfiguration.

See also

current_configuration() retrieves the configuration for an active RuntimeManager, if any.

Return type:

RuntimeConfiguration
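The argument handling described above can be sketched as follows. This is a minimal illustration, not the scalems implementation: SketchConfiguration and sketch_parser() are hypothetical stand-ins that mirror only three fields of RuntimeConfiguration, and the option names are invented for the example rather than taken from parser().

```python
import argparse
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class SketchConfiguration:
    # Hypothetical stand-in for scalems.radical.runtime.RuntimeConfiguration.
    execution_target: str = "local.localhost"
    target_venv: Optional[str] = None
    enable_raptor: bool = False


def sketch_parser() -> argparse.ArgumentParser:
    # Hypothetical option names; the real options come from parser().
    parser = argparse.ArgumentParser(add_help=False, allow_abbrev=False)
    parser.add_argument("--resource", dest="execution_target", default="local.localhost")
    parser.add_argument("--venv", dest="target_venv", default=None)
    parser.add_argument("--enable-raptor", dest="enable_raptor", action="store_true")
    return parser


def configuration(*args, **kwargs) -> SketchConfiguration:
    if args and kwargs:
        raise TypeError("Provide a Namespace or keyword arguments, not both.")
    if len(args) > 1:
        raise TypeError("Expected at most one positional argument.")
    if kwargs:
        # Keyword arguments map directly onto configuration fields.
        return SketchConfiguration(**kwargs)
    if args:
        namespace = args[0]
        if not isinstance(namespace, argparse.Namespace):
            raise TypeError("Positional argument must be an argparse.Namespace.")
    else:
        # No arguments: build a configuration from the command line (sys.argv).
        namespace, _unknown = sketch_parser().parse_known_args()
    # Ignore Namespace attributes that are not configuration fields.
    fields = {f.name for f in dataclasses.fields(SketchConfiguration)}
    return SketchConfiguration(**{k: v for k, v in vars(namespace).items() if k in fields})
```

Keeping the Namespace and keyword paths separate makes mixed usage an explicit error instead of letting one source silently override the other.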

scalems.radical.workflow_manager(loop, directory=None)

Manage a workflow context for RADICAL Pilot work loads.

The rp.Session is created when the Python Context Manager is “entered”, so the asyncio event loop must be running before then.

To help enforce this, we use an async Context Manager, at least in the initial implementation. Note, however, that the implementation is not thread-safe. It is not reentrant, but this is not checked. We probably _do_ want to constrain ourselves to zero or one Sessions per environment, but we _do_ need to support multiple Pilots and task submission scopes (resource requirement groups). Further discussion is welcome.

Warning

The importer of this module should be sure to import radical.pilot before importing the built-in logging module to avoid spurious warnings.

Parameters:

loop (AbstractEventLoop) –
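A minimal sketch of why an async context manager helps here, assuming a placeholder session class instead of rp.Session: the setup code of an async context manager can only run inside a running event loop, so asyncio.get_running_loop() at entry always succeeds. The module-level _active flag illustrates the zero-or-one Session constraint; it is a hypothetical check that the current workflow_manager does not perform.

```python
import asyncio
import contextlib


class PlaceholderSession:
    """Hypothetical stand-in for rp.Session."""

    def __init__(self, loop: asyncio.AbstractEventLoop):
        self.loop = loop
        self.closed = False

    def close(self):
        self.closed = True


_active = False  # Hypothetical guard: at most one session at a time.


@contextlib.asynccontextmanager
async def sketch_workflow_manager():
    global _active
    # Code inside an async context manager always runs in a running event loop.
    loop = asyncio.get_running_loop()
    if _active:
        # Illustrative check; the real workflow_manager does not perform it.
        raise RuntimeError("Session context is not reentrant.")
    _active = True
    session = PlaceholderSession(loop)
    try:
        yield session
    finally:
        session.close()
        _active = False
```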

scalems.radical support module

Manage the RADICAL Pilot start-up and shut-down.

The provided RuntimeSession class encapsulates stateful resources that, once acquired, should be shut down explicitly. RuntimeSession instances may be used as context managers to ensure the proper protocol is followed, or the caller can take responsibility for calling RuntimeSession.close() to shut down. TODO: Fix!

Note: Consider whether RuntimeSession context manager is reentrant or multi-use.

The RuntimeSession state encapsulates several nested states. A Session, TaskManager, and PilotManager must be created for the RuntimeSession to be usable. Additionally, Pilots and scheduler tasks may be added or removed during the RuntimeSession lifetime. To better support the alternate scenarios when a RuntimeSession instance may be provided to a scalems.radical component in an arbitrary state, consider making the with block scoped, such that it only returns the RuntimeSession instance to its initial state when exiting, rather than shutting down completely. See also https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack
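The scoped alternative suggested above could look something like the following sketch, which uses contextlib.ExitStack with a hypothetical Resource placeholder in place of the RP Session, PilotManager, and TaskManager. Each stack releases only what it acquired, in reverse order, so an inner scope can return the session to its earlier state without shutting it down.

```python
import contextlib


class Resource:
    """Hypothetical placeholder for an RP component such as a Session or PilotManager."""

    def __init__(self, name: str, log: list):
        self.name = name
        self._log = log
        log.append(f"open {name}")

    def close(self):
        self._log.append(f"close {self.name}")


def acquire(stack: contextlib.ExitStack, name: str, log: list) -> Resource:
    # Register the release step with the stack that owns this resource.
    resource = Resource(name, log)
    stack.callback(resource.close)
    return resource


log = []
with contextlib.ExitStack() as stack:
    acquire(stack, "session", log)
    acquire(stack, "pilot_manager", log)
    # Inner scope: only the pilot is released when this block exits.
    with contextlib.ExitStack() as pilot_scope:
        acquire(pilot_scope, "pilot", log)
    log.append("session still usable")
    acquire(stack, "task_manager", log)
# Outer resources are released in reverse order of acquisition.
```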

Deferred:

RuntimeSession can avoid providing direct access to RP interface, and instead run an entire RP Session state machine in a thread (separate from the asyncio event loop thread), relaying RP scripting commands through queues, in order to completely prevent misuse and to insulate the asyncio event loop from blocking RP commands. We need to get a better sense of the RP flow combinatorics before we can reasonably pursue this.
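The deferred design can be illustrated with a small sketch, under the assumption that each RP scripting command is a plain blocking callable. A single dedicated thread (the hypothetical CommandThread) consumes commands from a queue.Queue, and the asyncio side awaits each result through asyncio.wrap_future(), so the event loop never blocks on the command itself. Nothing here uses the radical.pilot API.

```python
import asyncio
import concurrent.futures
import queue
import threading


class CommandThread:
    """Run blocking commands in one dedicated thread, fed through a queue (hypothetical)."""

    def __init__(self):
        self._queue: queue.Queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        while True:
            item = self._queue.get()
            if item is None:  # Sentinel: stop processing.
                break
            func, args, future = item
            if not future.set_running_or_notify_cancel():
                continue  # The caller cancelled before the command started.
            try:
                future.set_result(func(*args))
            except Exception as e:
                future.set_exception(e)

    async def call(self, func, *args):
        future: concurrent.futures.Future = concurrent.futures.Future()
        self._queue.put((func, args, future))
        # Await the thread-side Future without blocking the event loop.
        return await asyncio.wrap_future(future)

    def stop(self):
        self._queue.put(None)
        self._thread.join()
```

Running every command in the same thread also serializes the commands, which is one way to keep an interface that is not thread-safe in a consistent state.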

title scalems on radical.pilot run time

box "SCALE-MS framework" #honeydew
participant WorkflowManager as client_workflowmanager
participant RuntimeManager
participant "runner task" as scalems.execution
end box
box "SCALE-MS RP adapter" #linen
participant RuntimeSession as client_runtime
participant Executor as client_executor
end box

autoactivate on

client_workflowmanager -> client_workflowmanager: async with executor
client_workflowmanager -> client_executor: ~__aenter__()
activate client_workflowmanager
client_executor -> RuntimeManager: ~__aenter__() base class method
RuntimeManager -> client_executor: runtime_configuration()
return
RuntimeManager -> client_executor: runtime_startup()

client_executor -> : rp.Session()
return
client_executor -> client_runtime **: Session
activate client_runtime
client_executor -> : rp.PilotManager()
return
client_executor -> client_runtime: pilot_manager()
return
client_executor -> : rp.TaskManager()
return
client_executor -> client_runtime: task_manager()
return
client_executor -> : pilot_manager.submit_pilots()
return
client_executor -> client_runtime: pilot()
note left
Pilot venv is determined by resource definition (JSON file).
end note
return

group ref [scalems.radical.raptor]
client_executor -> client_executor: get_raptor()
return
end

client_executor -> client_runtime: set raptor
return

client_executor ->> scalems.execution **: create_task(manage_raptor)
client_executor -> scalems.execution: await runner_started
RuntimeManager <-- client_executor: asyncio.Task
RuntimeManager -> RuntimeManager: set runner_task
return
RuntimeManager --> client_executor: self
client_workflowmanager <-- client_executor: RuntimeManager
client_workflowmanager --> client_workflowmanager: as manager
deactivate RuntimeManager

client_workflowmanager -> client_workflowmanager #gray: async with dispatcher

...Raptor workload handling...

return leave dispatcher context

== Shut down runtime ==

client_workflowmanager -> client_executor: ~__aexit__()
client_executor -> RuntimeManager: ~__aexit__() base class method
RuntimeManager ->> scalems.execution: enqueue a stop control message
deactivate scalems.execution
RuntimeManager -> RuntimeManager: await runner_task
note right
    drain the queue
end note
RuntimeManager <-- scalems.execution
deactivate RuntimeManager
RuntimeManager ->> client_executor: runtime_shutdown()

client_runtime <- client_executor
return session
client_runtime <- client_executor
return raptor

group Finalize Raptor task [if raptor is not None]
    group Give Raptor some time to shut down [if raptor.state not in FINAL]
        client_executor -> : runtime.raptor.wait(timeout=60)
    end
    group Forcibly cancel Raptor task [if raptor.state not in FINAL]
        client_runtime <- client_executor
        return task_manager
        client_executor -> : task_manager.cancel_tasks()
        return
        client_runtime <- client_executor
        return raptor
        client_executor -> : runtime.raptor.wait()
        return
    end
end

client_executor -> : session.close()
return

RuntimeManager -> RuntimeManager: await runtime_shutdown()

client_executor -> client_runtime !!
RuntimeManager <<-- client_executor
RuntimeManager --> RuntimeManager
RuntimeManager --> client_executor

client_workflowmanager <-- client_executor: leave executor context
client_workflowmanager --> client_workflowmanager
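The raptor finalization steps in the diagram can be read as the following control flow. This is a sketch with hypothetical stand-ins (FakeRaptorTask, FakeTaskManager, and the assumed FINAL state names); in particular, the fake cancel_tasks() takes a task object for brevity and is not meant to reproduce the signature of rp.TaskManager.cancel_tasks().

```python
import threading
from typing import Callable, Optional

FINAL = {"DONE", "FAILED", "CANCELED"}  # Assumed final state names, for illustration.


class FakeRaptorTask:
    """Hypothetical stand-in for the raptor rp.Task."""

    def __init__(self, finishes_on_its_own: bool):
        self.state = "AGENT_EXECUTING"
        self._done = threading.Event()
        if finishes_on_its_own:
            self.finish("DONE")

    def finish(self, state: str):
        self.state = state
        self._done.set()

    def wait(self, timeout: Optional[float] = None):
        self._done.wait(timeout)


class FakeTaskManager:
    """Hypothetical stand-in; not the rp.TaskManager.cancel_tasks() signature."""

    def cancel_tasks(self, task: FakeRaptorTask):
        task.finish("CANCELED")


def finalize(raptor, task_manager, close_session: Callable[[], None], timeout: float = 60.0):
    if raptor is not None:
        if raptor.state not in FINAL:
            # Give Raptor some time to shut down.
            raptor.wait(timeout=timeout)
        if raptor.state not in FINAL:
            # Forcibly cancel the Raptor task, then wait for it.
            task_manager.cancel_tasks(raptor)
            raptor.wait()
    # The session is closed last, whether or not there was a raptor task.
    close_session()
```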

scalems.radical.runtime.executor_factory(manager, params=None)
Parameters:
class scalems.radical.runtime.RPDispatchingExecutor

Client side manager for work dispatched through RADICAL Pilot.

Extends scalems.execution.RuntimeManager

Configuration points:

  • resource config

  • pilot config

  • session config?

We try to wrap rp UI calls in separate threads. Note, though, that

  • The rp.Session needs to be created in the root thread to be able to correctly manage signal handlers and subprocesses, and

  • We need to be able to schedule RP Task callbacks in the same process as the asyncio event loop in order to handle Futures for RP tasks.

See https://github.com/SCALE-MS/randowtal/issues/2

runtime: RuntimeSession

See scalems.execution.RuntimeManager.runtime

__init__(*, editor_factory=None, datastore=None, loop, configuration)

Create a client side execution manager.

Warning

The creation method does not fully initialize the instance.

Initialization and de-initialization occurs through the Python (async) context manager protocol.

Parameters:
runtime_configuration()

Provide scoped Configuration.

Merge the runtime manager’s configuration with the global configuration, update the global configuration, and yield the configuration for a with block.

Restores the previous global configuration when exiting the with block.

Warning

Not thread-safe.

We use the ContextVar to check for (and disallow) re-entrance. This context manager is not async, but it could be (and is) used within an asynchronous context manager. If used from multiple threads, we would not have anything structurally prohibiting reentrant calls.

Design notes

Instance configuration is coupled to module state except as allowed through contextvars.copy_context().

Return type:

RuntimeConfiguration

async runtime_startup()

Establish the RP Session.

Acquire a maximally re-usable set of RP resources. The scope established by this function is as broad as it can be within the life of the workflow manager.

Once instance.runtime_startup() succeeds, instance.runtime_shutdown() must be called to clean up resources. Use the async context manager behavior of the instance to automatically follow this protocol. I.e. instead of calling instance.runtime_startup(); ...; instance.runtime_shutdown(), use:

async with instance:
    ...
Raises:
Return type:

Task

Note

Signal handling

RP is known to use IPC signals in several cases. We believe that the client environment should only experience an internally triggered SIGINT in a single code path, and the behavior can be suppressed by setting the Pilot’s exit_on_error attribute to False.

We could use loop.add_signal_handler() to convert to an exception that we can raise in an appropriate task, but this is probably unnecessary. Moreover, with Python 3.11, we get a sensible signal handling behavior (for SIGINT) with asyncio.Runner. Per https://docs.python.org/3/library/asyncio-runner.html#handling-keyboard-interruption, we can just make sure that our run time resources will be properly shut down in the event of an asyncio.CancelledError, including sending appropriate calls to the radical.pilot framework.

See Also:

https://github.com/SCALE-MS/randowtal/issues/1
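To illustrate the shutdown guarantee under cancellation, the sketch below uses a hypothetical SketchRuntimeManager whose __aexit__ always awaits runtime_shutdown(). When the task running the async with block is cancelled (as asyncio.Runner does for SIGINT in Python 3.11 and later), the resulting asyncio.CancelledError passes through __aexit__, so the shutdown step still runs before the cancellation propagates.

```python
import asyncio


class SketchRuntimeManager:
    """Hypothetical manager showing the startup/shutdown protocol."""

    def __init__(self):
        self.events = []

    async def runtime_startup(self):
        self.events.append("startup")

    async def runtime_shutdown(self):
        self.events.append("shutdown")

    async def __aenter__(self):
        await self.runtime_startup()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Runs on normal exit, on ordinary exceptions, and on asyncio.CancelledError.
        await self.runtime_shutdown()
        return False  # Do not suppress the exception.
```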

TODO: More concurrency.

The rp.Pilot and raptor task can be separately awaited, and we should allow input data staging to begin as soon as we have enough run time details to do it. We need to clarify which tasks should be in which state to consider the asynchronous context manager to have been successfully “entered”. I expect that success includes a Pilot in state PMGR_ACTIVE, and a raptor Task in state AGENT_EXECUTING, and asyncio.Task handles available for other aspects, like synchronization of metadata and initiation of input data staging. However, we may prefer that the workflow script can continue evaluation on the client side while waiting for the Pilot job, and that we avoid blocking until we absolutely have to (presumably when exiting the dispatching context).

runtime_shutdown(runtime)

Manage tear down of the RADICAL Pilot Session and resources.

Several aspects of the RP runtime interface use blocking calls. This method should be run in a non-root thread (for example, through a concurrent.futures executor) so that the event loop can manage it as an asyncio-wrapped task.

Overrides scalems.execution.RuntimeManager

Parameters:

runtime (RuntimeSession) –
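One way to keep blocking teardown off the event loop thread is loop.run_in_executor() with a concurrent.futures.ThreadPoolExecutor, sketched below. blocking_shutdown() is a hypothetical placeholder for the blocking RP calls; the awaitable returned by run_in_executor() is an asyncio future wrapping the executor's concurrent.futures.Future.

```python
import asyncio
import concurrent.futures
import threading
import time


def blocking_shutdown(runtime: dict) -> str:
    # Hypothetical placeholder for blocking RP teardown calls.
    time.sleep(0.05)
    runtime["closed"] = True
    return threading.current_thread().name


async def shutdown(runtime: dict) -> str:
    loop = asyncio.get_running_loop()
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        # The event loop keeps running other tasks while the worker thread blocks.
        return await loop.run_in_executor(pool, blocking_shutdown, runtime)
```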

class scalems.radical.runtime.RuntimeConfiguration

Module configuration information.

See also

  • scalems.radical.runtime_configuration.configuration()

  • scalems.radical.runtime.parser

  • scalems.radical.runtime.RuntimeSession

__init__(execution_target='local.localhost', rp_resource_params=<factory>, target_venv=None, enable_raptor=False)
Parameters:
  • execution_target (str) –

  • rp_resource_params (RPResourceParams) –

  • target_venv (str) –

  • enable_raptor (bool) –

Return type:

None

enable_raptor: bool = False
execution_target: str = 'local.localhost'

Platform identifier for the RADICAL Pilot execution resource.

rp_resource_params: RPResourceParams

Schema for this member container may not be stable.

target_venv: str = None

Path to a pre-configured Python virtual environment on execution_target.

class scalems.radical.session.RuntimeSession

Container for scalems.radical runtime state data.

Use a creation function to provide RuntimeSession with an asyncio event loop. Interact with the RuntimeSession in the main thread whenever possible. Let the RuntimeSession dispatch slow rp UI calls to other threads as needed and appropriate.

Note

There is very little automated error recovery. For examples of expansive checking and re-launching of runtime resources, refer to the fixtures in conftest.py at or before revision 41b965a27c5af9abc115677b738085c35766b5b6.

__init__(session, *, loop, configuration)
Parameters:
close()

Direct the runtime to shut down and release resources.

Warning

This function may return before resources have been finalized. Follow a call to close() with wait_closed() to give the event loop a chance to cycle.

pilot()

Get active Pilot.

Allows lazy initialization of the Pilot resource.

Returns:

The current Pilot instance, if available and valid, or a new Pilot instance in the configured PilotManager.

Return type:

radical.pilot.Pilot

Raises:

APIError – for invalid RP Session configuration.

pilot_manager() → PilotManager | None
pilot_manager(pilot_manager: str) → PilotManager
pilot_manager(pilot_manager: PilotManager) → PilotManager

Get (optionally set) the current PilotManager.

Parameters:

pilot_manager (optional, radical.pilot.PilotManager, str) – Set to RP PilotManager instance or identifier, if provided.

Returns:

instance, if set, else None.

Return type:

radical.pilot.PilotManager

Raises:
  • ValueError – for invalid identifier.

  • APIError – for invalid RP Session configuration.
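The get/set overloads above can be sketched with typing.overload. PilotManager and SketchSession are hypothetical stand-ins, and the lookup of managers by uid is an assumption made for the example; the same shape would apply to task_manager(). The APIError case is not modeled.

```python
from typing import Dict, Optional, Union, overload


class PilotManager:
    """Hypothetical stand-in for radical.pilot.PilotManager."""

    def __init__(self, uid: str):
        self.uid = uid


class SketchSession:
    def __init__(self, known: Dict[str, PilotManager]):
        self._known = known  # Managers registered with the session, by uid.
        self._pilot_manager: Optional[PilotManager] = None

    @overload
    def pilot_manager(self) -> Optional[PilotManager]: ...
    @overload
    def pilot_manager(self, pilot_manager: str) -> PilotManager: ...
    @overload
    def pilot_manager(self, pilot_manager: PilotManager) -> PilotManager: ...

    def pilot_manager(self, pilot_manager: Union[None, str, PilotManager] = None):
        if pilot_manager is None:
            return self._pilot_manager  # Getter: may be None.
        if isinstance(pilot_manager, str):
            try:
                pilot_manager = self._known[pilot_manager]
            except KeyError:
                raise ValueError(f"No PilotManager with uid {pilot_manager!r}.") from None
        self._pilot_manager = pilot_manager
        return pilot_manager
```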

task_manager() → TaskManager | None
task_manager(task_manager: str) → TaskManager
task_manager(task_manager: TaskManager) → TaskManager

Get (optionally set) the current TaskManager.

Parameters:

task_manager (optional, radical.pilot.TaskManager, str) – Set to RP TaskManager instance or identifier, if provided.

Returns:

instance, if set, else None.

Return type:

radical.pilot.TaskManager

Raises:
  • ValueError – for invalid identifier.

  • APIError – for invalid RP Session configuration.

async wait_closed()

Wait for a closing session to be closed.

Use with close() to allow the asyncio event loop to resolve outstanding tasks.

TODO: Do we need this? This method may not be necessary. Or it may be more necessary in the future.
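The close()/wait_closed() split can be sketched as follows, assuming close() is called from code running in the event loop: close() only schedules the clean-up as an asyncio.Task and returns immediately, and wait_closed() awaits that task. SketchRuntimeSession is hypothetical and holds no RP resources.

```python
import asyncio
from typing import Optional


class SketchRuntimeSession:
    """Hypothetical session whose close() only starts the shutdown."""

    def __init__(self):
        self._closing: Optional[asyncio.Task] = None
        self.closed = False

    async def _finalize(self):
        await asyncio.sleep(0)  # Placeholder for outstanding clean-up work.
        self.closed = True

    def close(self):
        # Schedule clean-up and return at once; resources may not be released yet.
        if self._closing is None:
            self._closing = asyncio.get_running_loop().create_task(self._finalize())

    async def wait_closed(self):
        # Give the event loop a chance to finish the scheduled clean-up.
        if self._closing is not None:
            await self._closing
```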

resources: Task[dict] | None = None

The active Pilot resources, if any.

The runtime_startup routine schedules a Task to get a copy of the Pilot.resource_details[‘rm_info’] dictionary, once the Pilot reaches state PMGR_ACTIVE.

property session: radical.pilot.Session

The current radical.pilot.Session (may already be closed).

async scalems.radical.task.rp_task(rptask)

Mediate between a radical.pilot.Task and an asyncio.Future.

Schedule an asyncio Task to receive the result of the RP Task. The asyncio Task must also make sure that asyncio cancellation propagates to the rp.Task.cancel, and vice versa.

This function should be awaited immediately to make sure the necessary call-backs get registered. The result will be an asyncio.Task, which should be awaited separately.

Internally, this function provides a call-back to the rp.Task. The call-back provided to RP cannot directly call asyncio.Future methods (such as set_result() or set_exception()) because RP will be making the call from another thread without mediation by the asyncio event loop.

As such, we provide a thread-safe event handler to propagate the RP Task call-back to this asyncio.Task result. (See _rp_callback() and RPFinalTaskState.)

Canceling the returned task will cause rptask to be canceled. Canceling rptask will cause this task to be canceled.

Parameters:

rptask (radical.pilot.Task) – RADICAL Pilot Task that has already been submitted.

Returns:

A Task that, when awaited, returns the rp.Task instance in its final state.

Return type:

Task[radical.pilot.Task]
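The thread-safe mediation described above can be sketched without RADICAL Pilot. FakeRPTask is a hypothetical stand-in whose callbacks fire from a timer thread; its callback signature and state names are simplified assumptions rather than the rp.Task interface. The callback never touches asyncio objects directly: it uses loop.call_soon_threadsafe() to set an asyncio.Event, and the watcher task propagates cancellation in both directions.

```python
import asyncio
import threading
from typing import Callable, List

FINAL_STATES = {"DONE", "FAILED", "CANCELED"}  # Assumed state names, for illustration.


class FakeRPTask:
    """Hypothetical stand-in for radical.pilot.Task; callbacks run in another thread."""

    def __init__(self):
        self.state = "NEW"
        self._callbacks: List[Callable] = []

    def register_callback(self, cb: Callable):
        self._callbacks.append(cb)

    def advance(self, state: str):
        self.state = state
        for cb in self._callbacks:
            cb(self, state)

    def finish_later(self, state: str, delay: float = 0.01):
        threading.Timer(delay, self.advance, args=(state,)).start()

    def cancel(self):
        if self.state not in FINAL_STATES:
            self.advance("CANCELED")


async def rp_task(rptask: FakeRPTask) -> asyncio.Task:
    loop = asyncio.get_running_loop()
    final = asyncio.Event()  # Only ever touched from the event loop thread.

    def _rp_callback(task, state):
        # May run in a foreign thread: hand off to the loop instead of touching asyncio objects.
        if state in FINAL_STATES:
            loop.call_soon_threadsafe(final.set)

    rptask.register_callback(_rp_callback)
    if rptask.state in FINAL_STATES:
        final.set()

    async def _watch():
        try:
            await final.wait()
        except asyncio.CancelledError:
            rptask.cancel()  # asyncio cancellation -> rp.Task cancellation.
            raise
        if rptask.state == "CANCELED":
            raise asyncio.CancelledError()  # rp.Task cancellation -> asyncio cancellation.
        return rptask

    return asyncio.create_task(_watch())
```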

scalems.radical.task.scalems_callback(fut, *, item)

Propagate the results of a Future to the subscribed item.

Partially bind item to use this as the argument to fut.add_done_callback().

Warning

In the long run, we should not extend the life of the reference returned by edit_item, and we need to consider the robust way to publish item results.

Note

This is not currently RP-specific and we should look at how best to factor results publishing for workflow items. It may be that this function is the appropriate place to convert RP Task results to scalems results.

Parameters:
  • fut (Future) –

  • item (Task) –
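Binding item with functools.partial produces a one-argument callable suitable for Future.add_done_callback(). In this sketch, Item is a hypothetical workflow item, and recording cancellation as an exception is an assumption made for illustration.

```python
import asyncio
import functools


class Item:
    """Hypothetical workflow item that receives a result."""

    def __init__(self):
        self.result = None
        self.exception = None

    def set_result(self, value):
        self.result = value

    def set_exception(self, exc):
        self.exception = exc


def scalems_callback(fut: asyncio.Future, *, item: Item):
    # Propagate the Future's outcome to the subscribed item.
    if fut.cancelled():
        item.set_exception(asyncio.CancelledError())
    elif fut.exception() is not None:
        item.set_exception(fut.exception())
    else:
        item.set_result(fut.result())
```

Usage: `fut.add_done_callback(functools.partial(scalems_callback, item=item))`. Note that asyncio schedules done callbacks with call_soon, so they run on a later iteration of the event loop, not synchronously inside set_result().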

async scalems.radical.task.submit(*, item, task_manager, pre_exec, raptor_id=None)

Dispatch a WorkflowItem to be handled by RADICAL Pilot.

Submits an rp.Task and returns an asyncio.Task watcher for the submitted task.

Creates a Future, registering a done_callback to publish the task result with item.set_result().

A callback is registered with the rp.Task to set an Event on completion. An asyncio.Task watcher task monitors the Event(s) and gets a reference to an asyncio.Future through which results can be published to the scalems workflow item. (Currently the Future is created in this function, but should probably be acquired directly from the item itself.) The watcher task waits for the rp.Task finalization event or for the Future to be cancelled. Periodically, the watcher task “wakes up” to check if something has gone wrong, such as the rp.Task completing without setting the finalization event.

Caveats

There is an unavoidable race condition in the check performed by the watcher task. We don’t know how long after an rp.Task completes before its callbacks will run and we can’t check whether the callback has been scheduled. The watcher task cannot be allowed to complete successfully until we know that the callback has either run or will never run.

The delay between the rp.Task state change and the callback execution should be less than a second. We can allow the watcher to wake up occasionally (on the order of minutes), so we can assume that it will never take more than a full iteration of the waiting loop for the callback to propagate, unless there is a bug in RP. For simplicity, we can just note whether rptask.state in rp.FINAL before the watcher goes to sleep and raise an error if the callback is not triggered in an iteration where we have set such a flag.

If our watcher sends a cancellation to the rp.Task, there is no need to continue to monitor the rp.Task state and the watcher may exit.

Parameters:
  • item (Task) – The workflow item to be submitted

  • task_manager (radical.pilot.TaskManager) – A radical.pilot.TaskManager instance through which the task should be submitted.

  • pre_exec (list) – radical.pilot.Task.pre_exec prototype.

  • raptor_id (str) – The string name of the “scheduler,” corresponding to the UID of a Task running a rp.raptor.Master (if Raptor is enabled).

Returns:

a “Future[rp.Task]” for a rp.Task in its final state.

Return type:

asyncio.Task

The caller must await the result of the coroutine to obtain an asyncio.Task that can be cancelled or awaited as a proxy to direct RP task management. The Task will hold a coroutine that is guaranteed to already be running, failed, or canceled. The caller should check the status of the task immediately before making assumptions about whether a Future has been successfully bound to the managed workflow item.

The returned asyncio.Task can be used to cancel the rp.Task (and the Future) or to await the rp.Task cleanup.

To submit tasks as a batch, await an array of submit() results in the same dispatching context. (TBD)
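The flag-based check from the Caveats section can be sketched as a loop around asyncio.wait_for(). watch() is a hypothetical helper and the state names are assumed; the RP task only needs a state attribute here. The flag is recorded before each wait, so an error is raised only if a full wake interval passes after a final state was observed without the callback having set the Event.

```python
import asyncio

FINAL_STATES = {"DONE", "FAILED", "CANCELED"}  # Assumed state names, for illustration.


async def watch(rptask, finalized: asyncio.Event, wake_interval: float = 60.0):
    """Hypothetical watcher: wait for the callback-driven Event, waking up periodically."""
    while True:
        # Note the state before going to sleep.
        saw_final_state = rptask.state in FINAL_STATES
        try:
            await asyncio.wait_for(finalized.wait(), timeout=wake_interval)
            return rptask
        except asyncio.TimeoutError:
            if saw_final_state:
                # A whole interval passed after a final state without the callback firing.
                raise RuntimeError(f"Callback did not run for task in state {rptask.state}.")
```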

scalems.radical.raptor

Support for scalems on radical.pilot.raptor.

Define the connective tissue for SCALE-MS tasks embedded in rp.Task arguments.

Provide the RP Raptor and Worker details. Implement the raptor task script as a package entry point that we expect to be executable from the command line in a virtual environment where the scalems package is installed, without further modification to the PATH.

The client should be reasonably certain that the target environment has a compatible installation of RP and scalems. A rp.raptor.Master task script is installed with the scalems package. The script name is provided by the module function raptor_script(), and will be resolvable on the PATH for a Python interpreter process in an environment that has the scalems package installed.

We should try to keep this module as stable as possible so that the run time interface provided by scalems to RP is robust, and the entry point scripts change as little as possible across versions. scalems.radical runtime details live in runtime.py.

As of radical.pilot version 1.18, TaskDescription supports a mode field and several additional fields. The extended schema for the TaskDescription depends on the value of mode. The structured data provided to the executor callable is composed from these additional fields according to the particular mode.

We use the radical.pilot.task_description.TASK_FUNCTION mode, specifying a function field to name a callable in the (global) namespace accessible by the worker process.

We populate the worker global namespace with imported callables in the module file from which Raptor imports our radical.pilot.raptor.MPIWorker subclass, ScaleMSWorker.

The callable for call accepts *args and **kwargs, which are extracted from fields in the TaskDescription. Since we derive from MPIWorker, an mpi4py communicator is inserted at args[0].

Protocol

As of RP 1.18, the radical.pilot.raptor interface is not documented. The following diagrams illustrate the approximate architecture of a Raptor Session.

title RP Raptor client Session

box "RP client"
participant TaskManager
end box

'queue "Taskmanager: Scheduler and Input Queue" as Queue
queue "Scheduler/Input Queue" as Queue

-> TaskManager: submit(master_task_description)
activate TaskManager

'note left
note over TaskManager
 TaskDescription uses `mode=rp.RAPTOR_MASTER`.
 Task args provide details for the script
 so that a Raptor can start appropriate Workers.
end note

TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue

-> TaskManager: submit(task_description)
activate TaskManager

'note left
note over TaskManager
 TaskDescription names a Raptor uid in *raptor_id* field.
end note

TaskManager -> Queue: task_description
activate Queue
deactivate TaskManager
Queue <-
Queue -->
deactivate Queue
...
TaskManager ->
TaskManager <-- : raptor task results

A “raptor task” is an executable task (mode = rp.RAPTOR_MASTER) in which the script named by radical.pilot.TaskDescription.executable manages the life cycle of a radical.pilot.raptor.Master (or subclass instance). As of RP 1.14, the protocol is as follows.

title raptor task lifetime management

participant "raptor task"

-> "raptor task" : stage in

create Raptor as raptor
"raptor task" -> raptor: create Raptor(cfg)
"raptor task" -> raptor: Raptor.submit_workers(descr=descr, count=n_workers)
"raptor task" -> raptor: Raptor.start()
alt optional hook for self-submitting additional tasks
"raptor task" -> raptor: Raptor.submit_tasks(tasks)
end
queue scheduler
scheduler -\ raptor : request_cb
scheduler -\ raptor : result_cb
raptor -> raptor: Raptor.stop() (optional)
"raptor task" -> raptor: Raptor.join()

[<-- "raptor task" : stage out

The cfg argument to radical.pilot.raptor.Master does not currently appear to be required.

scalems encodes worker requirements on the client side. The scalems raptor script decodes the client-provided requirements, combines the information with run time details and work load inspection, and produces the WorkerDescription with which to submit_workers().

class RaptorWorkerConfig {
 count
 descr
}

class WorkerDescription

RaptorWorkerConfig *--> "descr" WorkerDescription

namespace scalems.radical {
 class RaptorConfiguration

 class ClientWorkerRequirements

 class ScaleMSRaptor

 ScaleMSRaptor --> RaptorConfiguration : launches with

 RaptorConfiguration *-- ClientWorkerRequirements

 ClientWorkerRequirements -right-> .RaptorWorkerConfig : worker_description()
}

namespace radical.pilot.raptor {
 class Master {
  submit_workers()
 }
 Master::submit_workers .> .RaptorWorkerConfig
}

scalems.radical.ScaleMSRaptor -up-|> radical.pilot.raptor.Master

TODO

Pass input and output objects more efficiently.

Worker processes come and go, and are not descended from Raptor processes (which may not even be on the same node), so we can’t generally pass objects by reference or even in shared memory segments. However, we can use optimized ZMQ facilities for converting network messages directly to/from memory buffers.

In other cases, we can use the scalems typing system to specialize certain types for optimized (de)serialization from/to file-backed storage or streams.

See https://github.com/SCALE-MS/scale-ms/issues/106

TODO

Get a stronger specification of the RP Raptor interface.

See https://github.com/radical-cybertools/radical.pilot/issues/2731

raptor task

scalems specialization of the “raptor” component in the radical.pilot.raptor federated scheduling protocol.

scalems.radical.raptor.raptor()

Entry point for raptor.Master task.

This function implements the scalems_raptor entry point script called by the RADICAL Pilot executor to provide the raptor task.

During installation, the scalems build system creates a scalems_raptor entry point script that provides command line access to this function. The Python signature takes no arguments because the function processes command line arguments from sys.argv.
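A sketch of an entry point that, like scalems_raptor, normally takes no Python arguments and reads the command line instead. The config_file argument is hypothetical; the optional argv parameter exists only so the function can be exercised without touching sys.argv (argparse falls back to sys.argv[1:] when it is None).

```python
import argparse
from typing import List, Optional


def main(argv: Optional[List[str]] = None) -> int:
    """Hypothetical entry point in the style of scalems_raptor."""
    parser = argparse.ArgumentParser(prog="scalems_raptor")
    # Hypothetical argument: the serialized configuration staged with the task.
    parser.add_argument("config_file")
    # When argv is None, argparse reads sys.argv[1:].
    args = parser.parse_args(argv)
    print(f"Would load Raptor configuration from {args.config_file}")
    return 0
```

Such a function is typically registered as a console script in the package metadata, so that installing the package places an executable wrapper on the PATH of the environment.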

async scalems.radical.raptor.raptor_input(*, filestore)

Provide the input file for a SCALE-MS Raptor script.

The resulting configuration file is staged with the Raptor scheduler task and provided as a command line argument. Going forward, most information can be provided after the Raptor starts (through CPI commands), but we retain this input mechanism for now.

Produces a file containing a serialized RaptorConfiguration.

Parameters:

filestore (FileStore) – (local) FileStore that will manage the generated AbstractFileReference.

Return type:

AbstractFileReference

scalems.radical.raptor.worker_requirements(*, pre_exec, ranks_per_worker, cores_per_rank=1, gpus_per_rank=0)

Get the requirements for the work load, as known to the client.

Parameters:
Return type:

ClientWorkerRequirements

class scalems.radical.raptor.RaptorConfiguration

Input to the script responsible for the RP raptor.

Produced on the client side with raptor_input. Deserialized in the raptor task (raptor) to get a RaptorWorkerConfig.

__init__(versioned_modules)
Parameters:

versioned_modules (List[Tuple[str, str]]) –

Return type:

None

classmethod from_dict(obj)

Decode from a native dict.

Support deserialization, such as from a JSON file in the raptor task.

Parameters:

obj (_RaptorConfigurationDict) –

versioned_modules: List[Tuple[str, str]]

List of name, version specifier tuples for required modules.
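Because JSON has no tuple type, decoding the configuration needs to rebuild the (name, version specifier) pairs. The round trip can be sketched with a hypothetical SketchRaptorConfiguration that mirrors only the versioned_modules field; the module names and specifiers in the usage are illustrative values.

```python
import dataclasses
import json
from typing import List, Tuple


@dataclasses.dataclass(frozen=True)
class SketchRaptorConfiguration:
    """Hypothetical mirror of RaptorConfiguration (versioned_modules only)."""

    versioned_modules: List[Tuple[str, str]]

    @classmethod
    def from_dict(cls, obj: dict) -> "SketchRaptorConfiguration":
        # JSON arrays come back as lists, so rebuild the (name, specifier) tuples.
        return cls(versioned_modules=[(str(name), str(spec)) for name, spec in obj["versioned_modules"]])


def encode(config: SketchRaptorConfiguration) -> str:
    # Tuples are serialized as JSON arrays.
    return json.dumps(dataclasses.asdict(config))


def decode(text: str) -> SketchRaptorConfiguration:
    return SketchRaptorConfiguration.from_dict(json.loads(text))
```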

class scalems.radical.raptor.ClientWorkerRequirements

Client-side details to inform worker provisioning.

This structure is part of the scalems.radical.raptor.RaptorConfiguration provided to the raptor script. The raptor script uses this information when calling worker_description().

Use the worker_requirements() creation function for a stable interface.

__init__(cores_per_process, cpu_processes, gpus_per_rank=0, pre_exec=(), named_env=None)
Parameters:
  • cores_per_process (int) –

  • cpu_processes (int) –

  • gpus_per_rank (float) –

  • pre_exec (tuple[str]) –

  • named_env (str | None) –

Return type:

None

cores_per_process: int

Number of CPU cores reserved per process (or MPI rank) for threading or child processes.

cpu_processes: int

Number of ranks in the Worker MPI context.

gpus_per_rank: float = 0

GPUs per Worker rank.

named_env: str | None = None

A registered virtual environment known to the raptor manager.

Warning

Not tested. Support has been delayed indefinitely.

pre_exec: tuple[str] = ()

Lines of shell expressions to be evaluated before launching the worker process.
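The worker_requirements() creation function can be sketched as a thin mapping onto these dataclass fields. The correspondence used here (ranks_per_worker to cpu_processes, cores_per_rank to cores_per_process) is an assumption inferred from the parameter names, and SketchWorkerRequirements is a hypothetical mirror of ClientWorkerRequirements.

```python
import dataclasses
from typing import Optional, Sequence, Tuple


@dataclasses.dataclass(frozen=True)
class SketchWorkerRequirements:
    """Hypothetical mirror of ClientWorkerRequirements."""

    cores_per_process: int
    cpu_processes: int
    gpus_per_rank: float = 0
    pre_exec: Tuple[str, ...] = ()
    named_env: Optional[str] = None


def worker_requirements(
    *, pre_exec: Sequence[str], ranks_per_worker: int, cores_per_rank: int = 1, gpus_per_rank: float = 0
) -> SketchWorkerRequirements:
    # Assumed mapping from the creation-function parameters to the dataclass fields.
    if ranks_per_worker < 1 or cores_per_rank < 1:
        raise ValueError("Need at least one rank and one core per rank.")
    return SketchWorkerRequirements(
        cores_per_process=cores_per_rank,
        cpu_processes=ranks_per_worker,
        gpus_per_rank=gpus_per_rank,
        pre_exec=tuple(pre_exec),  # Immutable copy, matching the frozen dataclass.
    )
```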

class scalems.radical.raptor.ScaleMSRaptor

Manage an RP Raptor raptor_id target.

Extends rp.raptor.Master.

We do not submit scalems tasks directly as RP Tasks from the client. We encode scalems tasks as instructions to the Raptor task, which will be decoded and translated to RP Tasks by the ScaleMSRaptor.request_cb and self-submitted to the ScaleMSWorker.

Results of such tasks are only available through result_cb(). The Raptor can translate results of generated Tasks into results for the Task carrying the coded instruction, or it can produce data files to stage during or after the Raptor task. Additionally, the Raptor could respond to other special instructions (encoded in later client-originated Tasks) to query or retrieve generated task results.

title Raptor

queue "Queue" as Queue

box "RP Agent"
participant Scheduler
end box

queue "Raptor input queue" as master_queue

box "scalems.radical.raptor"
participant ScaleMSRaptor
participant rpt.Master
end box

queue "ZMQ Raptor work channel" as channel

activate Queue

Scheduler -> Queue: accepts work
activate Scheduler
Scheduler <-- Queue: task dictionary
deactivate Queue

note over Scheduler
Scheduler gets task from queue,
observes `scheduler` field and routes to Raptor.
end note

Scheduler -> master_queue: task dictionary
activate master_queue
deactivate Scheduler

note over ScaleMSRaptor, rpt.Master
Back end RP queue manager
passes messages to Raptor callbacks.
end note

rpt.Master -> master_queue: accept Task
activate rpt.Master
rpt.Master <-- master_queue: cpi_task
deactivate master_queue

rpt.Master -> rpt.Master: _request_cb(cpi_task)
activate rpt.Master
rpt.Master -> ScaleMSRaptor: request_cb(cpi_task)
activate ScaleMSRaptor

ScaleMSRaptor -> ScaleMSRaptor: CpiCommand.launch()
activate ScaleMSRaptor
alt optionally process or update requests
  ScaleMSRaptor -> ScaleMSRaptor: submit_tasks(scalems_tasks)
  activate ScaleMSRaptor
  deactivate ScaleMSRaptor
  ScaleMSRaptor -> ScaleMSRaptor: _result_cb(cpi_task)
  activate ScaleMSRaptor
  deactivate ScaleMSRaptor
else resolve a Control message
  ScaleMSRaptor -> ScaleMSRaptor: _result_cb(cpi_task)
  activate ScaleMSRaptor
  deactivate ScaleMSRaptor
end
return

return filtered tasks

rpt.Master -> rpt.Master: submit_tasks(...)
activate rpt.Master
deactivate rpt.Master
rpt.Master -> channel: send message
deactivate rpt.Master
deactivate rpt.Master

rpt.Master -> channel: accept result
activate rpt.Master
rpt.Master <-- channel: scalems_tasks
deactivate channel

rpt.Master -> rpt.Master: _result_cb(scalems_tasks)
activate rpt.Master
rpt.Master -> ScaleMSRaptor: result_cb(scalems_tasks)
activate ScaleMSRaptor

ScaleMSRaptor -> ScaleMSRaptor: CpiCommand.result_hook()

alt optionally process or update pending requests
ScaleMSRaptor -> ScaleMSRaptor: issue#277
activate ScaleMSRaptor
deactivate ScaleMSRaptor
end

rpt.Master <-- ScaleMSRaptor
deactivate ScaleMSRaptor

rpt.Master -> master_queue: send message
activate master_queue
deactivate rpt.Master
deactivate rpt.Master

__init__(configuration)

Initialize a SCALE-MS Raptor.

Verify the environment. Perform some scalems-specific set up and initialize the base class details.

Parameters:

configuration (RaptorConfiguration) –

configure_worker(requirements)

Scoped temporary module file for raptor worker.

Write and return the path to a temporary Python module. The module imports ScaleMSWorker into its module namespace so that the file and class can be used in the worker description for rp.raptor.Master.submit_workers().

Parameters:

requirements (ClientWorkerRequirements) –

Return type:

Generator[list[scalems.radical.raptor.WorkerDescription], None, None]
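Writing a temporary module and cleaning it up afterwards fits naturally in a generator-based context manager. The sketch below is hypothetical: it yields only the file path, whereas configure_worker() yields a list of WorkerDescription, and the import line it writes (for example, an import of ScaleMSWorker) is supplied by the caller.

```python
import contextlib
import os
import tempfile
from typing import Iterator


@contextlib.contextmanager
def worker_module(import_line: str) -> Iterator[str]:
    """Write a temporary Python module; remove it when the with block exits (hypothetical)."""
    fd, path = tempfile.mkstemp(suffix=".py", prefix="scalems_worker_")
    try:
        with os.fdopen(fd, "w") as fh:
            # The module only needs to bring the worker class into its namespace.
            fh.write(import_line + "\n")
        yield path
    finally:
        os.remove(path)
```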

cpi_finalize(task)

Short-circuit the normal Raptor protocol to finalize a task.

This is an alias for Raptor._result_cb(), but is a public method used in the CpiCommand.launch method for Control commands, which do not call ScaleMSRaptor.submit_tasks().

Parameters:

task (TaskDictionary) –

request_cb(tasks)

Allows all incoming requests to be processed by the Raptor.

RADICAL guarantees that request_cb() calls are made sequentially in a single thread. The implementation does not need to be thread-safe or reentrant. (Ref: https://github.com/radical-cybertools/radical.utils/blob/master/src/radical/utils/zmq/queue.py#L386)

RADICAL does not guarantee that Tasks are processed in the same order in which they are submitted by the client.

If overridden, request_cb() must return a list. The returned list is interpreted to be requests that should be processed normally after the callback. This allows subclasses of rp.raptor.Master to add or remove requests before they become Tasks. The returned list is submitted with self.submit_tasks() by the base class after the callback returns.

A Raptor may call self.submit_tasks() to self-submit items (e.g. instead of or in addition to manipulating the returned list in request_cb()).

It is the developer’s responsibility to choose unique task IDs (uid) when crafting items for Raptor.submit_tasks().

Parameters:

tasks (Sequence[TaskDictionary]) –

Return type:

Sequence[TaskDictionary]
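The filtering contract of request_cb() (intercept some requests, return the rest for normal submission) can be illustrated with plain dictionaries. This is a sketch under stated assumptions: the CPI_MESSAGE value below is a hypothetical stand-in for scalems.radical.raptor.CPI_MESSAGE, handle_cpi stands in for whatever scalems does with a CPI call (for example, finishing it through cpi_finalize()), and filter_requests is a hypothetical name.

```python
from typing import Any, Callable, Dict, List, Sequence

# Hypothetical stand-in for the value of scalems.radical.raptor.CPI_MESSAGE.
CPI_MESSAGE = "scalems.cpi"

TaskDictionary = Dict[str, Any]


def filter_requests(
    tasks: Sequence[TaskDictionary],
    handle_cpi: Callable[[TaskDictionary], None],
) -> List[TaskDictionary]:
    """Resolve CPI messages locally; return all other tasks unchanged."""
    filtered = []
    for task in tasks:
        if task["description"].get("mode") == CPI_MESSAGE:
            # Resolved by the Raptor itself; never becomes a Worker Task.
            handle_cpi(task)
        else:
            # The base class submits the returned list after the callback.
            filtered.append(task)
    return filtered
```

Because RADICAL does not guarantee processing order, a real callback should not assume that a CPI message and the tasks that follow it in the sequence are related.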

result_cb(tasks)[source]

SCALE-MS specific handling of completed tasks.

We perform special handling for two types of tasks:

1. Tasks submitted to and completed by the collaborating Worker(s).
2. Tasks intercepted and resolved entirely by the Raptor (CPI calls).

Parameters:

tasks (Sequence[TaskDictionary]) –

worker task

scalems specialization of the “worker” component in the radical.pilot.raptor federated scheduling protocol.

scalems.radical.raptor.worker_description(*, named_env, worker_file, cores_per_process=None, cpu_processes=None, gpus_per_rank=None, pre_exec=(), environment=None)[source]

Get a worker description for ScaleMSRaptor.submit_workers().

Parameters:
Return type:

WorkerDescription

worker_file is generated for the raptor script by the caller. See scalems.radical.raptor.

The uid for the Worker task is defined by ScaleMSRaptor.submit_workers().

class scalems.radical.raptor.ScaleMSWorker[source]

Specialize the Raptor MPI Worker for scalems dispatching of serialised work.

scalems tasks encode the importable function and inputs in the arguments to the run_in_worker dispatching function, which is available to the ScaleMSWorker instance.

title ScaleMSWorker task dispatching

queue "ZMQ Raptor work channel" as channel

box "scalems.radical.raptor.Worker"
participant ScaleMSWorker
participant rpt.Worker
end box

box "usermodule"
participant usermodule
end box

box "target venv"
end box

rpt.Worker -> channel: pop message
activate rpt.Worker
rpt.Worker <-- channel: rp.TASK_FUNCTION mode Task
rpt.Worker -> rpt.Worker: _dispatch_function()
activate rpt.Worker

alt scalems encoded workload
  rpt.Worker -> ScaleMSWorker: run_in_worker()
  activate ScaleMSWorker
  ScaleMSWorker -> ScaleMSWorker: unpack encoded function call
  ScaleMSWorker -> ScaleMSWorker: from usermodule import func
  ScaleMSWorker -> usermodule: ""func(*args, **kwargs)""
  activate usermodule
  ScaleMSWorker <-- usermodule
  deactivate usermodule
  return
else pickled rp.PythonTask workload (not implemented)
  rpt.Worker -> rpt.Worker: unpickled scalems handler
  activate rpt.Worker
  rpt.Worker -> usermodule: ""func(*args, **kwargs)""
  activate usermodule
  rpt.Worker <-- usermodule
  deactivate usermodule
  rpt.Worker --> rpt.Worker: {out, err, ret, value}
  deactivate rpt.Worker
end

deactivate rpt.Worker
rpt.Worker -> channel: put result
deactivate rpt.Worker

run_in_worker(*, work_item, comm=None)[source]

Unpack and run a task requested through RP Raptor.

Satisfies the RaptorTaskExecutor protocol. Named as the function for rp.TASK_FUNCTION mode tasks generated by scalems.

This function MUST be imported and referenceable by its __qualname__ from the scope in which ScaleMSWorker methods execute. (See ScaleMSRaptor._worker_file)

Parameters:
  • comm (mpi4py.MPI.Comm, optional) – MPI communicator to be used for the task.

  • work_item (ScalemsRaptorWorkItem) – dictionary encoding the function call, taken from the kwargs of the TaskDescription.

See also

CpiAddItem.launch()
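The unpacking performed by run_in_worker() can be approximated with importlib: import work_item["module"], look up work_item["func"] as an attribute, and call it, providing the communicator according to the comm_arg_name rule documented for ScalemsRaptorWorkItem. This is a simplified sketch, not the scalems implementation; dispatch_work_item is a hypothetical name, and comm is treated as an opaque object so the example runs without mpi4py.

```python
import importlib
from typing import Any, Mapping, Optional


def dispatch_work_item(work_item: Mapping[str, Any], comm: Optional[Any] = None) -> Any:
    """Import the named module, retrieve the named callable, and call it."""
    module = importlib.import_module(work_item["module"])
    func = getattr(module, work_item["func"])
    args = list(work_item.get("args", []))
    kwargs = dict(work_item.get("kwargs", {}))
    comm_arg_name = work_item.get("comm_arg_name")
    if comm_arg_name is not None:
        if comm_arg_name == "":
            # Empty string: the communicator is the first positional argument.
            args.insert(0, comm)
        else:
            # Otherwise: pass the communicator by the named key word argument.
            kwargs[comm_arg_name] = comm
    # comm_arg_name of None: the callable does not receive the communicator.
    return func(*args, **kwargs)
```

For example, dispatch_work_item({"module": "operator", "func": "add", "args": [2, 3], "kwargs": {}, "comm_arg_name": None}) returns 5.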

task handling

scalems instructions are embedded in TaskDescriptions with the scalems.radical.raptor.CPI_MESSAGE mode, using the metadata field for the payload. Executable work is re-encoded by ScaleMSRaptor for ScaleMSWorker (using a ScalemsRaptorWorkItem in the work_item field of the kwargs of a TASK_FUNCTION mode Task) to be dispatched through scalems machinery in the Worker process.
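The re-encoding step can be pictured as wrapping a work item in a TASK_FUNCTION task description that names run_in_worker as its function and carries the work item in kwargs["work_item"]. This sketch uses a plain dict instead of an rp.TaskDescription; the TASK_FUNCTION string value is an assumption (use the radical.pilot constant in real code), and as_function_task is a hypothetical name. No raptor_id is set because, as noted for _RaptorTaskDescription, that field is not used for tasks originating in raptor tasks.

```python
from typing import Any, Dict

# Assumption: the string value of rp.TASK_FUNCTION.
TASK_FUNCTION = "task.function"


def as_function_task(work_item: Dict[str, Any], uid: str) -> Dict[str, Any]:
    """Wrap a ScalemsRaptorWorkItem-like dict in a TASK_FUNCTION description dict."""
    return {
        # The Raptor is responsible for choosing unique uids for self-submitted items.
        "uid": uid,
        "mode": TASK_FUNCTION,
        # run_in_worker must be importable by this name where ScaleMSWorker runs.
        "function": "run_in_worker",
        "args": [],
        "kwargs": {"work_item": work_item},
    }
```

Converting the dict to an rp.TaskDescription and submitting it is omitted here.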

scalems.radical.raptor.api_name

Key for dispatching raptor Requests.

We can use this to identify the schema used for SCALE-MS tasks encoded in arguments to raptor TASK_FUNCTION mode executor.

scalems.radical.raptor.CPI_MESSAGE

Flag for scalems messages to be treated as CPI calls.

Used in the mode field to indicate that the object should be handled through the SCALE-MS Compute Provider Interface machinery.

class scalems.radical.raptor.ScalemsRaptorWorkItem[source]

Encode the function call to implement a task in the workflow.

Parameter type for the run_in_worker function work_item argument.

This structure must be trivially serializable (by msgpack) so that it can be passed in a TaskDescription.kwargs field. Consistency with higher level scalems.serialization is not essential, but can be pursued in the future.

The essential role is to represent an importable callable and its arguments. At run time, the dispatching function (run_in_worker) conceivably has access to module scoped state, but does not have direct access to the Worker (or any resources other than the mpi4py.MPI.Comm object). Therefore, abstract references to input data should be resolved at the Raptor in terms of the expected Worker environment before preparing and submitting the Task.

TODO: Evolve this to something sufficiently general to be a scalems.WorkItem.

TODO: Clarify the translation from an abstract representation of work in terms of the workflow record to the concrete representation with literal values and local filesystem paths.

TODO: Record extra details like implicit filesystem interactions, environment variables, etc. TBD whether they belong in a separate object.

args: list

Positional arguments for func.

comm_arg_name: str | None

Identify how to provide an MPI communicator to func, if at all.

If comm_arg_name is not None, the callable will be provided with the MPI communicator. If comm_arg_name is an empty string, the communicator is provided as the first positional argument. Otherwise, comm_arg_name must be a valid key word argument by which to pass the communicator to func.

func: str

A callable to be retrieved as an attribute in module.

kwargs: dict

Key word arguments for func.

module: str

The qualified name of a module importable by the Worker.
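The fields above can be mirrored in a typing.TypedDict to show what a conforming work item looks like. This sketch assumes the structure behaves like a TypedDict; WorkItem is a hypothetical name, not the scalems class. A JSON round trip is used as a stand-in for the msgpack serializability requirement, since both accept only built-in containers and scalars here.

```python
import json
import typing


class WorkItem(typing.TypedDict):
    """Hypothetical mirror of the documented ScalemsRaptorWorkItem fields."""

    args: list
    comm_arg_name: typing.Optional[str]
    func: str
    kwargs: dict
    module: str


item = WorkItem(
    args=[16.0],
    comm_arg_name=None,  # math.sqrt does not take an MPI communicator
    func="sqrt",
    kwargs={},
    module="math",
)

# Stand-in check for "trivially serializable": a lossless JSON round trip.
assert json.loads(json.dumps(item)) == item
```

Only names and literal values appear in the item; the callable itself is resolved by import in the Worker.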

compatibility helpers

These classes are not formal types, but are used to represent (untyped) interfaces in radical.pilot.raptor.

scalems.radical.raptor.RaptorWorkerConfig

Client-specified Worker requirements.

Used by the raptor script when calling worker_description(). Created internally with _configure_worker().

class scalems.radical.raptor.WorkerDescription[source]

Worker description.

class scalems.radical.raptor.TaskDictionary[source]

Task representations seen by request_cb and result_cb.

Other fields may be present, but the objects in the sequences provided to scalems.radical.raptor.ScaleMSRaptor.request_cb() and scalems.radical.raptor.ScaleMSRaptor.result_cb() have the following fields. Result fields will not be populated until the Task runs.

For the expected fields, see the source code for as_dict(): https://radicalpilot.readthedocs.io/en/stable/_modules/radical/pilot/task.html#Task.as_dict

description: _RaptorTaskDescription

Encoding of the original task description.

exception: str

Exception type name and message.

exception_detail: str

Full exception details with stack trace.

exit_code: int

Task return code.

return_value: Any

Function return value.

Refer to the RaptorTaskExecutor Protocol.

state: str

RADICAL Pilot Task state.

stderr: str

Task standard error.

stdout: str

Task standard output.

target_state: str

State to which the Task should be advanced.

Valid values are string constants from radical.pilot.states.

Used internally, such as in Master._result_cb().

uid: str

Canonical identifier for the Task.

Note that uid may be omitted from the original TaskDescription.
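Because result fields are not populated until the Task runs, code that inspects a TaskDictionary should read them defensively. This is a sketch under stated assumptions: the final-state strings are assumed values of the radical.pilot.states constants, and task_outcome is a hypothetical helper, not part of scalems.

```python
from typing import Any, Mapping, Optional

# Assumed values of the radical.pilot.states final-state constants.
DONE, FAILED, CANCELED = "DONE", "FAILED", "CANCELED"


def task_outcome(task: Mapping[str, Any]) -> Optional[Any]:
    """Summarize a TaskDictionary-like mapping.

    Returns None while the task is not in a final state, the return_value once
    the task is DONE, and raises RuntimeError for FAILED or CANCELED tasks.
    """
    state = task.get("state")
    if state not in (DONE, FAILED, CANCELED):
        return None  # result fields are not populated yet
    if state == DONE:
        return task.get("return_value")
    # Prefer the exception summary; fall back to stderr.
    detail = task.get("exception") or task.get("stderr") or "no details"
    raise RuntimeError(f"{task.get('uid', '<no uid>')} ended in {state}: {detail}")
```

Note that a DONE task whose function returned None is indistinguishable from an unfinished task in this simplified helper; check state directly if that matters.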

class scalems.radical.raptor._RaptorTaskDescription[source]

Describe a Task to be executed through a Raptor Worker.

A specialization of radical.pilot.TaskDescription.

Note the distinguishing features of a TaskDescription that is processed by a raptor.Master.

The meaning or utility of some fields is dependent on the values of other fields.

TODO: We need some additional fields, like environment and fields related to launch method and resource reservation. Is the schema for rp.TaskDescription sufficiently strong and well-documented that we can remove this hinting type? (Check again at RP >= 1.19)

args: list

For rp.TASK_FUNCTION mode, list of positional arguments provided to the executor function.

executable: str

Unused by Raptor tasks.

function: str

Executor for rp.TASK_FUNCTION mode.

Names the callable for dispatching.

The callable can either be a function object present in the namespace of the interpreter launched by the Worker for the task, or a radical.pilot.pytask.PythonTask pickled function object.

kwargs: dict

For rp.TASK_FUNCTION mode, a dictionary of key word arguments provided to the executor function.

metadata: str | int | float | bool | None | Mapping[str, Encodable] | List[Encodable]

An arbitrary user-provided payload.

May be any type that is encodable by msgpack (i.e. built-in types).
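The metadata type hint is recursive, so a conforming payload can be checked with a short recursive function. This sketch follows a strict reading of the documented type (string keys, lists rather than arbitrary sequences); msgpack itself may be more permissive, for example by packing tuples as arrays. is_encodable is a hypothetical helper.

```python
from typing import Any, Mapping


def is_encodable(value: Any) -> bool:
    """Check value against the documented metadata type:
    str | int | float | bool | None | Mapping[str, Encodable] | List[Encodable].
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return True
    if isinstance(value, Mapping):
        # Keys must be strings; values must themselves be encodable.
        return all(isinstance(k, str) and is_encodable(v) for k, v in value.items())
    if isinstance(value, list):
        return all(is_encodable(v) for v in value)
    return False
```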

mode: str

The executor mode for the Worker to use.

For rp.TASK_FUNCTION mode, either function or method must name a task executor. Depending on the Worker (sub)class, resources such as an mpi4py.MPI.Comm will be provided as the first positional argument to the executor. *args and **kwargs will be provided to the executor from the corresponding fields.

raptor_id: str

The UID of the raptor.Master scheduler task.

This field is relevant to tasks routed from client TaskManagers. It is not used for tasks originating in raptor tasks.

uid: str

Unique identifier for the Task across the Session.