"""Python invocation of SCALE-MS workflow scripts.
Refer to :doc:`invocation` for user-level documentation about SCALE-MS command line
invocation.
The base command line parser is provided by :py:func:`scalems.invocation.base_parser`,
extended (optionally) by the :ref:`backend`, and further extended by
:py:func:`scalems.invocation.run`. Get usage for a particular backend with
reference to the particular module.
Workflow Manager
----------------
Managed workflows are dispatched to custom execution back-ends through
:py:func:`run`, which accepts a WorkflowManager creation function as its argument.
Most of the customization hooks are provided through the implementing module.
I.e. the *manager_factory* argument has its ``__module__`` attribute queried
to get the implementing *module*.
.. py:currentmodule:: <module>
An execution back end :py:mod:`<module>` supports this invocation model by
implementing a ``__main__`` method that passes a WorkflowManager creation function
and an *executor_factory* to `scalems.invocation.run`.
For example, in the `scalems.radical` module,
:file:`scalems/radical/__main__.py` contains::
if __name__ == '__main__':
sys.exit(scalems.invocation.run(
manager_factory=scalems.radical.workflow_manager,
executor_factory=scalems.radical.executor_factory))
Required Attributes
~~~~~~~~~~~~~~~~~~~
Execution back-end modules (modules providing a *manager_factory*) *MUST* provide
the following module attribute(s).
.. py:attribute:: parser
:noindex:
`argparse.ArgumentParser` for the execution module. For correct composition,
see `scalems.invocation.make_parser()`.
Optional Attributes
~~~~~~~~~~~~~~~~~~~
Execution back-end modules (modules providing a *manager_factory*) *MAY* provide
the following module attribute(s) for hooks in `run()`
.. py:attribute:: logger
:noindex:
`logging.Logger` instance to use for the invocation.
.. py:attribute:: configuration
:noindex:
A callable to acquire a runtime module configuration.
If present in *module*, `run()` calls ``module.configuration(args)``,
where *args* is a :py:class:`argparse.Namespace` object created by the
module's *parser*. The resulting configuration object is provided when
invoking the *executor_factory*.
.. uml::
title Execution back ends with scalems.invocation
participant "workflow script" as script
box "SCALE-MS framework" #honeydew
participant "SCALE-MS API" as scalems.Runtime
participant WorkflowManager as client_workflowmanager
end box
box "SCALE-MS execution backend" #linen
participant scalems.radical <<execution module>>
end box
autoactivate on
-> scalems.radical: python -m scalems.radical ...
scalems.radical -> scalems.Runtime: scalems.invocation.run(workflow_manager)
scalems.Runtime -> scalems.radical: configuration()
return
scalems.Runtime -> scalems.radical: workflow_manager(loop)
note left
Initialize with event loop
end note
scalems.radical -> client_workflowmanager **: <<create>>
activate client_workflowmanager
scalems.Runtime <-- scalems.radical:
scalems.Runtime -> script: <<runpy>>
return @scalems.app
scalems.Runtime -> scalems.Runtime: run_dispatch(work, context)
...Manage run time and dispatch work....
scalems.Runtime --> scalems.Runtime: loop.run_until_complete()
scalems.Runtime --> scalems.radical: SystemExit.code
destroy client_workflowmanager
deactivate scalems.Runtime
<-- scalems.radical: sys.exit
"""
import argparse
import asyncio
import functools
import os
import runpy
import sys
import threading
import typing
import scalems.exceptions
import scalems.execution
import scalems.workflow
from scalems import ScriptEntryPoint
_reentrance_guard = threading.Lock()
# We can import scalems.context and set module state before using runpy to
# execute the script in the current process. This allows us to preconfigure a
# default execution manager.
# TODO: Consider whether we want launched scripts to have `__name__` set to `__main__` or not.
# TODO: Consider whether we want to parse execution module arguments, including handling chained `-m`.
# Consider generalizing this boilerplate.
# TODO: Support REPL (e.g. https://github.com/python/cpython/blob/3.8/Lib/asyncio/__main__.py)
def run_dispatch(work, *, workflow_manager: scalems.workflow.WorkflowManager, executor_factory, config):
"""Run the provided work in the execution dispatching context.
Parameters:
config: Implementation-specific runtime configuration to be provided to *executor_factory*.
executor_factory: Implementation-specific callable to get a run time work manager.
workflow_manager (scalems.workflow.WorkflowManager) : an active workflow manager (with a running event loop)
work (typing.Callable) : An "app" function to run within the scope of an active execution dispatcher.
Initially, we assume that *work* is a `scalems.app` decorated function, though
this is not technically required in order to call this function directly.
Semantics and typing will presumably be clarified and tightened in the future.
"""
async def _dispatch(_work):
async with scalems.execution.dispatch(workflow_manager, executor_factory=executor_factory, params=config):
# Add work to the queue
_work()
# Return an iterable of results.
# for task in context.tasks: ...
...
_loop = workflow_manager.loop()
_coro = _dispatch(work)
# TODO: Name the task, if `work` has suitable attribute(s).
_task = _loop.create_task(_coro)
_result = _loop.run_until_complete(_task)
return _result
class _ManagerT(typing.Protocol):
"""Primary argument type for scalems.invocation.run().
An imported callable object with the signature defined by this Protocol.
We use a typing.Protocol instead of typing.Callable to emphasize that the object of
this type is an object with the full interface described for User-defined functions in
the Python Data Model for Callable types.
See https://docs.python.org/3/reference/datamodel.html#the-standard-type-hierarchy.
"""
def __call__(self, loop: asyncio.AbstractEventLoop) -> scalems.workflow.WorkflowManager:
...
[docs]def run(*, manager_factory: _ManagerT, executor_factory, _loop: asyncio.AbstractEventLoop = None): # noqa: C901
"""Execute boilerplate for scalems entry point scripts.
Provides consistent logic for command line invocation of a script with a main `scalems.app` function.
Supports invocation of the following form with minimal ``backend/__main__.py``
python -m scalems.radical --venv=/path/to/venv --resource=local.localhost myscript.py arg1 --foo bar
See `scalems.invocation` module documentation for details about the expected *manager_factory* module
interface.
Refer to module documentation for the execution backend for command line arguments.
Unrecognized command line arguments will be passed along to the called script through
modification to `sys.argv`.
Args:
executor_factory: Implementation-specific callable to get a run time work manager.
manager_factory (typing.Callable[..., scalems.workflow.WorkflowManager]) : workflow manager creation function
"""
safe = _reentrance_guard.acquire(blocking=False)
if not safe:
raise RuntimeError("scalems launcher is not reentrant.")
try:
module = sys.modules[manager_factory.__module__]
parser = getattr(module, "parser", None)
if parser is None:
raise scalems.exceptions.APIError("Execution manager modules must provide a `parser` module member.")
# Strip the current __main__ file from argv. Collect arguments for this script
# and for the called script.
args, script_args = parser.parse_known_args(sys.argv[1:])
if args.pycharm:
try:
# noinspection PyUnresolvedReferences
import pydevd_pycharm
pydevd_pycharm.settrace("host.docker.internal", port=12345, stdoutToServer=True, stderrToServer=True)
except ImportError:
...
if not os.path.exists(args.script):
# TODO: Support REPL (e.g. https://github.com/python/cpython/blob/3.8/Lib/asyncio/__main__.py)
raise RuntimeError("Need a script to execute.")
level = args.log_level
if level is not None:
import logging
character_stream = logging.StreamHandler()
# Optional: Set log level.
logging.getLogger("scalems").setLevel(level)
character_stream.setLevel(level)
# Optional: create formatter and add to character stream handler
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
character_stream.setFormatter(formatter)
# add handler to logger
logging.getLogger("scalems").addHandler(character_stream)
logger = getattr(module, "logger")
configure_module = getattr(module, "configuration", None)
if configure_module is not None:
config = configure_module(args)
logger.debug(f"Configuration: {config}")
else:
config = None
sys.argv = [args.script] + script_args
if _loop is not None:
# Allow event loop to be provided (for debugging and testing purposes).
loop = _loop
else:
# Start the asyncio event loop on behalf of the client.
# We want to do this exactly once per invocation, and we do not want the scalems
# package module or any particular scalems object to own the event loop.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
# TODO: Clarify event loop management scheme.
# Do we want scripts to be like "apps" that get called with asyncio.run(),
# should we effectively reimplement asyncio.run through scalems.run, or
# should we think about
# [ast.PyCF_ALLOW_TOP_LEVEL_AWAIT](https://docs.python.org/3/whatsnew/3.8.html#builtins)
# See also: https://docs.python.org/3/library/asyncio-runner.html
# Execute the script in the current process.
# TODO: Can we support mixing invocation with pytest?
exitcode = 0
try:
with scalems.workflow.scope(manager_factory(loop)) as manager:
try:
globals_namespace = runpy.run_path(args.script)
main = None
for name, ref in globals_namespace.items():
if isinstance(ref, ScriptEntryPoint):
if main is not None:
raise scalems.exceptions.DispatchError(
"Multiple apps in the same script is not (yet?) supported."
)
main = ref
ref.name = name
if main is None:
raise scalems.exceptions.DispatchError("No scalems.app callables found in script.")
# The scalems.run call hierarchy goes through utility.run
# to utility._run to AsyncWorkflowManager.run,
# which then goes through WorkflowManager.dispatch().
# We should
# (a) clean this up,
# (b) return something sensible,
# (c) clarify error behavior.
# We probably don't want to be calling scalems.run() from the entry point script.
# `scalems.run()` and `python -m scalems.backend script.py` are alternative
# ways to separate the user from the `with Manager.dispatch():` block.
# For the moment, we can disable the `scalems.run()` mechanism, I think.
# cmd = scalems.run(main, context=context)
logger.debug("Starting asyncio run()")
try:
run_dispatch(main, workflow_manager=manager, executor_factory=executor_factory, config=config)
except Exception as e:
logger.exception("Unhandled exception in scalems runner calling dispatch(): " + str(e))
raise e
finally:
# Warning: This assumes that the `manager` is done with the event loop
# by the end of `run_dispatch`, but this may not be enforced.
# We may need to rearrange things or strengthen the protocol.
loop.close()
assert loop.is_closed()
logger.debug("Finished asyncio run()")
except SystemExit as e:
exitcode = e.code
except Exception as e:
print(f"{module} encountered Exception: {repr(e)}")
if exitcode == 0:
exitcode = 1
if exitcode != 0:
raise SystemExit(exitcode)
finally:
_reentrance_guard.release()
[docs]@functools.cache
def base_parser(add_help=False):
"""Get the base scalems argument parser.
Provides a base argument parser for scripts or other module parsers.
By default, the returned ArgumentParser is created with ``add_help=False``
to avoid conflicts when used as a *parent* for a parser more local to the caller.
If *add_help* is provided, it is passed along to the ArgumentParser created
in this function.
See Also:
https://docs.python.org/3/library/argparse.html#parents
"""
from . import __version__ as _scalems_version
_parser = argparse.ArgumentParser(add_help=add_help)
_parser.add_argument("--version", action="version", version=f"scalems version {_scalems_version}")
_parser.add_argument(
"--log-level",
type=str.upper,
choices=["CRITICAL", "ERROR", "WARNING", "INFO", "DEBUG"],
help="Optionally configure console logging to the indicated level.",
)
_parser.add_argument(
"--pycharm",
action="store_true",
default=False,
help="Attempt to connect to PyCharm remote debugging system, where appropriate.",
)
_parser.add_argument(
"script",
metavar="script-to-run.py",
type=str,
help="The workflow script. Must contain a function decorated with `scalems.app`",
)
return _parser
[docs]def make_parser(module: str, parents: typing.Iterable[argparse.ArgumentParser] = None):
"""Make a SCALE-MS Execution Module command line argument parser.
Args:
module: Name of the execution module.
parents: Optional list of parent parsers.
If *parents* is not provided, `scalems.invocation.base_parser` is used to generate a default.
If *parents* _is_ provided, one of the provided parents **should** inherit from
`scalems.invocation.base_parser` using the *parents* parameter of
:py:class:`argparse.ArgumentParser`.
Notes:
:py:attr:`__package__` module attribute and :py:attr:`__module__` class or
function attribute are convenient for programmatically finding the *module* argument.
"""
if parents is None:
parents = [base_parser()]
_parser = argparse.ArgumentParser(
prog=module,
description=f"Command line interface for `{module}` workflow execution module.",
usage=f"python -m {module} <{module} args> script-to-run.py.py " "<script args>",
parents=parents,
)
return _parser