"""Basic SCALE-MS Commands.
Core commands for building workflows.
The availability and behavior of these commands is part of the SCALE-MS
specification.
"""
# We expect these commands to be identified at the top level scalems namespace
# with e.g. ('scalems', 'logical_not'). We keep them in a submodule just
# to be tidy.
# Public command API. Sorted alphabetically. `function_wrapper` and `poll`
# are defined (and documented) in this module, so they are exported as well.
__all__ = [
    "desequence",
    "extend_sequence",
    "function_wrapper",
    "gather",
    "logical_and",
    "logical_not",
    "map",
    "poll",
    "reduce",
    "resequence",
    "scatter",
    "subgraph",
    "while_loop",
]
import typing
from scalems import exceptions
def subgraph(variables=None):
    """Allow operations to be configured in a sub-context.

    The object returned functions as a Python context manager. When entering the
    context manager (the beginning of the ``with`` block), the object has an
    attribute for each of the named ``variables``. Reading from these variables
    gets a proxy for the initial value or its update from a previous loop iteration.
    At the end of the ``with`` block, any values or data flows assigned to these
    attributes become the output for an iteration.

    After leaving the ``with`` block, the variables are no longer assignable, but
    can be called as bound methods to get the current value of a variable.

    When the object is run, operations bound to the variables are ``reset`` and
    run to update the variables.

    Example::

        @scalems.function_wrapper(output={'data': float})
        def add_float(a: float, b: float) -> float:
            return a + b

        @scalems.function_wrapper(output={'data': bool})
        def less_than(lhs: float, rhs: float, output=None):
            output.data = lhs < rhs

        subgraph = scalems.subgraph(variables={'float_with_default': 1.0, 'bool_data': True})
        with subgraph:
            # Define the update for float_with_default to come from an add_float operation.
            subgraph.float_with_default = add_float(subgraph.float_with_default, 1.).output.data
            subgraph.bool_data = less_than(lhs=subgraph.float_with_default, rhs=6.).output.data

        operation_instance = subgraph()
        operation_instance.run()
        assert operation_instance.values['float_with_default'] == 2.

        loop = scalems.while_loop(function=subgraph, condition=subgraph.bool_data)
        handle = loop()
        assert handle.output.float_with_default.result() == 6
    """
    raise exceptions.MissingImplementationError()
def logical_not(value):
    """Negate boolean inputs."""
    raise exceptions.MissingImplementationError()
def logical_and(iterable):
    """Produce a boolean value resulting from the logical AND of the elements of the input.

    The API does not specify whether the result is published before all values
    of *iterable* have been inspected, but (for checkpointing, reproducibility,
    and simplicity of implementation), the operation is not marked "complete"
    until all inputs have been resolved.
    """
    raise exceptions.MissingImplementationError()
def while_loop(*, function, condition, max_iteration=10, **kwargs):
    """Generate and run a chain of operations such that condition evaluates True.

    Returns a reference that acts like a single operation in the current
    work graph, but which is a proxy to the operation at the end of a
    dynamically generated chain of operations. At run time, *condition* is
    evaluated for the last element in the current chain. If *condition*
    evaluates False, the chain is extended and the next element is executed.
    When *condition* evaluates True, the object returned by `while_loop`
    becomes a proxy for the last element in the chain.

    TODO: Allow external access to intermediate results / iterations?

    Arguments:
        function: a callable that produces an instance of an operation when
            called (with ``**kwargs``, if provided).
        condition: a call-back that returns a boolean when provided an
            operation instance.
        max_iteration: execute the loop no more than this many times (default 10)

    *function* (and/or *condition*) may be a "partial function" and/or stateful
    function object (such as is produced by subgraph()) in order to retain
    state between iterations.

    TODO: Specify requirements of *function* and *condition*. (Picklable?
        scalems.Function compatible?)

    TODO: Describe how *condition* could be fed asynchronously from outside of
        the wrapped *function*.

    TODO: Does *condition* need to evaluate directly to a native bool, or do we
        detect whether a Future is produced, and call ``result()`` if necessary?

    Warning:
        *max_iteration* is provided in part to minimize the cost of bugs in early
        versions of this software. The default value may be changed or
        removed on short notice.
    """
    raise exceptions.MissingImplementationError()
def desequence(iterable):
    """Remove sequencing from an iterable.

    Given an input of shape (N, [M, ...]), produce an iterable of resources
    (of shape (M,...)). Ordering information from the outer dimension is lost,
    even in an ensemble scope, in which parallel data flows generally retain
    their ordering.

    This allows iterative tools (e.g. `map`) to use unordered or asynchronous
    iteration on resource slices as they become available.
    """
    raise exceptions.MissingImplementationError()
def resequence(keys, collection):
    """Set the order of a collection.

    Use the ordered list of keys to define the sequence of the elements in the
    collection.

    In addition to reordering an ordered collection, this function is useful
    for applying a sequence from one part of a work flow to data that has been
    processed asynchronously.
    """
    raise exceptions.MissingImplementationError()
def gather(iterable):
    """Convert an iterable or decomposable collection to a complete collection.

    Use to synchronize/localize data. References with ambiguous dimensionality or
    decomposable dimensions are converted to references with concrete dimensionality
    and non-decomposable dimensions. When the output of *gather* is used as input
    to another function, the entire results of *iterable* are available in the
    operation context.

    This function should generally not be necessary when defining a work flow,
    but may be necessary to disambiguate data flow topology, such as when N
    operations should each consume an N-dimensional resource in its entirety,
    in which case, *gather* is implicitly an *all_gather*.

    Alternatives:
        For decomposable dimensions, we may want to allow a named *scope* for
        the decomposition. Then, explicit *gather* use cases would be replaced
        by an annotation to change the *scope* for a dimension, and, as part of
        a function implementation's assertion of its decomposability, a function
        could express its ensemble/decomposition behavior in terms of a particular
        target scope of an operation.
    """
    raise exceptions.MissingImplementationError()
def scatter(iterable, axis=1):
    """Explicitly decompose data.

    For input with outer dimension size N and dimensionality D,
    or for an iterable of length N and elements with dimensionality D-1,
    produce a Resource whose outer dimensions are decomposed down to dimension
    D-*axis*.

    Note: This function is not necessary if we either require fixed dimensionality
    of function inputs or minimize implicit broadcast, scatter, and gather behavior.
    Otherwise, we will need to disambiguate decomposition.
    """
    raise exceptions.MissingImplementationError()
# Note: intentionally shadows the `reduce`/`map` builtins-style names; this is
# part of the specified top-level scalems command vocabulary (see __all__).
def reduce(function, iterable):
    """Repeatedly apply a function.

    For an Iterable[T] and a function that maps (T, T) -> T, apply the function
    to map Iterable[T] -> T. Assumes *function* is associative.

    If *iterable* is ordered, commutation of operands is preserved,
    but associativity of the generated operations is not specified.

    If *iterable* is unordered, the caller is responsible for ensuring that
    *function* obeys the commutative property.

    Compare to :py:func:`functools.reduce`
    """
    raise exceptions.MissingImplementationError()
def extend_sequence(sequence_a, sequence_b):
    """Combine sequential data into a new sequence."""
    raise exceptions.MissingImplementationError()
def map(function, iterable, shape=None):
    """Generate a collection of operations by iteration.

    Apply *function* to each element of *iterable*.

    If *iterable* is ordered, the generated operation collection is ordered.
    If *iterable* is unordered, the generated operation collection is unordered.
    """
    raise exceptions.MissingImplementationError()
class Callable(typing.Protocol):
    """This protocol describes the required function signature for a SCALE-MS command."""

    def __call__(self):
        ...
class Command(typing.Protocol):
    """Protocol describing a SCALE-MS Command."""
def command(obj: Callable) -> Command:
    """Decorate a callable to create a SCALE-MS Command."""
    # Raise an instance (with parentheses) for consistency with the other
    # unimplemented commands in this module.
    raise exceptions.MissingImplementationError()
def function_wrapper(output: dict = None):
    # Suppress warnings in the example code.
    # noinspection PyUnresolvedReferences
    """Generate a decorator for wrapped functions with signature manipulation.

    New function accepts the same arguments, with additional arguments required by
    the API.

    The new function returns an object with an ``output`` attribute containing the
    named outputs.

    Example:

        >>> @function_wrapper(output={'spam': str, 'foo': str})
        ... def myfunc(parameter: str = None, output=None):
        ...     output.spam = parameter
        ...     output.foo = parameter + ' ' + parameter
        ...
        >>> operation1 = myfunc(parameter='spam spam')
        >>> assert operation1.spam.result() == 'spam spam'
        >>> assert operation1.foo.result() == 'spam spam spam spam'

    Arguments:
        output (dict): output names and types

    If ``output`` is provided to the wrapper, a data structure will be passed to
    the wrapped functions with the named attributes so that the function can easily
    publish multiple named results. Otherwise, the ``output`` of the generated operation
    will just capture the return value of the wrapped function.
    """
    raise exceptions.MissingImplementationError()
def poll():
    """Inspect the execution status of an operation.

    Inspects the execution graph state in the current context at the time of
    execution.

    Used in a work graph, this adds a non-deterministic aspect, but adds truly
    asynchronous adaptability.
    """
    raise exceptions.MissingImplementationError()