Source code for scalems.messages

"""Intra-component communication support.

Message types and protocol support for control signals,
queue management, and work dispatching.

The `QueueItem` classes are simple key-value pairs for use in local message-passing
queues.

The :py:class:`~scalems.messages.Command` classes are richer structures that
ensure (de)serializability for use between the client and (remote) runtime managers.
"""
__all__ = ("QueueItem", "CommandQueueControlItem", "CommandQueueAddItem")

import logging
import typing

from scalems.exceptions import APIError

logger = logging.getLogger(__name__)
logger.debug("Importing {}".format(__name__))


[docs]class QueueItem(dict, typing.MutableMapping[str, typing.Any]): """Queue items are either workflow items or control messages. Control messages are indicated by the key ``'command'``, and are described by :py:class:`CommandQueueControlItem`. Workflow items are indicated by the key ``'add_item'``, and are described by :py:class:`CommandQueueAddItem`. """ def _hexify(self): """Allow binary fields to be printable. Non-bytes values are returned without conversion. """ for key, value in self.items(): if isinstance(value, bytes): value = value.hex() yield key, value def __str__(self) -> str: return str(dict(self._hexify()))
[docs]class CommandQueueControlItem(QueueItem, typing.MutableMapping[str, str]): """String-encoded Command for the Executor command queue. Instructions for the `RuntimeManager`, intercepted and processed by :py:func:`scalems.execution.manage_execution()`. Currently, the only supported key is "command". Supported commands may grow to comprise a Compute Provide Interface. """ _allowed: typing.ClassVar = {"command": {"hello", "stop", "version"}} def __setitem__(self, k: str, v: str) -> None: if k in self._allowed: if v in self._allowed[k]: super().__setitem__(k, v) else: raise APIError(f"Unsupported command value: {repr(v)}") else: raise APIError(f"Unsupported command key: {repr(k)}")
[docs]class CommandQueueAddItem(QueueItem, typing.MutableMapping[str, bytes]): """String-encoded add_item command for the Executor command queue. The intended payload is an item to be added to the workflow graph: e.g. an operation, data reference, subgraph, or something meaningful to an :py:class:`~scalems.execution.AbstractWorkflowUpdater`. In practice, the value may just be a token or identifier. In the initial version of the scalems protocol, the value is passed to the item editor factory that is obtained from :py:func:`scalems.execution.RuntimeManager.get_edit_item()`. """ _allowed: typing.ClassVar = {"add_item"} def __setitem__(self, k: str, v: bytes) -> None: if k in self._allowed: if isinstance(v, bytes): super().__setitem__(k, v) else: raise APIError(f"Unsupported add_item value: {repr(v)}") else: raise APIError(f"Unsupported command: {repr(k)}")