Source code for scalems.subprocess._subprocess

"""Define the ScaleMS Subprocess command.

`scalems.executable` is used to execute a program in one (or more) subprocesses.
It is an alternative to the built-in Python `subprocess.Popen` or `asyncio.create_subprocess_exec`
with extensions to better support ScaleMS execution dispatching and ensemble data flow.

The core task is represented by `scalems.subprocess.SubprocessTask`, which also
requires the definitions of `scalems.subprocess.SubprocessInput` and
`scalems.subprocess.SubprocessResult`.

In the first iteration, we can use dataclasses.dataclass to define input/output data structures
in terms of standard types. In a follow-up, we can use a scalems metaclass to define them
in terms of Data Descriptors that support mixed scalems.Future and native constant data types.
"""

__all__ = ["executable", "Subprocess", "SubprocessInput", "SubprocessResult"]

import dataclasses
import json
import logging
import typing
from pathlib import Path  # We probably need a scalems abstraction for Path.

import scalems.workflow
from scalems.exceptions import InternalError
from scalems.serialization import encode
from ..unique import next_monotonic_integer
from ..exceptions import APIError
from ..exceptions import MissingImplementationError
from ..workflow import WorkflowManager

logger = logging.getLogger(__name__)
logger.debug("Importing {}".format(__name__))


class OutputFile(dict):
    """Placeholder for output files.

    The initial implementation of OutputFile does not provide access to the
    created output file.

    The actual filename is managed by SCALE-MS to avoid namespace collisions.

    Arguments:
        label (str): Optional user-friendly identifier for locating a reference in the managed workflow.
        suffix (str): Optional filename suffix for the generated filename.

    In a future implementation, we may allow instances of OutputFile to transform
    into workflow references that are dependent on the task under construction.
    """

    def __init__(self, label=None, suffix=""):
        super().__init__()
        self["label"] = label
        self["suffix"] = suffix

    @property
    def label(self):
        return self.get("label", None)

    @property
    def suffix(self):
        return self.get("suffix", "")
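
# A usage sketch (hypothetical flag and suffix): an OutputFile placeholder names an
# output in the *outputs* mapping of a subprocess task, while SCALE-MS manages the
# actual filename.
#
#     outputs = {"-o": OutputFile(label="result", suffix=".out")}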


# TODO: what is the mechanism for registering a command implementation in a new Context?
# TODO: What is the relationship between the command factory and the command type? Which parts need to be importable?


# TODO: input data typing.
@dataclasses.dataclass(frozen=True)
class SubprocessInput:
    # TODO: Move input documentation to Input class docs.
    argv: typing.Sequence[str]
    inputs: typing.Mapping[str, Path] = dataclasses.field(default_factory=dict)
    outputs: typing.Mapping[str, OutputFile] = dataclasses.field(default_factory=dict)
    stdin: typing.Iterable[str] = ()
    environment: typing.Mapping[str, typing.Union[str, None]] = dataclasses.field(default_factory=dict)
    # For now, let's just always enable stdout/stderr
    stdout: Path = dataclasses.field(default=Path("stdout"))
    stderr: Path = dataclasses.field(default=Path("stderr"))
    resources: typing.Mapping[str, typing.Any] = dataclasses.field(default_factory=dict)


# Register a director for SubprocessInput workflow items.
# TODO: Link to deserializer behavior.
# TODO: Normalize task_builder protocol.
# TODO: Generate from class decorator.
# TODO: Need a generic TaskView class for clients with reference to managed elements.
@scalems.workflow.workflow_item_director_factory.register
def _(item: SubprocessInput, *, manager: WorkflowManager, label: str = None):
    assert isinstance(manager, WorkflowManager)

    def director(*args, **kwargs):
        if len(args) > 0:
            # TODO: Reconsider reasonable exceptions.
            raise TypeError("Unexpected positional arguments.")
        if len(kwargs) > 0:
            raise TypeError("Unexpected key word arguments: {}".format(", ".join(kwargs.keys())))
        # TODO: Checksum with scalems utility, don't just use native Python hash.
        uid = hash(item)
        if uid in manager.tasks:
            # TODO: Consider whether this is the correct behavior
            return manager.tasks[uid]
        else:
            manager.tasks[uid] = item
            return manager.tasks[uid]

    return director


@dataclasses.dataclass(frozen=True)
class SubprocessResult:
    # file: Field(Path)
    # exitcode: Field(int)
    # TODO: Can we use None instead of os.devnull to indicate non-presence of stdout/stderr?
    exitcode: int
    stdout: Path
    stderr: Path
    file: typing.Mapping[str, Path]


class SubprocessTask:
    """Describe the type of resource provided by a Subprocess command."""

    @classmethod
    def scoped_identifier(cls):
        # TODO: Consider either deriving from the `import` identifier,
        #       or defining a more sophisticated protocol for name resolution.
        return ("scalems", "subprocess", "SubprocessTask")

    @classmethod
    def identifier(cls):
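        # For SubprocessTask, this evaluates to "scalems.subprocess.SubprocessTask".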
        return ".".join(cls.scoped_identifier())

    @classmethod
    def input_type(cls) -> type:
        return SubprocessInput

    @classmethod
    def result_type(cls) -> type:
        return SubprocessResult


# TODO: helpers / ABCs for Serializeable and Encodable.


# TODO: Instances must have a way to map to the owning Context and a task implementation.
# It is reasonable to allow the WorkflowContext to produce an object satisfying a common
# ItemView interface, plus additional interface aspects as specified by the workflow item
# developer in e.g. SubprocessTask.
class Subprocess:
    @classmethod
    def resource_type(cls):
        # Note: we return an instance to better reflect the documented object
        # model and to allow for future contextual information, such as shape.
        return SubprocessTask()

    # TODO: Remove uid parameter. It should be calculated.
    def __init__(self, input: SubprocessInput, uid=None):
        self._bound_input = input
        self._result = None
        self._uid = uid
        if self._uid is None:
            self._uid = next_monotonic_integer().to_bytes(32, "big")

    def input_collection(self):
        return self._bound_input

    def result(self):
        return self._result

    def dependencies(self):
        ...

    def uid(self):
        return self._uid

    def serialize(self) -> str:
        """Encode the task as a JSON record.

        Input and Result will be serialized as references.
        The caller is responsible for serializing existing records
        for bound objects, if they exist.
        """
        record = {}
        record["uid"] = self.uid().hex()
        # "label" not yet supported.
        record["type"] = self.resource_type().scoped_identifier()
        record["input"] = dataclasses.asdict(self._bound_input)  # reference
        record["result"] = dataclasses.asdict(self._result)  # reference
        try:
            serialized = json.dumps(record, default=encode)
        except TypeError as e:
            logger.critical("Missing encoding logic for scalems data. Encoder says " + str(e))
            raise InternalError("Missing serialization support.") from e

        # raise MissingImplementationError('To do...')
        return serialized
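
    # A sketch of the serialized record shape (hypothetical values; Path fields and
    # other scalems types are handled by `scalems.serialization.encode`):
    #
    #     {"uid": "<64-character hex digest>",
    #      "type": ["scalems", "subprocess", "SubprocessTask"],
    #      "input": {"argv": ["myprog"], "inputs": {}, "outputs": {}, "stdin": [],
    #                "environment": {}, "stdout": "stdout", "stderr": "stderr",
    #                "resources": {}},
    #      "result": {"exitcode": 0, "stdout": "stdout", "stderr": "stderr", "file": {}}}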

    @classmethod
    def deserialize(cls, record: str, context=None):
        """Instantiate a Subprocess Task from a serialized record.

        In general, records should only be deserialized into a WorkflowContext
        that manages a valid work graph, but for early testing, at least,
        we have some standalone use cases.
        """

        # The record may or may not have a bound result.
        # If there is a bound result, it should be added to the workgraph first.
        # return cls()
        raise MissingImplementationError()

    # def __await__(self) -> typing.Generator[typing.Any, None, SubprocessResult]:
    #     """Implements the asyncio protocol for a coroutine object.
    #
    #     When awaited, query the current context to negotiate dispatching. Note that the
    #     built-in asyncio module acts like a LocalExecutor Context if and only if there
    #     is not an active SCALE-MS Context. SCALE-MS Contexts
    #     set the current context before awaiting.
    #     """
    #     # TODO: implementation registration.
    #     import scalems.context
    #     import scalems.local
    #     import scalems.radical
    #     context = scalems.context.get_scope()
    #     # TODO: dispatching
    #     if isinstance(context, scalems.local.LocalExecutor):
    #         from scalems.local.operations import executable as local_exec
    #         # Note that we need a more sophisticated coroutine object than what we get
    #         # directly from `async def` for command instances that can present output
    #         # in multiple contexts or be transferred from one to another.
    #         self._result = local_exec(self)
    #     elif isinstance(context, scalems.radical.RPExecutor):
    #         from scalems.radical.operations import executable as _rp_exec
    #         self._result = _rp_exec(self)
    #     else:
    #         raise MissingImplementationError(
    #         'Current context {} does not implement scalems.executable'.format(context))
    #
    #     # Allow this function to be a generator function, fulfilling the awaitable protocol.
    #     yield self
    #     # Note that the items yielded are not particularly useful, but the position of the
    #     # yield expression(s) is potentially useful for debugging or multitasking. Depending
    #     # on the implementation of the event loop, multiple yields may allow a way to avoid
    #     # deadlocks. For instance, we may choose to yield at each iteration of a loop to
    #     # provide or read PIPE-based stdin or stdout. `await` should accomplish the same thing,
    #     # but the generator protocol may improve debugging and generality.
    #     # The point of "yield" is more interesting when we use "yield" as an expression in the
    #     # yielding code, which allows values to be passed in to the coroutine at the evaluation
    #     # of the yield expression
    #     # (e.g. https://docs.python.org/3/howto/functional.html#passing-values-into-a-generator
    #     # but note that the coroutine protocol is slightly different, per https://www.python.org/dev/peps/pep-0492/)
    #     # For instance, this could be a mechanism for nesting event loops or dispatching contexts
    #     # while maintaining a heart-beat or other command-channel-like wrapper.
    #
    #     if not isinstance(self._result, SubprocessResult):
    #         raise RuntimeError('Result was not delivered!')
    #     return self._result


# Register a director for Subprocess workflow items.
# TODO: Wrap this in the decorator or metaclass used for TaskTypes.
@scalems.workflow.workflow_item_director_factory.register
def _(item: Subprocess, *, manager: scalems.workflow.WorkflowManager, label: str = None):
    if not isinstance(manager, scalems.workflow.WorkflowManager):
        raise APIError(f"No director for {repr(manager)}")

    def director(*args, **kwargs):
        if len(args) > 0:
            # TODO: Reconsider reasonable exceptions.
            raise TypeError("Unexpected positional arguments.")
        if len(kwargs) > 0:
            raise TypeError("Unexpected key word arguments: {}".format(", ".join(kwargs.keys())))

        # Note regarding registering task implementation functions:
        # scalems.subprocess is a very special kind of task. Each execution
        # environment needs to provide a specialized implementation for the
        # foreseeable future. It does not make sense for this module to provide
        # a default Python function reference for a task factory or callable.
        task_view = manager.add_item(item)

        return task_view

    return director


def executable(*args, manager: scalems.workflow.WorkflowManager = None, **kwargs):
    """Execute a command line program.

    Configure an executable to run in one (or more) subprocess(es).
    Executes when run in an execution Context, as part of a work graph.
    Process environment and execution mechanism depends on the execution environment,
    but is likely similar to (or implemented in terms of) the POSIX execvp system call.

    Shell processing of *argv* is disabled to improve determinism.
    This means that shell expansions such as environment variables, globbing (``*``),
    and other special symbols (like ``~`` for home directory) are not available.
    This allows a simpler and more robust implementation, as well as a better
    ability to uniquely identify the effects of a command line operation. If you
    think this disallows important use cases, please let us know.

    Arguments:
        manager: Workflow manager to which the work should be submitted.
        args: a tuple (or list) to be the subprocess arguments, including the executable.

    *args* is required. Additional key words are optional.

    Other Parameters:
        outputs (Mapping): labeled output files, mapping command line flag to one (or more) filenames.
        inputs (Mapping): labeled input files, mapping command line flag to one (or more) filenames.
        environment (Mapping): environment variables to be set in the process environment.
        stdin (str): source for posix style standard input file handle (default None).
        stdout (str): Capture standard out to a filesystem artifact, even if it is not consumed in the workflow.
        stderr (str): Capture standard error to a filesystem artifact, even if it is not consumed in the workflow.
        resources (Mapping): Name additional required resources, such as an MPI environment.

    .. todo:: Support POSIX sigaction / IPC traps?

    .. todo:: Consider dataclasses.dataclass types to replace reusable/composable function signatures.

    Program arguments are iteratively added to the command line with standard Python
    iteration, so you should use a tuple or list even if you have only one parameter.
    I.e. If you provide a string with ``arguments="asdf"`` then it will be passed as
    ``... "a" "s" "d" "f"``. To pass a single string argument, use ``arguments=("asdf",)``
    or ``arguments=["asdf"]``.

    *inputs* and *outputs* should be a dictionary with string keys, where the keys
    name command line "flags" or options.

    Note that the Execution Context (e.g. RPContext, LocalContext, DockerContext)
    determines the handling of *resources*. Typical values in *resources* may include

    * procs_per_task (int): Number of processes to spawn for an instance of the *exec*.
    * threads_per_proc (int): Number of threads to allocate for each process.
    * gpus_per_task (int): Number of GPU devices to allocate for an instance of the *exec*.
    * launcher (str): Task launch mechanism, such as ``mpiexec``.

    Returns:
        Output collection contains *exitcode*, *stdout*, *stderr*, *file*.

    The *file* output has the same keys as the *outputs* key word argument.

    Example:
        Execute a command named ``exe`` that takes a flagged option for input and output
        file names (stored in a local Python variable ``my_filename`` and as the string
        literal ``'exe.out'``) and an ``origin`` flag that uses the next three arguments
        to define a vector.

            >>> my_filename = "somefilename"
            >>> command = scalems.executable(
            ...     ('exe', '--origin', 1.0, 2.0, 3.0),
            ...     inputs={'--infile': scalems.file(my_filename)},
            ...     outputs={'--outfile': scalems.file('exe.out')})
            >>> assert hasattr(command, 'file')
            >>> import os
            >>> assert os.path.exists(command.file['--outfile'].result())
            >>> assert hasattr(command, 'exitcode')

    TODO: Consider input/output files that do not appear on the command line,
        but which must figure into data flow.
    """
    if manager is None:
        manager = scalems.workflow.get_scope()
    # TODO: Figure out a reasonable way to check and catch invalid input
    #  through a dispatcher.
    # subprocess_input = context.add(Subprocess.input_type(), *args, **kwargs)
    input_type = Subprocess.resource_type().input_type()
    if not isinstance(input_type, type):
        raise InternalError(
            "Bug: {} is not coded correctly for the {}.input_type() interface.".format(
                __name__, str(type(Subprocess.resource_type()))
            )
        )
    # TODO: Add input separately. First, just add the Subprocess object.
    # Note: static type checkers may not be able to resolve that `input_type is
    # SubprocessInput` for argument checking. Provide local object to the context and
    # replace local reference with a view to the workflow item.
    # subprocess_input = _context.add_to_workflow(
    #     context, Subprocess.resource_type().input_type(), *args, **kwargs)
    bound_input = SubprocessInput(*args, **kwargs)

    director = scalems.workflow.workflow_item_director_factory(Subprocess, manager=manager)
    # Design note: at some point, dynamic workflows will require a thread-safe
    # workflow editing context. We could either block on acquiring the editor
    # context, use an async context manager, or hide the possible async
    # aspect by letting the return value of the director be awaitable.
    # TODO: This would be more readable in a form like
    #  `workflow.add_item(Subprocess, bound_input)`
    try:
        task_view: scalems.workflow.ItemView = director(input=bound_input)
    except TypeError as e:
        logger.error("Invalid input in SubprocessInput: " + str(e))
        raise
    except json.JSONDecodeError as e:
        logger.critical("Malformed data: " + e.msg)
        raise InternalError("Bug: internal data is not being conditioned properly") from e
    except Exception as e:
        logger.critical("Unhandled " + repr(e))
        raise
    return task_view