Python package reference

SCALE-MS - Scalable Adaptive Large Ensembles of Molecular Simulations.

This package provides Python-driven data flow scripting and graph execution for molecular science computational research protocols.

Documentation is published online at https://scale-ms.readthedocs.io/.

Refer to https://github.com/SCALE-MS/scale-ms/wiki for development documentation.

Invocation:

ScaleMS scripts describe a workflow. To run the workflow, you must tell ScaleMS how to dispatch the work for execution.

For most use cases, you can run the script in the context of a particular execution scheme by using the -m Python command line flag to specify a ScaleMS execution module:

# Execute with the default local execution manager.
python -m scalems.local myworkflow.py
# Execute with the RADICAL Pilot based execution manager.
python -m scalems.radical myworkflow.py

Execution managers can be configured and used from within the workflow script, but the user has extra responsibility to properly shut down the execution manager, and the resulting workflow may be less portable. For details, refer to the documentation for particular WorkflowContexts.

Object model

When the scalems package is imported, a default Context is instantiated to manage the API session. The Python scripting interface allows a directed acyclic graph of Resources and resource dependencies to be declared for management by the context. Resources may be static or stateful. Resources have type and shape. Resource type may be a fundamental data type or a nested and structured type. An Operation is a stateful Resource whose type maps to a scalems compatible Function.
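As a rough illustration of the dependency-graph idea (not the scalems implementation), the standard-library graphlib module can order named resources so that each one is resolved only after the resources it depends on. The resource names below are hypothetical.

```python
from graphlib import TopologicalSorter

# Hypothetical resources, each mapped to the set of resources it depends on.
dependencies = {
    "input_file": set(),
    "simulation": {"input_file"},
    "analysis": {"simulation"},
    "report": {"analysis", "input_file"},
}

# static_order() yields each node only after all of its dependencies.
order = list(TopologicalSorter(dependencies).static_order())
print(order)
```

Because the graph is acyclic, such an ordering always exists; a cycle would raise graphlib.CycleError.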

Interfaces

scalems Resource references are proxies to resources managed by the framework.

A Resource reference may be used as input to a scalems compatible function.

A Resource provides a Future interface if the Resource represents an immutable data event that can be converted to concrete data in the client context. Future.result() forces the framework to resolve any pending dependencies and blocks until a local object can be provided to the caller.
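The blocking behavior of result() resembles that of concurrent.futures.Future in the Python standard library. The following sketch uses only the standard library as an analogy; it does not use or represent the scalems API, and slow_square is a hypothetical stand-in for deferred work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    """Stand-in for work whose result is not available immediately."""
    time.sleep(0.05)
    return x * x


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_square, 7)  # returns immediately with a Future
    value = future.result()               # blocks until the data is available

print(value)
```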

When a client uses a function to add work to a workflow, the function returns a reference to an Operation.

An Operation reference has (read-only) attributes for the named resources it provides. These resources may be nested.

Operations provide a run() method to force execution at the point of call. run() is an alias for Resource.result().

Execution Module

Every SCALE-MS object reference belongs to a workflow managed by a WorkflowManager. Workflows may be executed through different means and with different resources through distinct modules. Different middleware implementations may be accessed directly, but we recommend selecting a management module when invoking Python from the command line with the -m option.

See SCALE-MS invocation for usage information.

See scalems.invocation for more about Execution Modules.

Entry point

The entry point for a scalems workflow script is the function decorated with scalems.app.

@scalems.app

Annotate a callable for execution by SCALEMS.

Parameters:

func (Callable) –

Return type:

Callable

Basic functions

Core Function implementations provided with the SCALE-MS package.

scalems.executable(*args, manager=None, **kwargs)

Execute a command line program.

Configure an executable to run in one (or more) subprocess(es). Executes when run in an execution Context, as part of a work graph. Process environment and execution mechanism depend on the execution environment, but are likely similar to (or implemented in terms of) the POSIX execvp system call.

Shell processing of argv is disabled to improve determinism. This means that shell expansions such as environment variables, globbing (*), and other special symbols (like ~ for home directory) are not available. This allows a simpler and more robust implementation, as well as a better ability to uniquely identify the effects of a command line operation. If you think this disallows important use cases, please let us know.
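The effect of bypassing the shell can be seen with the standard-library subprocess module, which also passes an argument list to the child process without shell expansion. This is a sketch of the general behavior, not of the scalems execution mechanism; the child command here is just a Python one-liner that echoes its arguments.

```python
import glob
import os
import subprocess
import sys

# A child process that simply reports the arguments it received.
child = [sys.executable, "-c", "import sys; print(sys.argv[1:])"]

# With an argument list and no shell, special characters are passed literally.
literal = subprocess.run(
    child + ["~", "*.txt", "$HOME"], capture_output=True, text=True, check=True
).stdout.strip()
print(literal)

# To use expanded values, expand explicitly in Python before building argv.
home = os.path.expanduser("~")
text_files = sorted(glob.glob("*.txt"))
```

Performing the expansion in the script keeps the final command line explicit, which matches the determinism goal described above.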

Parameters:
  • manager (WorkflowManager) – Workflow manager to which the work should be submitted.

  • args – a tuple (or list) to be the subprocess arguments, including the executable

args is required. Additional key words are optional.

Parameters:
  • outputs (Mapping) – labeled output files, mapping command line flag to one (or more) filenames.

  • inputs (Mapping) – labeled input files, mapping command line flag to one (or more) filenames.

  • environment (Mapping) – environment variables to be set in the process environment.

  • stdin (str) – source for posix style standard input file handle (default None).

  • stdout (str) – Capture standard out to a filesystem artifact, even if it is not consumed in the workflow.

  • stderr (str) – Capture standard error to a filesystem artifact, even if it is not consumed in the workflow.

  • resources (Mapping) – Name additional required resources, such as an MPI environment.

Program arguments are iteratively added to the command line with standard Python iteration, so you should use a tuple or list even if you have only one parameter. For example, if you provide a string with arguments="asdf", it will be passed as ... "a" "s" "d" "f". To pass a single string argument, use arguments=("asdf",) or arguments=["asdf"].
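The iteration behavior described above is ordinary Python and can be checked directly. Note in particular that parentheses alone do not make a tuple; the trailing comma does.

```python
# Iterating over a bare string yields its individual characters.
from_string = list("asdf")

# ("asdf") is still a string; ("asdf",) is a one-element tuple.
parenthesized = ("asdf")
one_tuple = ("asdf",)

# Either a one-element tuple or a one-element list yields a single argument.
from_tuple = list(one_tuple)
from_list = list(["asdf"])

print(from_string, from_tuple, from_list)
```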

inputs and outputs should be a dictionary with string keys, where the keys name command line “flags” or options.
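One plausible way to picture the flag mappings is as extra argv entries appended after the positional arguments. The helper below, build_argv, is hypothetical and only illustrates that reading; how the framework actually arranges the command line is not specified here.

```python
from typing import Mapping, Optional, Sequence, Union

Files = Union[str, Sequence[str]]


def build_argv(
    args: Sequence,
    inputs: Optional[Mapping[str, Files]] = None,
    outputs: Optional[Mapping[str, Files]] = None,
) -> list:
    """Hypothetical helper: flatten positional args and flag-to-file mappings."""
    argv = [str(a) for a in args]
    for mapping in (inputs or {}, outputs or {}):
        for flag, names in mapping.items():
            # A flag may label one filename or several.
            if isinstance(names, str):
                names = [names]
            argv.append(flag)
            argv.extend(names)
    return argv


argv = build_argv(
    ("exe", "--origin", 1.0, 2.0, 3.0),
    inputs={"--infile": "somefilename"},
    outputs={"--outfile": "exe.out"},
)
print(argv)
```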

Note that the Execution Context (e.g. RPContext, LocalContext, DockerContext) determines the handling of resources. Typical values in resources may include

  • procs_per_task (int): Number of processes to spawn for an instance of the exec.

  • threads_per_proc (int): Number of threads to allocate for each process.

  • gpus_per_task (int): Number of GPU devices to allocate for an instance of the exec.

  • launcher (str): Task launch mechanism, such as mpiexec.

Returns:

Output collection contains exitcode, stdout, stderr, file.

The file output has the same keys as the outputs key word argument.

Example

Execute a command named exe that takes a flagged option for input and output file names (stored in a local Python variable my_filename and as the string literal 'exe.out') and an origin flag that uses the next three arguments to define a vector.

>>> import scalems
>>> my_filename = "somefilename"
>>> command = scalems.executable(
...    ('exe', '--origin', 1.0, 2.0, 3.0),
...    inputs={'--infile': scalems.file(my_filename)},
...    outputs={'--outfile': scalems.file('exe.out')})
>>> assert hasattr(command, 'file')
>>> import os
>>> assert os.path.exists(command.file['--outfile'].result())
>>> assert hasattr(command, 'exitcode')

scalems.commands.extend_sequence(sequence_a, sequence_b)

Combine sequential data into a new sequence.

scalems.commands.logical_and(iterable)

Produce a boolean value resulting from the logical AND of the elements of the input.

The API does not specify whether the result is published before all values of iterable have been inspected, but for checkpointing, reproducibility, and simplicity of implementation, the operation is not marked “complete” until all inputs have been resolved.

scalems.commands.logical_not(value)

Negate boolean inputs.

Dynamic functions

Dynamic functions generate operations during graph execution.

scalems.commands.map(function, iterable, shape=None)

Generate a collection of operations by iteration.

Apply function to each element of iterable.

If iterable is ordered, the generated operation collection is ordered.

If iterable is unordered, the generated operation collection is unordered.

scalems.commands.reduce(function, iterable)

Repeatedly apply a function.

For an Iterable[T] and a function that maps (T, T) -> T, apply the function to map Iterable[T] -> T. Assumes function is associative.

If iterable is ordered, the order of the operands is preserved, but the grouping (associativity) of the generated operations is not specified. If iterable is unordered, the caller is responsible for ensuring that function obeys the commutative property.

Compare to functools.reduce().
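The associativity assumption matters because the grouping of operations is left unspecified. The following standard-library sketch (it does not use scalems) shows that an associative function gives the same answer for different groupings of the same ordered operands, while a non-associative one does not.

```python
import functools
import operator

values = [1, 2, 3, 4]

# Left-to-right grouping, as functools.reduce does: ((1 + 2) + 3) + 4
left = functools.reduce(operator.add, values)
# A different grouping of the same ordered operands: (1 + 2) + (3 + 4)
paired = operator.add(operator.add(1, 2), operator.add(3, 4))

# Subtraction is not associative, so the grouping changes the answer.
left_sub = functools.reduce(operator.sub, values)  # ((1 - 2) - 3) - 4
paired_sub = operator.sub(operator.sub(1, 2), operator.sub(3, 4))

print(left, paired, left_sub, paired_sub)
```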

scalems.commands.while_loop(*, function, condition, max_iteration=10, **kwargs)

Generate and run a chain of operations such that condition evaluates True.

Returns a reference that acts like a single operation in the current work graph, but which is a proxy to the operation at the end of a dynamically generated chain of operations. At run time, condition is evaluated for the last element in the current chain. If condition evaluates False, the chain is extended and the next element is executed. When condition evaluates True, the object returned by while_loop becomes a proxy for the last element in the chain.

TODO: Allow external access to intermediate results / iterations?

Parameters:
  • function – a callable that produces an instance of an operation when called (with **kwargs, if provided).

  • condition – a call-back that returns a boolean when provided an operation instance.

  • max_iteration – execute the loop no more than this many times (default 10)

function (and/or condition) may be a “partial function” and/or stateful function object (such as is produced by subgraph()) in order to retain state between iterations.

TODO: Specify requirements of function and condition. (Picklable? scalems.Function compatible?)

TODO: Describe how condition could be fed asynchronously from outside of the wrapped function.

TODO: Does condition need to evaluate directly to a native bool, or do we detect whether a Future is produced, and call result() if necessary?

Warning

max_iteration is provided in part to minimize the cost of bugs in early versions of this software. The default value may be changed or removed on short notice.

scalems.commands.poll()

Inspect the execution status of an operation.

Inspects the execution graph state in the current context at the time of execution.

Used in a work graph, this adds a non-deterministic aspect, but adds truly asynchronous adaptability.

Data shaping functions

Establish and manipulate data flow topology.

scalems.commands.desequence(iterable)

Remove sequencing from an iterable.

Given an input of shape (N, [M, …]), produce an iterable of resources (of shape (M,…)). Ordering information from the outer dimension is lost, even in an ensemble scope, in which parallel data flows generally retain their ordering.

This allows iterative tools (e.g. map) to use unordered or asynchronous iteration on resource slices as they become available.
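The difference between ordered and unordered iteration can be illustrated with the standard-library concurrent.futures module. This is an analogy only and does not use scalems; work is a hypothetical stand-in for an operation applied to each slice.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def work(x: int) -> int:
    """Stand-in for an operation applied to one element."""
    return x * 10


items = [3, 1, 2]
with ThreadPoolExecutor() as pool:
    # Ordered: results come back in the order of the inputs.
    ordered = list(pool.map(work, items))
    # Unordered: results are consumed as each one becomes available.
    futures = [pool.submit(work, x) for x in items]
    unordered = [f.result() for f in as_completed(futures)]

print(ordered, sorted(unordered))
```

The unordered form yields the same set of results, but makes no promise about which one arrives first.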

scalems.commands.resequence(keys, collection)

Set the order of a collection.

Use the ordered list of keys to define the sequence of the elements in the collection.

In addition to reordering an ordered collection, this function is useful for applying a sequence from one part of a workflow to data that has been processed asynchronously.
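As a plain-Python picture of the idea, suppose the asynchronously processed results are held in a mapping keyed by the label of the element that produced them. That representation is an assumption made for this sketch, and resequence_sketch is a hypothetical stand-in, not the scalems function.

```python
def resequence_sketch(keys, collection):
    """Hypothetical stand-in: order the values of a mapping by a list of keys."""
    return [collection[key] for key in keys]


# Results that arrived in arbitrary order, labeled by their source element.
results = {"c": 30, "a": 10, "b": 20}
ordered = resequence_sketch(["a", "b", "c"], results)
print(ordered)
```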

Helpers

Tools for dynamically generating Functions.

scalems.commands.function_wrapper(output=None)

Generate a decorator for wrapped functions with signature manipulation.

New function accepts the same arguments, with additional arguments required by the API.

The new function returns an object with an output attribute containing the named outputs.

Example

>>> @function_wrapper(output={'spam': str, 'foo': str})
... def myfunc(parameter: str = None, output=None):
...    output.spam = parameter
...    output.foo = parameter + ' ' + parameter
...
>>> operation1 = myfunc(parameter='spam spam')
>>> assert operation1.output.spam.result() == 'spam spam'
>>> assert operation1.output.foo.result() == 'spam spam spam spam'

Parameters:

output (dict) – output names and types

If output is provided to the wrapper, a data structure will be passed to the wrapped functions with the named attributes so that the function can easily publish multiple named results. Otherwise, the output of the generated operation will just capture the return value of the wrapped function.
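The two output modes can be sketched with a small decorator factory built from the standard library. Everything here is hypothetical (wrapper_sketch is not the scalems implementation), and unlike a real workflow function it runs eagerly and returns plain values rather than Futures.

```python
import functools
from types import SimpleNamespace


def wrapper_sketch(output=None):
    """Hypothetical decorator factory; not the scalems implementation."""
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            if output is None:
                # No named outputs: capture the return value.
                return SimpleNamespace(output=func(*args, **kwargs))
            # Named outputs: hand the function an object to publish results on.
            published = SimpleNamespace(**{name: None for name in output})
            func(*args, output=published, **kwargs)
            return SimpleNamespace(output=published)
        return wrapped
    return decorator


@wrapper_sketch(output={"spam": str, "foo": str})
def myfunc(parameter: str = None, output=None):
    output.spam = parameter
    output.foo = parameter + " " + parameter


@wrapper_sketch()
def double(x):
    return 2 * x


result = myfunc(parameter="spam spam")
plain = double(4)
print(result.output.spam, result.output.foo, plain.output)
```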

scalems.commands.subgraph(variables=None)

Allow operations to be configured in a sub-context.

The object returned functions as a Python context manager. When entering the context manager (the beginning of the with block), the object has an attribute for each of the named variables. Reading from these variables gets a proxy for the initial value or its update from a previous loop iteration. At the end of the with block, any values or data flows assigned to these attributes become the output for an iteration.

After leaving the with block, the variables are no longer assignable, but can be called as bound methods to get the current value of a variable.

When the object is run, operations bound to the variables are reset and run to update the variables.

Example:

@scalems.function_wrapper(output={'data': float})
def add_float(a: float, b: float) -> float:
    return a + b

@scalems.function_wrapper(output={'data': bool})
def less_than(lhs: float, rhs: float, output=None):
    output.data = lhs < rhs

subgraph = scalems.subgraph(variables={'float_with_default': 1.0, 'bool_data': True})
with subgraph:
    # Define the update for float_with_default to come from an add_float operation.
    subgraph.float_with_default = add_float(subgraph.float_with_default, 1.).output.data
    subgraph.bool_data = less_than(lhs=subgraph.float_with_default, rhs=6.).output.data
operation_instance = subgraph()
operation_instance.run()
assert operation_instance.values['float_with_default'] == 2.

loop = scalems.while_loop(function=subgraph, condition=subgraph.bool_data)
handle = loop()
assert handle.output.float_with_default.result() == 6

Speculative functions

These functions are probably not explicitly necessary, or at least not appropriate for the high level interface.

scalems.commands.gather(iterable)

Convert an iterable or decomposable collection to a complete collection.

Use to synchronize/localize data. References with ambiguous dimensionality or decomposable dimensions are converted to references with concrete dimensionality and non-decomposable dimensions. When the output of gather is used as input to another function, the entire results of iterable are available in the operation context.

This function should generally not be necessary when defining a workflow, but may be necessary to disambiguate data flow topology, such as when N operations should each consume an N-dimensional resource in its entirety, in which case gather is implicitly an all_gather.

Alternatives:

For decomposable dimensions, we may want to allow a named scope for the decomposition. Then, explicit gather use cases would be replaced by an annotation to change the scope for a dimension, and, as part of a function implementation’s assertion of its decomposability, a function could express its ensemble/decomposition behavior in terms of a particular target scope of an operation.

scalems.commands.scatter(iterable, axis=1)

Explicitly decompose data.

For input with outer dimension size N and dimensionality D, or for an iterable of length N and elements with dimensionality D-1, produce a Resource whose outer dimensions are decomposed down to dimension D-axis.

Note: This function is not necessary if we either require fixed dimensionality of function inputs or minimize implicit broadcast, scatter, and gather behavior. Otherwise, we will need to disambiguate decomposition.

scalems.commands.broadcast()

scalems.commands.concatenate(iterable: Iterable[T]) -> T

Equivalent to reduce(extend_sequence, iterable).
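The stated equivalence can be pictured with functools.reduce from the standard library. The function extend_sequence_sketch below is a hypothetical stand-in that works on plain lists; it is not the scalems extend_sequence.

```python
import functools


def extend_sequence_sketch(sequence_a, sequence_b):
    """Hypothetical stand-in: combine two sequences into a new list."""
    return list(sequence_a) + list(sequence_b)


frames = [[1, 2], [3], [4, 5]]
combined = functools.reduce(extend_sequence_sketch, frames)
print(combined)
```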

scalems.commands.partial()

Provide an alternative to functools.partial() that plays well with SCALE-MS checkpointing and dispatched execution.

Base classes

class scalems.commands.Subgraph

Base class with which to define Functions in terms of sub-graphs.

Proposed alternative to the subgraph-builder context manager provided by subgraph().

Example:

# Create a subgraph Function with several Variables.
#
# * *simulation* names an input/output Variable.
# * *conformation* names an output Variable.
# * *P* names an internal state and output Variable.
# * *is_converged* names an output Variable.
#
class MyFusedOperation(Subgraph):
    # The Subgraph metaclass applies special handling to these class variables
    # because of their type.
    simulation = Subgraph.InputOutputVariable(simulate)
    conformation = Subgraph.OutputVariable(default=simulation.conformation)
    P = Subgraph.OutputVariable(default=scalems.float(0., shape=(N, N)))
    is_converged = Subgraph.OutputVariable(default=False)

    # Update the simulation input at the beginning of an iteration.
    simulation.update(modify_input(input=simulation, conformation=conformation))

    # The Subgraph metaclass will hide these variables from clients.
    md = simulate(input=simulation)
    allframes = scalems.concatenate(md.trajectory)
    adaptive_msm = analysis.msm_analyzer(allframes, P)

    # Update Variables at the end of an iteration.
    simulation.update(md)
    P.update(adaptive_msm.transition_matrix)
    conformation.update(adaptive_msm.conformation)
    is_converged.update(adaptive_msm.is_converged)

    # That's all. The class body defined here is passed to the Subgraph
    # metaclass to generate the actual class definition, which will be
    # a SCALE-MS compatible Function that supports a (hidden) iteration
    # protocol, accessible with the `while_loop` dynamic Function.

loop = scalems.while_loop(function=MyFusedOperation,
                          condition=scalems.logical_not(MyFusedOperation.is_converged),
                          simulation=initial_input)
loop.run()

Logging

Python logging facilities use the built-in logging module.

Upon import, the scalems package sets a placeholder “NullHandler” to block propagation of log messages to the handler of last resort (and to sys.stderr).

If you want to see logging output on sys.stderr, attach a logging.StreamHandler to the ‘scalems’ logger.

Example:

import logging

character_stream = logging.StreamHandler()
# Optional: Set log level.
logging.getLogger('scalems').setLevel(logging.DEBUG)
character_stream.setLevel(logging.DEBUG)
# Optional: create formatter and add to character stream handler
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
character_stream.setFormatter(formatter)
# add handler to logger
logging.getLogger('scalems').addHandler(character_stream)

To handle log messages that are issued while importing scalems and its submodules, attach the handler before importing scalems. Note that if scalems.radical will be used, you should import radical.pilot before importing logging to avoid spurious warnings.

Refer to submodule documentation for hierarchical loggers to allow granular control of log handling (e.g. logging.getLogger('scalems.radical')). Refer to the Python logging module for information on connecting to and handling logger output.
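Granular control works through the standard logging hierarchy: a child logger such as 'scalems.radical' can be given its own level while its records still propagate to handlers attached to 'scalems'. The sketch below uses only the logging module and does not import scalems; the messages are made up for illustration.

```python
import io
import logging

stream = io.StringIO()
handler = logging.StreamHandler(stream)

parent = logging.getLogger("scalems")
parent.addHandler(handler)
parent.setLevel(logging.WARNING)

# A child logger can be given its own, more verbose level.
child = logging.getLogger("scalems.radical")
child.setLevel(logging.DEBUG)

child.debug("child detail")    # enabled by the child's level, handled by the parent's handler
parent.debug("parent detail")  # dropped: below the parent's WARNING level

captured = stream.getvalue()
print(captured)
```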

Exceptions

Exceptions raised by SCALE-MS are catchable as scalems.exceptions.ScaleMSError.

Additional common exceptions are defined in this module. scalems submodules may define additional exceptions, but all will be derived from exceptions specified in scalems.exceptions.

exception scalems.exceptions.APIError

Specified interfaces are being violated.

exception scalems.exceptions.ContextError

A Context operation could not be performed.

exception scalems.exceptions.DispatchError

SCALE-MS is unable to execute work or manage data in the requested environment.

exception scalems.exceptions.DuplicateKeyError

An identifier is being reused in a situation where this is not supported.

exception scalems.exceptions.InternalError

An otherwise unclassifiable error has occurred (a bug).

Please report the bug at https://github.com/SCALE-MS/scale-ms/issues

exception scalems.exceptions.MissingImplementationError

The expected feature is not available.

This indicates a bug or incomplete implementation. If the error message does not cite an existing tracked issue, please file a bug report at https://github.com/SCALE-MS/scale-ms/issues

exception scalems.exceptions.ProtocolError

A behavioral protocol has not been followed correctly.

exception scalems.exceptions.ProtocolWarning

Unexpected behavior is detected that is not fatal, but which may indicate a bug.

exception scalems.exceptions.ScaleMSError

Base exception for scalems package errors.

Users should be able to use this base class to catch errors emitted by SCALE-MS.

exception scalems.exceptions.ScaleMSWarning

Base Warning for scalems package warnings.

Users and testers should be able to use this base class to filter warnings emitted by SCALE-MS.

exception scalems.exceptions.ScopeError

A command or reference is not valid in the current scope or Context.

scalems.exceptions.deprecated(explanation)

Mark a deprecated definition.

Wraps a callable to issue a DeprecationWarning when called.

Use as a parameterized decorator:

@deprecated("func is deprecated because...")
def func():
    ...

Parameters:

explanation (str) –
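A parameterized decorator of this kind is commonly written as a function that returns a decorator. The following is a minimal sketch using only the standard library; deprecated_sketch and old_add are hypothetical names, and this is not the scalems implementation.

```python
import functools
import warnings


def deprecated_sketch(explanation: str):
    """Hypothetical parameterized decorator; not the scalems implementation."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # stacklevel=2 attributes the warning to the caller of func.
            warnings.warn(explanation, DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)
        return wrapper
    return decorator


@deprecated_sketch("old_add is deprecated; use the + operator.")
def old_add(a, b):
    return a + b


# Record warnings so that the effect of the call can be inspected.
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    total = old_add(1, 2)

print(total, [str(w.message) for w in caught])
```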

class scalems.file.DataLocalizationError

The requested file operation is not possible in the given context.

This may be because a referenced file exists in a different FileStore than the current default. Note that data localization is a potentially expensive operation and so must be explicitly requested by the user.

__init__(*args, **kwargs)
__new__(**kwargs)