Adaptive Markov State Modeling: fixed width ensemble

In this example, we use a Python data flow scripting interface to connect PyEmma, a simulator adapter, and some additional Python code to iteratively run and analyze groups of simulations until convergence.

The main script body defines and executes the subgraph. There is an obvious opportunity for abstraction, but in the present case it is illustrative to mix implementation details with algorithmic structure, in part to stimulate discussion and understanding of packaging and component capabilities.
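Reduced to its skeleton, the control-flow pattern looks roughly like the following sketch (it uses only the scalems calls that appear in the full script below; the simulation and analysis operations inside the subgraph are elided):

import scalems

# Persistent state for one iteration of the loop.
iteration = scalems.subgraph(variables={'is_converged': False})

with iteration:
    # ... add simulation and analysis operations here, ultimately
    # updating `iteration.is_converged` from the analysis result ...
    pass

# Repeat the subgraph until the condition is no longer satisfied.
loop = scalems.while_loop(function=iteration,
                          condition=scalems.logical_not(iteration.is_converged))
loop.run()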

One supporting module (examples/parallel_adaptive_msm/msmtool.py) wraps PyEmma and provides a SCALE-MS compatible msm_analyzer function.

Another supporting module (examples/parallel_adaptive_msm/wrappers) wraps GROMACS command line tools and gmxapi.

The entry point for the researcher’s application is the main script body from examples/parallel_adaptive_msm/workflow.py:

"""
Iterative adaptive Markov State Model refinement.

This script provides an entry point for a workflow implemented in terms of
SCALE-MS data flow and execution management, plus user-provided support tools.

In this example, we use a Python data flow scripting interface to connect
PyEmma, a simulator adapter, and some additional Python code to iteratively
run and analyze groups of simulations until convergence.
"""

import scalems
# Get a set of simulation tools following a common interface convention.
from scalems.wrappers.gromacs import get_trajectory
from scalems.wrappers.gromacs import internal_to_pdb
from scalems.wrappers.gromacs import make_input
from scalems.wrappers.gromacs import modify_input
from scalems.wrappers.gromacs import simulate
# Get a module that provides the `msm_analyzer` function we will be using.
from . import msmtool

# Acquire simulation inputs. Generic data hierarchy is not yet clear. Model,
# method, instance/system/conformation/microstate, and implementation details
# will be reflected. The current example reflects the requirements of GROMACS
# simulation input. Specific input requirements are expressed in the interface
# to the `make_input` wrapper module function.

# GROMACS MD model and method parameters.
# Could start with a list of distinct confs, but here we use a single starting point.
starting_structure = 'input_conf.gro'  # GROMACS structure file.
topology_file = 'input.top'  # GROMACS topology input.
run_parameters = 'params.mdp'  # GROMACS simulation parameters.

initial_simulation_input = make_input(
    simulation_parameters=run_parameters,
    topology=topology_file,
    conformation=starting_structure)

# Set up a parallel pipeline of N=10 simulations, starting from a single input.
num_simulations = 10
# A `broadcast` function is possible, but is not explicitly necessary,
# so we have not specified one yet. See Python reference for discussion.
#    initial_input = scalems.broadcast(initial_simulation_input, shape=(N,))
initial_input = [initial_simulation_input] * num_simulations

# We will need a pdb for MSM building in PyEmma
initial_pdb = internal_to_pdb(starting_structure)

# Build a model with 10 Markov states (clusters in conformation space).
num_clusters = 10

# Get a placeholder object that can serve as a sub context / work graph owner
# and can be used in a control operation.
simulation_and_analysis_iteration = scalems.subgraph(variables={
    'conformation': initial_input,
    'transition_matrix': scalems.ndarray(0., shape=(num_clusters, num_clusters)),
    'is_converged': False})

with simulation_and_analysis_iteration:
    modified_input = modify_input(
        input=initial_input, structure=simulation_and_analysis_iteration.conformation)
    md = simulate(input=modified_input)

    # Collect frames from all output trajectories.
    allframes = scalems.reduce(scalems.extend_sequence, get_trajectory(md))

    # Get the output trajectories and pass to PyEmma to build the MSM
    # Return a stop condition object that can be used in gmx while loop to
    # terminate the simulation
    adaptive_msm = msmtool.msm_analyzer(topfile=initial_pdb,
                                        trajectory=allframes,
                                        transition_matrix=simulation_and_analysis_iteration.transition_matrix,
                                        num_clusters=num_clusters)
    # Update the persistent data for the subgraph
    simulation_and_analysis_iteration.transition_matrix = adaptive_msm.transition_matrix
    # adaptive_msm here is responsible for maintaining the ensemble width
    simulation_and_analysis_iteration.conformation = adaptive_msm.conformation
    simulation_and_analysis_iteration.is_converged = adaptive_msm.is_converged

# In the default work graph, add a node that depends on `condition` and
# wraps subgraph.
my_loop = scalems.while_loop(function=simulation_and_analysis_iteration,
                             condition=scalems.logical_not(simulation_and_analysis_iteration.is_converged))
my_loop.run()

The algorithmic details of the above script depend on two supporting modules, given here for completeness.

examples/parallel_adaptive_msm/msmtool.py:

"""
Example of a user-provided Analysis tool wrapper.

Wraps Pyemma to implement a Markov State Model builder and analyzer.
Set up a SCALE-MS compatible operation implementation and export it as the
module function `msm_analyzer`.
"""

import numpy as np
import pyemma.coordinates as coor
import pyemma.msm as msm

import scalems

tol = 0.1


def relative_entropy(P, Q):
    """
    Takes two transition matrices, calculates relative entropy.
    """
    # Simple placeholder metric: elementwise Kullback-Leibler divergence,
    # restricted to entries where both matrices are positive.
    P = np.asarray(P)
    Q = np.asarray(Q)
    mask = (P > 0) & (Q > 0)
    rel_entropy_P_Q = float(np.sum(P[mask] * np.log(P[mask] / Q[mask])))
    return rel_entropy_P_Q


class MSMAnalyzer:
    """
    Builds msm from output trajectory
    """

    def __init__(self, molecular_topology_file, trajectory, transition_matrix, num_clusters):
        # Build Markov model with PyEmma
        feat = coor.featurizer(molecular_topology_file)
        X = coor.load(trajectory, feat)
        Y = coor.tica(X, dim=2).get_output()
        k_means = coor.cluster_kmeans(Y, k=num_clusters)
        centroids = k_means.clustercenters  # cluster centers (unused here)

        markov_model = msm.estimate_markov_model(k_means.dtrajs, 100)

        previous_transition_matrix = transition_matrix
        self.transition_matrix = markov_model.transition_matrix
        self._is_converged = relative_entropy(self.transition_matrix, transition_matrix) < tol

    def is_converged(self):
        return self._is_converged

    def get_transition_matrix(self):
        return self.transition_matrix


# Assuming MSMAnalyzer is an existing tool we do not want to modify,
# create a scalems compatible operation by wrapping with a provided utility.
msm_analyzer = scalems.make_operation(MSMAnalyzer,
                                      inputs=['topfile', 'trajectory', 'transition_matrix', 'num_clusters'],
                                      output=['is_converged', 'transition_matrix']
                                      )

Simulation preparation and output manipulation use command line tools. Simulation is executed with gmxapi.
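
In rough terms, and setting aside the data flow records that the module below actually constructs, the wrapper dispatches to operations equivalent to this sketch (file names are the defaults used in the listing):

# Preparation with the GROMACS command line tool (see `make_input` below):
#     gmx grompp -f md.mdp -p md.top -c md.gro -o topol.tpr
# Simulation with gmxapi (see the `Simulate` handler at the end of the listing):
import gmxapi
md_input = gmxapi.read_tpr('topol.tpr')
md = gmxapi.mdrun(md_input)
md.run()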

src/scalems/wrappers/gromacs.py:

"""
Gromacs simulation tools.

Preparation and output manipulation use command line tools.
Simulation is executed with gmxapi.
"""
# Declare the public interface of this wrapper module.
__all__ = ['make_input', 'modify_input', 'simulate']

##########################
# New plan:
# Implement basic gmxapi examples directly without generalism or type checking:
# Create simple dictionaries and add them to the workflow manager.
# 0. Write gromacs wrapper in terms of dicts. Processing is done via scalems entry points.
# 1. Hard-code handling in WorkflowManager.
# 2. Resume handling of dict and JSON representations in fingerprint and serialization modules.
# 3. Refine Python data model and build framework.
###########################
import functools
import logging
# import gmxapi
import os
import pathlib
import typing
from typing import Sequence

import scalems
from scalems.context import WorkflowManager
from scalems.exceptions import MissingImplementationError
from scalems.utility import next_monotonic_integer as _next_int

logger = logging.getLogger(__name__)
logger.debug('Importing {}'.format(__name__))


# TODO: REMOVE. Follow with fingerprint based UID ASAP.
def _next_uid() -> bytes:
    # Note that network byte order is equivalent to "big endian".
    return _next_int().to_bytes(32, 'big')


def _executable(argv: Sequence[str],
                inputs: Sequence[str] = (),
                outputs: Sequence[str] = (),
                stdin: typing.Iterable[str] = (),
                resources: typing.Mapping[str, str] = None,
                environment: typing.Mapping[str, str] = None,
                stdout: str = 'stdout.txt',
                stderr: str = 'stderr.txt'
                ):
    if resources is None:
        resources = {}
    if environment is None:
        environment = {}

    input_node = {
        # Static properties.
        'object_type': 'ExecutableInput',
        'uid': _next_uid().hex(),
        # Fields may have dependencies.
        'data': {
            'argv': list(argv),
            'inputs': list(inputs),
            'stdin': stdin,
            'environment': dict(**environment)
        }
    }
    # Results are self-referential until a concrete result is available.
    input_node['result'] = input_node['uid']

    task_node: dict = {
        # Static properties.
        'object_type': 'Executable',
        'uid': _next_uid().hex(),
        'data': {
            'stdout': 'stdout.txt',
            'stderr': 'stderr.txt',
        },
        # Fields may have dependencies.
        'input': {
            'ExecutableInput': input_node['uid'],
            'resources': dict(**resources),
            # The files produced by a program may depend on the inputs, so this is
            # not static. We will archive the whole working directory, but we can
            # use this file list to do additional checks for task success and allow
            # tighter output binding.
            'output': list(outputs),
        },
    }
    task_node['result'] = task_node['uid']

    output_node: dict = {
        'object_type': 'ExecutableOutput',
        'uid': _next_uid().hex(),
        'input': {
            # Explicit dependency that is not (yet) resolved in terms of data flow.
            'depends': task_node['uid'],
            # Fields with dependencies, though not evident in this example.
            'outputs': list(outputs),
            'stdout': stdout,
            'stderr': stderr
        },
    }
    output_node['result'] = output_node['uid']

    return {
        'implementation': ['scalems', 'wrappers', 'gromacs', 'Executable'],
        'message': {
            'Executable': [input_node, task_node, output_node]
        }
    }
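
# Illustrative example: a call such as
#     _executable(argv=('gmx', 'grompp', '-f', 'md.mdp', '-p', 'md.top'))
# yields a serializable record of roughly the following shape (with freshly
# generated 'uid' values in each node):
#     {'implementation': ['scalems', 'wrappers', 'gromacs', 'Executable'],
#      'message': {'Executable': [input_node, task_node, output_node]}}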


try:
    from gmxapi.commandline import cli_executable

    _gmx_cli_entry_point = cli_executable()
except ImportError:
    cli_executable = None
    _gmx_cli_entry_point = None
if _gmx_cli_entry_point is None:
    _gmx_cli_entry_point = 'gmx'


# TODO: Implicit ensemble handling requires type annotations.
# def make_input(simulation_parameters=['md.mdp'],
#                topology=['md.top'],
#                conformation=['md.gro'],
#                wrapper_name='gmx'):
def make_input(simulation_parameters='md.mdp',
               topology='md.top',
               conformation='md.gro',
               wrapper_name=_gmx_cli_entry_point):
    preprocess = _executable(
        argv=(
            wrapper_name,
            'grompp',
            '-f', simulation_parameters,
            '-p', topology,
            '-c', conformation,
            '-o', 'topol.tpr'
        )
    )
    return {
        'implementation': ['scalems', 'wrappers', 'gromacs', 'SimulationInput'],
        'message': {
            'SimulationInput': {'input': preprocess}
        }
    }


def simulate(arg, **kwargs):
    if not isinstance(arg, dict) or 'SimulationInput' not in arg.get('message', ()):
        raise TypeError('Bad input.')
    simulator_input = arg['message']['SimulationInput']['input']
    # Note: We haven't implemented modify_input or simulation chaining yet.
    if not isinstance(simulator_input, dict) or 'implementation' not in simulator_input:
        raise TypeError('Wrong kind of input object.')
    return {
        'implementation': ['scalems', 'wrappers', 'gromacs', 'Simulate'],
        'message': {
            'Simulate': {'input': simulator_input, 'kwargs': kwargs}
        }
    }


def modify_input(*args, **kwargs):
    raise MissingImplementationError()


@functools.singledispatch
def scalems_helper(*args, **kwargs):
    """Allow SCALE-MS to manage tasks from this module.

    Placeholder module entry point. This explicit entry point will probably not
    be required (or appropriate) in the long run. Its roles can be subsumed by
    code that is already run during module import, such as by registration
    through decorators and/or subclassing.

    TODO:
        We can incorporate some amount of Capabilities Provider Interface (CPI)
        here to allow the module author to provide information to the workflow manager
        or dispatch along different run time code paths according to the execution environment.
    """
    logger.debug('scalems_helper called with args:({}), kwargs:({})'.format(
        ','.join([repr(arg) for arg in args]),
        ','.join([':'.join((key, repr(value))) for key, value in kwargs.items()])
    ))


@scalems_helper.register
def _(task: scalems.context.Task, context: WorkflowManager):
    sublogger = logger.getChild('scalems_helper')
    sublogger.debug('Serialized task record: {}'.format(task.serialize()))
    command = task.type[-1]
    assert command in {'Simulate', 'SimulationInput', 'Executable'}
    # Right now, we should only need to process the 'Simulate' command...
    assert command == 'Simulate'
    # TODO: Typing on the Task data proxy.
    command_message = task.input['message'][command]
    # kwargs = command_message['kwargs']
    input_ref: str = command_message['input']
    logger.debug(f'Decoding reference {input_ref}')
    input_ref: bytes = bytes.fromhex(input_ref)
    task_map = context.task_map
    logger.debug('Items done: {}'.format(
        ', '.join([': '.join([key.hex(), str(value.done())]) for key, value in task_map.items()])
    ))
    # simulator_input_view = context.item(input_ref)

    # TODO: Workaround until we have the framework deliver results.
    # simulator_input: SubprocessResult = simulator_input_view.result()
    # logger.debug(f'Acquired grompp output: {simulator_input}')
    # tprfile = simulator_input.file['-o']
    dependency_dir = pathlib.Path(os.getcwd()).parent / input_ref.hex()
    tprfile = dependency_dir / 'topol.tpr'
    import gmxapi
    md_in = gmxapi.read_tpr(str(tprfile))
    # TODO: Manage or integrate with gmxapi working directories.
    md = gmxapi.mdrun(md_in)
    # TODO: Make sure we mark interrupted simulations as "failed".
    return md.run()