Adaptive Markov State Modeling: asynchronous

Note: may or may not have looping. Consider that MSM model builder outputs include the convergence condition helpers and the simulation input emitters.

As in Adaptive Markov State Modeling: fixed width ensemble, we use a Python data flow scripting interface to connect PyEmma, a simulator adapter, and some additional Python code to iteratively run and analyze groups of simulations until convergence.

In this example application, we consider use cases in which many independent simulations can be launched and processed asynchronously. As data becomes available, the model is updated and the next simulations are configured to improve the sampling needed to further refine the model.

Consider a few different modes of resource management and of workflow expression.

In the simplest case, the user may declare a number of simulations to run and analyze the results as they are available (Example 1).

Adaptive work updates the requested simulations with repeated updates to the model. This could be expressed in terms of a loop over individual analysis/update tasks updating the collection of simulation futures (Example 2) or as an asynchronous loop over batches or waves of simulation and analysis (Example 3).

Example 1: Unordered mapping

Simulation results are processed as they become available. This allows flexible scheduling of analysis tasks as simulation tasks complete.

Adaptive variants include analysis driven exit conditions (e.g. running until a convergence condition, rather than for a fixed number of simulations), but the runtime must support some adaptive functionality in that the mapping of task output to task input is not known until data is produced during execution.

examples/async_adaptive_msm/unordered_mapping.py

"""
Markov State Model building.

This script provides an entry point for a workflow implemented in terms of
SCALE-MS data flow and execution management, plus user-provided support tools.

In this example, we use a Python data flow scripting interface to connect
PyEmma, a simulator adapter, and some additional Python code to iteratively
run and analyze groups of simulations until convergence.
"""

# Acquire simulation inputs and common supporting code.
from .workflow_input import *

# Set up a parallel pipeline, starting from a single input.
num_simulations = 1000
# A `broadcast` function is possible, but is not explicitly necessary,
# so we have not specified one yet. See Python reference for discussion.
#    initial_input = scalems.broadcast(initial_simulation_input, shape=(N,))
initial_input = [initial_simulation_input] * num_simulations

# Get the Future for the sequenced iterable of MD trajectories.
trajectory_sequence = simulate(input=initial_input).trajectory

# Convert the sequence Future to an unordered iterable Future.
trajectory_unordered_collection = scalems.desequence(trajectory_sequence)
# Note that the Python *asyncio* standard library module has an `as_completed`
# function that performs the behavior for which we use `desequence`. We could
# rename `desequence` to `as_completed`, or keep `desequence` as a lower-level
# data annotation helper and use `as_completed` for the normative user interface
# for use cases such as this.
#
# Implementation note: through the Resource slicing interface, a sequence Future
# can be decomposed to Futures of the sequence elements. Through the Context
# resource subscription interface, either the original Future or the decomposed
# Futures can be converted to individual resource subscriptions that can be
# re-expressed to the user as an unordered resource in the local Python context.
#
# Question: The *trajectory* output is actually an ensemble array of sequences
# of trajectory frames, which in turn contain sequences of coordinate data.
# How do we distinguish the outer from the inner dimensions most intuitively?
# Maybe, like analogous numpy operations, commands like `desequence` have an
# optional *axis* or *dimension* argument?

# Asynchronously flatten the trajectory frames into an asynchronous iterable.
allframes = collect_coordinates(trajectory_unordered_collection)

# It is conceivable that *msm_analyzer* is implemented to decompose its own work
# in cooperation with the task management framework, in which case
# `msmtool.msm_analyzer(topfile=initial_pdb, trajectory=allframes)` could be
# efficiently scheduled with non-blocking asynchronicity, but for simplicity
# and consistency with the other examples, we assume the simpler implementation
# that updates the N-clusters model with (batches of) input.

scalems.map(msmtool.msm_analyzer, allframes).run()
# Question: How do we express that the model data is accumulated to a single
# entity?
# Question: Should we use the *shape* argument to `map` to indicate the intended
# chunking of *allframes* data to process?

Example 2: Data queue

Simulation results are popped from an asynchronous queue, analyzed, and used to inform the simulations added back to the queue.

examples/async_adaptive_msm/async_queue.py

"""
Iterative adaptive Markov State Model refinement.

This script provides an entry point for a workflow implemented in terms of
SCALE-MS data flow and execution management, plus user-provided support tools.

In this example, we use a Python data flow scripting interface to connect
PyEmma, a simulator adapter, and some additional Python code to iteratively
run and analyze groups of simulations until convergence.
"""

# Acquire simulation inputs and common supporting code.
from .workflow_input import *

# Construct an initial set of exploratory simulations.
seeds = list(range(100))
parameter_list = list()
for seed in range(100):
    parameter_list.append({'ld_seed': seed})
modified_input = modify_input(input=initial_simulation_input,
                              parameters=parameter_list)
# Note: a generic implementation should support syntax like
# `parameters={'ld_seed': seed for seed in range(100)}`, but the example as
# written may be more readable, and requires less development on the data model
# to properly handle.


# Declare a Queue of simulation results and populate it with some initial
# exploratory simulations.
queue = scalems.Queue()
# Note that aspects of the Queue instance will depend on the nature of the
# execution environment, the implementation of the executor adapters, and, thus,
# the associated Context instance, but we can probably assume that sensible
# default values for such things as *maxsize* are possible in all implementations,
# and thus in the proxy implementation for the local Context.
scalems.map(queue.put, simulate(input=modified_input))
# This `map` could reasonably be written in terms of the more generic `map`,
# and is essentially equivalent to
#     for simulation in simulate(input=modified_input):
#         queue.put(simulation)


# Iteratively process batches of results from the queue, adding new simulations
# to the queue as analysis completes.

# Get a placeholder object that can serve as a sub context / work graph owner
# and can be used in a control operation.
analysis_iteration = scalems.subgraph(variables={
    'queue': queue,
    'transition_matrix': scalems.ndarray(0., shape=(num_clusters, num_clusters)),
    'is_converged': False})

with analysis_iteration:
    # Get the results from the next simulation.
    md = analysis_iteration.queue.get()
    # Note: we may not want to use the `get` interface when setting up
    # "coroutine futures". It is not clear how to implement a comparable
    # `task_done()` message, or how best to pop several results at once,
    # and the Queue itself may be a lower level detail that doesn't belong in
    # the generic use case.
    # We also should have a way to get multiple items at a time.
    # We might use either a `partial_map` or a *chunksize* argument to `map`.
    # Alternatively, consider the pattern of `asyncio.wait(return_when=...)`
    # that allows behavior to be specified by an argument.

    # Get the output trajectories and pass to PyEmma to build the MSM
    # Return a stop condition object that can be used in gmx while loop to
    # terminate the simulation
    allframes = collect_coordinates(md.trajectory)

    adaptive_msm = msmtool.msm_analyzer(topfile=initial_pdb,
                                        trajectory=allframes,
                                        transition_matrix=analysis_iteration.transition_matrix)

    # Update the persistent data for the subgraph
    analysis_iteration.transition_matrix = adaptive_msm.transition_matrix
    analysis_iteration.is_converged = adaptive_msm.is_converged

    # Finish the iteration with conditional logic (1) to avoid adding more
    # unnecessary simulations to the queue, and (2) to allow this subgraph to
    # be reused to drain the queue.
    with scalems.conditional(condition=scalems.logical_not(analysis_iteration.is_converged)):
        # Use the MSM update to generate simulation inputs for further refinement.
        modified_input = modify_input(
            input=initial_simulation_input, structure=adaptive_msm.conformation)
        scalems.map(queue.put, simulate(modified_input))

# In the default work graph, add a node that depends on `condition` and
# wraps subgraph.
my_loop = scalems.while_loop(function=analysis_iteration,
                             condition=scalems.logical_not(analysis_iteration.is_converged))
my_loop.run()

# (Optionally) Drain the queue.
scalems.while_loop(function=analysis_iteration,
                   condition=scalems.logical_not(queue.empty)).run()

Example 3: Asynchronous batches

A batch of simulations is launched. Through an unordered mapping, the results are processed as they become available, and the analysis results inform the configuration of the next batch of simulations. An asynchronous looping construct allows the next batch to be scheduled and launched opportunistically as inputs dependencies are satisfied.

examples/async_adaptive_msm/async_iteration.py

"""
Iterative adaptive Markov State Model refinement.

This script provides an entry point for a workflow implemented in terms of
SCALE-MS data flow and execution management, plus user-provided support tools.

In this example, we use a Python data flow scripting interface to connect
PyEmma, a simulator adapter, and some additional Python code to iteratively
run and analyze groups of simulations until convergence.
"""

# Acquire simulation inputs and common supporting code.
from .workflow_input import *

# Set up a parallel pipeline of N=100 simulations, starting from a single input.
num_simulations = 100
seeds = list(range(num_simulations))
parameter_list = list()
for seed in range(num_simulations):
    parameter_list.append({'ld_seed': seed})
initial_input = modify_input(input=initial_simulation_input,
                             parameters=parameter_list)

# The num_simulations should equal or exceed the amount of concurrency available
# from the requested computing resources. Results will be processed in smaller
# batches. Batch sizes should be large enough that analysis tasks are not
# dominated by scheduling, event handling / data placement latency, or other
# synchronization, but small enough to minimize the cost of node failures and
# work heterogeneity (e.g. maximize occupancy).
batch_size = num_simulations // 10


async def batch_analysis(model, conformation: Iterable):
    """Return an updated model after applying a batch updated."""
    updated_model = model.update(await conformation)
    return updated_model


async def analysis_iteration(model, trajectories):
    """Perform an iteration on a wave of simulation results.

    Given an iterable of results, decompose into chunks for dispatching analysis
    tasks. Dispatch batch_size results at a time, as they become available.
    Use the analysis results to prepare the asynchronous iterable for the next
    wave of simulation.
    """
    simulation_input_set = set()
    # The model needs to be updated in a well-determined sequence, but we can
    # manage that entirely in terms of Futures by getting a new reference for
    # each future updated state of the model.
    for batch in scalems.as_completed(trajectories, batch_size=batch_size):
        model = batch_analysis(model, batch)
        # Note that the batch_analysis coroutine will await simulation outputs,
        # but since we have not awaited the coroutine yet, it is not yet a
        # blocking task. Add a future from the result to a set of awaitables.
        simulation_input_set.add(model.conformations)
    # Return a tuple that includes
    #  1. the reference to the model after the final batch update has been applied, and
    #  2. the set of awaitable simulation inputs.
    return (model, simulation_input_set)


async def simulation_iteration_optionA(inputs: Iterable[Future[SimulationInput]],
                                       conformations: Iterable[Future[SimulationFrame]]):
    """Perform an iteration of launching a wave of simulations.

    Given an asynchronous iterable of simulation inputs, launch simulations as
    the inputs become available. The set of simulation results compose the
    asynchronously awaitable output.
    """
    # Return a set of awaitable simulations.
    return {simulate(modify_input(i, conformation=x)) for i, x in zip(inputs, conformations)}


async def simulation_iteration_optionB(inputs: Iterable[Future[SimulationInput]]):
    """Perform an iteration of launching a wave of simulations.

    Given an asynchronous iterable of simulation inputs, launch simulations as
    the inputs become available. The set of simulation results compose the
    asynchronously awaitable output.
    """
    # Return a set of awaitable simulations.
    return {simulate(i) for i in inputs}


async def simulation_and_analysis_loop(initial_input):
    simulation_input = initial_input
    next_conformation = simulation_input.conformation
    model = msmtool.msm_analyzer()
    while not model.is_converged():
        # Option A would combine phase 0 and 1. E.g.:
        #     simulation = simulation_iteration_optionA(simulation_input, next_conformation)
        # Option B:
        # Phase 0: Prepare simulation input with the next round of initial conformations.
        simulation_input = modify_input(simulation_input, conformation=next_conformation)
        # Phase 1: Perform simulations
        simulation = simulation_iteration_optionB(simulation_input)
        # Phase 2: Update MSM to obtain next set of most informative initial conformations.
        model, next_conformation = analysis_iteration(model, simulation)
    return model


model = asyncio.run(simulation_and_analysis_loop(initial_input))