Operations

This section specifies the interface by which functional code units are made accessible to, and executable by, the runtime facilities.

Importable Python modules implement a few basic interfaces to obtain computational and data resources and publish results, mediated by the runtime executor.

Framework and object model

Operations

This section explains how operations are structured and implemented.

Runtime executor

@startuml
title Execution layer Worker interface

participant "Higher level framework" as framework

box "Worker Context"
participant "Resource manager" as context
participant "Node Director" as node_director
participant "Node Builder" as node_builder
participant "Result publisher" as publisher
end box

'--> framework: static data resources available
'   note over framework #FFFFFF
'   my_array =
'   69e69fedbdbab6dcda556db6d5835375cefb4e801fb8279d0d7ef3995154bc15: scalems.Integer64
'   endnote
'
'   framework --> context **: provision worker
'
'   rnote over framework #FFFFFF
'   69e69fedbdbab6dcda556db6d5835375cefb4e801fb8279d0d7ef3995154bc15: scalems.Integer64
'   endrnote
'
'   framework -> context: place data

--> framework: work package

    rnote over framework #FFFFFF
    {
        "version": "scalems_workflow_1",
        "types": {
            "scalems.SubprocessInput": {
                "argv": { "type": ["scalems", "String"], "shape": ["constraints.OneOrMore"] },
                "inputs": { "type": ["scalems", "Mapping"], "shape": [1] },
                "outputs": { "type": ["scalems", "Mapping"], "shape": [1] },
                "stdin": { "type": ["scalems", "File"], "shape": [1] },
                "environment": { "type": ["scalems", "Mapping"], "shape": [1] },
                "resources": { "type": ["scalems", "Mapping"], "shape": [1] }
            },
            "scalems.SubprocessResult": {
                "exitcode": { "type": ["scalems", "Integer"], "shape": [1] },
                "stdout": { "type": ["scalems", "File"], "shape": [1] },
                "stderr": { "type": ["scalems", "File"], "shape": [1] },
                "file": { "type": ["scalems", "Mapping"], "shape": [1] }
            },
            "scalems.Subprocess": {
                "input": { "type": ["scalems", "SubprocessInput"], "shape": [1] },
                "result": { "type": ["scalems", "SubprocessResult"], "shape": [1] }
            }
        },
        "referents": [
            {
                "label": "input_files",
                "uid": "aaaa...",
                "type": ["scalems", "Mapping"],
                "data": [{"-i": ["infile"]}]
            },
            {
                "label": "output_files",
                "uid": "bbbb...",
                "type": ["scalems", "Mapping"],
                "data": [{"-o": ["outfile"]}]
            },
            {
                "label": "resource_spec",
                "uid": "cccc...",
                "type": ["scalems", "Mapping"],
                "data": []
            },
            {
                "label": "subprocess_input",
                "uid": "dddd...",
                "type": ["scalems", "SubprocessInput"],
                "argv": ["myprogram", "--num_threads", "8"],
                "inputs": "aaaa...",
                "outputs": "bbbb...",
                "stdin": null,
                "environment": [{}],
                "resources": "cccc..."
            },
            {
                "label": "command",
                "uid": "eeee...",
                "type": ["scalems", "Subprocess"],
                "input": "dddd...",
                "result": "eeee..."
            }
        ]
    }
    endrnote


   framework -> context ++: deserialize node


    context -> node_builder **: get_node_builder(uid, inputs)
    activate node_builder
    context -> node_director ++

    node_director -> node_builder: apply node input
    activate node_builder

    alt incomplete && has_dependencies
        node_builder -> context: register subscriptions
    end
    deactivate node_builder

    node_director -> node_builder ++: set up output
    node_builder -> publisher **
    deactivate node_builder

    node_director -> node_builder ++: build()

    publisher -> context: publish
    activate context
    context -> context: subscriptions
    framework <-- context: stage output data
    deactivate context
    deactivate publisher

    node_director <-- node_builder --: node
    context <-- node_director --
    destroy node_builder
    framework <-- context: node
    deactivate context

    [<-- framework: results

@enduml

Data flow, checkpoint, and resumption

The executor should make sure the worker is provisioned with operation dependencies before transmitting a packet of work to be performed, but the worker still needs to resolve the dependencies for each node.

Note that the abstract work graph contains work that a given worker may not perform (such as array members), but workers need to be able to identify the outputs of operations they are not responsible for executing.

Implementation road map

Data staging

Fundamental data structures, such as arrays and array slices, need to be fingerprinted, staged to the worker, and referenceable in concrete graph records.
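As a point of reference, the 64-character hexadecimal uids used in the examples above are consistent with SHA-256. The following is a minimal sketch of content-based fingerprinting under that assumption: the canonical form (compact JSON with sorted keys) is hypothetical, and a real scheme would also have to pin down byte order and numeric encoding for array data.

```python
import hashlib
import json

def fingerprint(type_name, shape, data):
    """Fingerprint a typed, shaped data resource.

    Hypothetical canonical form: compact JSON with sorted keys,
    hashed with SHA-256.
    """
    canonical = json.dumps(
        {"type": type_name, "shape": shape, "data": data},
        sort_keys=True,
        separators=(",", ":"),
    )
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Identical content always yields the same uid, so a worker can confirm
# availability of a staged resource by recomputing the hash locally.
uid = fingerprint("scalems.Integer64", [2, 2], [[1, 2], [3, 4]])
```

Because the fingerprint is a pure function of the content, it can serve both as a storage key and as an integrity check when localizing data.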

Concrete task

First, consider a graph with no abstraction. Each operation depends on concrete data explicitly represented in the work record.

scale-ms-15

  • Generate fingerprint for data and operations.

  • Produce serialized data node.

  • Produce serialized operation node.

  • Identify data by fingerprint and localize/confirm availability.

  • Deserialize work, (re)initializing local resources. Determine completion status.

  • Identify data flow constraints, establish subscriptions and/or chase references.

  • Execute and/or publish results to subscribers.
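The serialization and completion-status steps above can be exercised with a minimal round-trip sketch. The record layout follows the work-package example earlier in this section, and `local_results` is a hypothetical stand-in for the worker's checkpoint/result store.

```python
import json

def serialize_node(record):
    """Produce a serialized node: compact JSON keyed by uid."""
    return json.dumps({record["uid"]: record}, sort_keys=True)

def deserialize_node(serialized, local_results):
    """Reinitialize a local node record and determine completion status.

    `local_results` maps node uid to previously published results.
    """
    (uid, record), = json.loads(serialized).items()
    complete = uid in local_results
    return record, complete

wire = serialize_node(
    {"uid": "eeee...", "type": ["scalems", "Subprocess"], "input": "dddd..."}
)
record, complete = deserialize_node(wire, local_results={})
```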

Generating concrete tasks

In the simplest abstraction, an operation input is expressed in terms of another operation. Demonstrate how this is translated to a concrete executable task, with implications for checkpointing and resumption.
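For illustration (a hypothetical record layout based on the work-package sketch above), concretizing a node means replacing uid references with resolved results; resumption then reduces to checking whether a result for the node's own uid is already stored.

```python
def concretize(node, results):
    """Replace a uid reference in a node's input with concrete data.

    `results` maps the uid of a completed operation to its output.
    A KeyError means the dependency is not yet resolved, which is
    the signal to register a subscription instead of executing.
    """
    concrete = dict(node)
    ref = node["input"]
    if isinstance(ref, str):  # uid reference to another node's output
        concrete["input"] = results[ref]
    return concrete

results = {"dddd...": {"argv": ["myprogram"], "stdin": None}}
task = concretize(
    {"uid": "eeee...", "type": ["scalems", "Subprocess"], "input": "dddd..."},
    results,
)
```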

Design notes

Callable and arguments

Arguments are supplied to the callable when the task is launched. Launching a task requires a callable, arguments for the callable, and an active executor. For a concurrent.futures.Executor, the callable and arguments are passed to Executor.submit() to get a Task (Future interface). For asyncio, it is possible to hold a coroutine object that is not yet executing (awaitable, but with no Future interface) and then pass it to asyncio.create_task() to get a Task (Future interface). Alternatively, a callable and arguments can be passed, along with an executor, to asyncio loop.run_in_executor() to get an asyncio.Future. In RP, a ComputeUnit can be created without immediate scheduling, and a Task is then acquired by "submit"ting it. The callable-and-arguments distinction, then, is not general.

It seems reasonable to have an encapsulated task factory that can operate in conjunction with an active execution facility to produce a Future. The details of task description -> task submission / Future handle -> awaiting on the future can be absorbed into the NodeBuilder and proxy Future protocols, but we probably need two states in the command handle to indicate whether a Task has been created yet, and a third state to indicate its completion (note that Future.done() is commonly available).
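The encapsulated task factory mentioned above can be sketched as follows. This is an illustration, not the scalems implementation: TaskDescription and its method names are hypothetical, but the underlying calls (Executor.submit(), loop.run_in_executor()) are standard library.

```python
import asyncio
import concurrent.futures

class TaskDescription:
    """Encapsulated task factory (sketch).

    The same description can be combined with different execution
    facilities to produce a Future, hiding the submission details
    from the caller.
    """
    def __init__(self, fn, *args):
        self.fn = fn
        self.args = args

    def submit(self, executor):
        # concurrent.futures: submission returns a Future immediately.
        return executor.submit(self.fn, *self.args)

    async def run(self):
        # asyncio: run in the loop's default executor and await the result.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.fn, *self.args)

desc = TaskDescription(sum, (1, 2, 3))
with concurrent.futures.ThreadPoolExecutor() as pool:
    future = desc.submit(pool)  # the handle is now bound to a real Task
    result = future.result()
```

Note that `future.done()` is available after completion, matching the "third state" discussed above.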

Upshot:

  1. Handles returned by command helpers may not (yet) be bound to real tasks.

  2. Context implementations need to support an add_task protocol that proxies the details of converting a task description into a Task instance.

  3. A Context<->Context protocol needs to manage Futures that proxy to tasks in other Contexts. Only the simple and unified ScaleMS.Future should be exposed to ordinary users.

  4. We can require Session.run() to be used in the near term to force completion of the task protocol, and absorb that behavior into scalems.wait() and scalems.run() in the long term.

async def canonical_syntax():
    context = sms_context.get_scope()
    # For testing purposes, we will explicitly create each of the implemented Contexts.
    # Simple use case:
    # Use context manager protocol to initialize and finalize connections and resources.
    with context as session:
        cmd = executable(('/bin/echo', 'hello', 'world'), stdout='filename')
        await session.run(cmd) # Near-term solution to resolve all awaitables explicitly.
        # Alternative not yet implemented:
        #    scalems.wait(cmd.exitcode) # Minimally resolve cmd.exitcode
        assert cmd.exitcode.done()
        # In user-facing Contexts, we can make sure that Future.result() does not require explicit waiting.
        assert cmd.exitcode.result() == 0
        # No `wait cmd` is necessary because the awaitable will be managed by session clean-up, if necessary.
    # Higher-level use cases:
    # * Pass the target context to the command factory to get a command object that is
    #   already bound to a particular execution framework.
    # * Set the Context and then call scalems.run(workflow) to get behavior like
    #   asyncio.run().

Simple work

A single node of work may be executed immediately in the Worker Context, or not at all (if the local checkpoint state is already final).
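A minimal sketch of that decision, assuming a hypothetical checkpoint store keyed by node uid with a "final" flag:

```python
def execute_node(uid, checkpoint_store, run):
    """Run a node only if its local checkpoint state is not already final."""
    record = checkpoint_store.get(uid)
    if record is not None and record["final"]:
        return record["result"]  # resume: nothing left to execute
    result = run()
    checkpoint_store[uid] = {"final": True, "result": result}
    return result

store = {"eeee...": {"final": True, "result": {"exitcode": 0}}}
calls = []
# The node is already final, so the callable is never invoked.
out = execute_node("eeee...", store, run=lambda: calls.append(1))
```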

@startuml
title Immediate launch

box "Worker Context"
participant "Resource manager" as context
participant "Node Director" as node_director
participant "Node Builder" as node_builder
participant "Result publisher" as publisher
end box

box "checkpoint facility"
participant "API" as checkpoint_facility
participant "Checkpoint director" as checkpoint_director
participant "checkpointer: Publisher" as checkpoint_publisher
participant "backing store" as checkpoint_backing_store
end box

box "Operation implementation" #DDEEFF
participant "API connectors" as factory
participant resources
participant function
end box

[-> context: place data

[-> context: deserialize node
activate context

context -> node_builder **: get_node_builder(uid, inputs)
activate node_builder
context -> node_director ++

node_director -> factory
node_director <-- factory: input resource builder
node_director <-- factory: output resource builder
node_director <-- factory: functor builder


node_director -> checkpoint_facility ++
checkpoint_facility -> factory: negotiate facilities
checkpoint_facility -> checkpoint_backing_store ++: initialize
checkpoint_facility <-- checkpoint_backing_store --: state metadata
node_director <-- checkpoint_facility --

node_director -> checkpoint_facility ++
checkpoint_facility -> checkpoint_director **: backing store reference
checkpoint_facility --> node_director --: checkpoint_director

node_director -> node_builder: apply node input
activate node_builder

node_builder -> factory ++: resource builder

checkpoint_facility <-- factory ++: receive checkpoint state
checkpoint_facility -> checkpoint_director ++
checkpoint_director -> checkpoint_backing_store ++: read
checkpoint_director -> factory: apply checkpoint state
checkpoint_director -> checkpoint_backing_store: close
deactivate checkpoint_backing_store
deactivate checkpoint_director
deactivate checkpoint_facility
alt incomplete && has_dependencies
    node_builder -> context: register subscriptions
end
deactivate node_builder

node_director -> node_builder ++: set up output
node_builder -> publisher **
deactivate node_builder

node_director -> node_builder ++: build()
node_builder -> factory ++: build()
factory <- factory ++: resolve input subscriptions
alt incomplete && has_dependencies
    factory -> context: await subscriptions
    context --> factory: apply inputs
end
deactivate factory

node_builder -> checkpoint_facility ++
checkpoint_facility -> checkpoint_publisher **: backing store reference
return

node_builder -> checkpoint_publisher ++
checkpoint_publisher -> publisher: subscribe
ref over publisher
Use case
end ref
deactivate checkpoint_publisher

publisher <-- factory ++: acquire
factory -> resources ++
node_builder <-- factory --: resources
deactivate factory

node_builder -> factory ++: task builder
factory -> function ++
function -> resources: input
node_builder <-- factory --: functor

alt partial completion checkpoint
function -> resources: checkpoint
resources -> checkpoint_publisher ++: update
checkpoint_publisher -> checkpoint_backing_store ++: open
checkpoint_publisher -> checkpoint_backing_store: write
checkpoint_publisher -> checkpoint_backing_store: close
deactivate checkpoint_backing_store
deactivate checkpoint_publisher
end

function -> resources: output
resources -> publisher ++: publish
publisher -> checkpoint_publisher ++: publish
checkpoint_publisher -> checkpoint_backing_store ++: open
checkpoint_publisher -> checkpoint_backing_store: write
checkpoint_publisher -> checkpoint_backing_store: close
deactivate checkpoint_backing_store
deactivate checkpoint_publisher

publisher -> context: publish
activate context
context -> context: subscriptions
[<-- context: stage output data
deactivate context
deactivate publisher

function -> node_builder: success
node_builder -> function: release
deactivate function

node_builder -> resources: release references
resources --> publisher: release
destroy publisher
destroy resources

node_director <-- node_builder --: node
context <-- node_director --
destroy node_builder
[<-- context: node
deactivate context

|||
    ref over context, node_director, node_builder, publisher
Use case: subscribe to result publisher

Scenario 1: results not yet available
1. node result publisher receives subscription request.
2. Context identifies that node is not finalized.
3. Subscriber is added to the list of publishers to notify on node output activity.

Scenario 2: results already published
1. node result publisher receives subscription request
2. Context identifies that node is already complete.
3. Subscriber receives published results before subscribe() completes.
    end ref

@enduml
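The two subscription scenarios from the use case above can be sketched directly (a hypothetical ResultPublisher; the real Worker Context mediates publication, as shown in the diagram):

```python
class ResultPublisher:
    """Sketch of the result-publisher subscription use case.

    Scenario 1: result not yet available -> subscriber is queued.
    Scenario 2: result already published -> subscriber is notified
    before subscribe() returns.
    """
    def __init__(self):
        self._subscribers = []
        self._result = None
        self._done = False

    def subscribe(self, callback):
        if self._done:
            callback(self._result)           # scenario 2
        else:
            self._subscribers.append(callback)  # scenario 1

    def publish(self, result):
        self._result, self._done = result, True
        for callback in self._subscribers:
            callback(result)

received = []
pub = ResultPublisher()
pub.subscribe(received.append)  # scenario 1: queued until publication
pub.publish({"exitcode": 0})
pub.subscribe(received.append)  # scenario 2: delivered immediately
```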

Deferred execution

Multiple graph nodes may be received in the same packet of work, or asynchronously. The executor may locally manage dependencies to optimize execution and data placement.

The following diagram is somewhat speculative. See also issue 15 and issue 23.
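One way the executor might manage dependencies locally is a topological pass over the nodes received in a packet (Kahn's algorithm), so that each node is built only after the nodes it references. This is a sketch under that assumption, not the scale-ms implementation:

```python
from collections import deque

def execution_order(dependencies):
    """Order node uids so each follows its dependencies.

    `dependencies` maps each node uid to the uids it depends on;
    all referenced uids must appear as keys.
    """
    indegree = {uid: len(deps) for uid, deps in dependencies.items()}
    dependents = {uid: [] for uid in dependencies}
    for uid, deps in dependencies.items():
        for dep in deps:
            dependents[dep].append(uid)
    ready = deque(sorted(uid for uid, n in indegree.items() if n == 0))
    order = []
    while ready:
        uid = ready.popleft()
        order.append(uid)
        for dependent in dependents[uid]:
            indegree[dependent] -= 1
            if indegree[dependent] == 0:
                ready.append(dependent)
    if len(order) != len(dependencies):
        raise ValueError("work packet contains a dependency cycle")
    return order

# Using the referents from the work-package example:
order = execution_order({"dddd...": [], "eeee...": ["dddd..."]})
```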

@startuml
title Deferred launch

box "Worker Context"
participant "Resource manager" as context
participant "Node Director" as node_director
participant "Node Builder" as node_builder
participant "Result publisher" as publisher
end box

box "checkpoint facility"
participant "API" as checkpoint_facility
participant "Checkpoint director" as checkpoint_director
participant "checkpointer: Publisher" as checkpoint_publisher
participant "backing store" as checkpoint_backing_store
end box

box "Operation implementation" #DDEEFF
participant "API connectors" as factory
participant resources
participant function
end box

    [-> context: add_node(node)
    activate context


    context -> node_builder **: get_node_builder(uid, inputs)
    activate node_builder
    context -> node_director ++

    node_director -> factory
    node_director <-- factory: input resource builder
    node_director <-- factory: output resource builder
    node_director <-- factory: functor builder


    node_director -> checkpoint_facility ++
    checkpoint_facility -> factory: negotiate facilities
    checkpoint_facility -> checkpoint_backing_store ++: initialize
    checkpoint_facility <-- checkpoint_backing_store --: state metadata
    node_director <-- checkpoint_facility --

    node_director -> checkpoint_facility ++
    checkpoint_facility -> checkpoint_director **: backing store reference
    checkpoint_facility --> node_director --: checkpoint_director

    node_director -> node_builder: apply node input
    activate node_builder

    node_builder -> factory ++: resource builder

    checkpoint_facility <-- factory ++: receive checkpoint state
    checkpoint_facility -> checkpoint_director ++
    checkpoint_director -> checkpoint_backing_store ++: read
    checkpoint_director -> factory: apply checkpoint state
    checkpoint_director -> checkpoint_backing_store: close
    deactivate checkpoint_backing_store
    deactivate checkpoint_director
    deactivate checkpoint_facility
    alt incomplete && has_dependencies
        node_builder -> context: register subscriptions
    end
    deactivate node_builder

    node_director -> node_builder ++: set up output
    node_builder -> publisher **
    deactivate node_builder


    node_director <-- node_builder --: node
    context <-- node_director --
    [<-- context: node
    deactivate context


    alt wait on Future

        [--> context: receive subscriptions
        activate context
        context -> publisher: subscribe
        deactivate context

    else data event trigger
                context --> factory: apply inputs

    else explicit run
    [-> context
    end

    context -> context: resolve
    activate context


    context -> node_builder ++: get_node_builder(uid, inputs)
    context -> node_director ++



    node_director -> node_builder ++: build()
    node_builder -> factory ++: build()

    node_builder -> checkpoint_facility ++
    checkpoint_facility -> checkpoint_publisher **: backing store reference
    return

    node_builder -> checkpoint_publisher ++
    checkpoint_publisher -> publisher: subscribe
    ref over publisher
    Use case
    end ref
    deactivate checkpoint_publisher

    publisher <-- factory ++: acquire
    factory -> resources ++
    node_builder <-- factory --: resources
    deactivate factory

    node_builder -> factory ++: task builder
    factory -> function ++
    function -> resources: input
    node_builder <-- factory --: functor

    alt partial completion checkpoint
    function -> resources: checkpoint
    resources -> checkpoint_publisher ++: update
    checkpoint_publisher -> checkpoint_backing_store ++: open
    checkpoint_publisher -> checkpoint_backing_store: write
    checkpoint_publisher -> checkpoint_backing_store: close
    deactivate checkpoint_backing_store
    deactivate checkpoint_publisher
    end

    function -> resources: output
    resources -> publisher ++: publish
    publisher -> checkpoint_publisher ++: publish
    checkpoint_publisher -> checkpoint_backing_store ++: open
    checkpoint_publisher -> checkpoint_backing_store: write
    checkpoint_publisher -> checkpoint_backing_store: close
    deactivate checkpoint_backing_store
    deactivate checkpoint_publisher

    publisher -> context: publish
    activate context
    context -> context: subscriptions
    [<-- context: stage output data
    deactivate context
    deactivate publisher

    function -> node_builder: success
    node_builder -> function: release
    deactivate function

    node_builder -> resources: release references
    resources --> publisher: release
    destroy publisher
    destroy resources

    node_director <-- node_builder --: node
    context <-- node_director --
    destroy node_builder
    [<-- context: node
    deactivate context


|||
    ref over context, node_director, node_builder, publisher
Use case: subscribe to result publisher

Scenario 1: results not yet available
1. node result publisher receives subscription request.
2. Context identifies that node is not finalized.
3. Subscriber is added to the list of publishers to notify on node output activity.

Scenario 2: results already published
1. node result publisher receives subscription request
2. Context identifies that node is already complete.
3. Subscriber receives published results before subscribe() completes.
    end ref

@enduml

Orphaned diagrams

TODO: Are these redundant? Are they useful?

@startuml
title WorkflowContext client interface

participant "scalems package" as framework

box "WorkflowContext"
participant "Resource manager" as context
participant "Graph Director" as node_director
participant "Node Builder" as node_builder
participant "Result publisher" as publisher
end box


   framework --> context **: initialize local workflow state

   rnote over framework, context #FFFFFF
   lock meta-data store and note local workflow artifacts
   endrnote

-> framework: scalems.integer(((1,2),(3,4)))

    rnote over framework #FFFFFF
    "69e69fedbdbab6dcda556db6d5835375cefb4e801fb8279d0d7ef3995154bc15":
    {
        "operation": ["scalems", "Integer64"],
        "label": "my_array",
        "input": { "data": [ [1, 2], [3, 4] ] },
        "output": { "meta": { "resource": {
                        "type": "scalems.Integer64",
                        "shape": [2, 2] }}},
    }
    To do: update syntax...
    endrnote


   framework -> context ++: deserialize node


    context -> node_builder **: get_node_builder(uid, inputs)
    activate node_builder
    context -> node_director ++

    node_director -> node_builder: apply node input
    activate node_builder

    alt incomplete && has_dependencies
        node_builder -> context: register subscriptions
    end
    deactivate node_builder

    node_director -> node_builder ++: set up output
    node_builder -> publisher **
    deactivate node_builder

    node_director -> node_builder ++: build()

    publisher -> context: publish
    activate context
    context -> context: subscriptions
    framework <-- context: stage output data
    deactivate context
    deactivate publisher

    node_director <-- node_builder --: node
    context <-- node_director --
    destroy node_builder
    framework <-- context: node
    deactivate context

    [<-- framework: results

@enduml

@startuml

:Executor: -- (Execute task)
:Executor: -- (Generate task)
:Operation package: - (Execute task)
:Operation package: - (Publish data)
(Generate task) ..> (Evaluate resources): <<include>>
(Generate task) ..> (Evaluate dependencies): <<include>>
(Generate task) ..> (Control compute and input resources): <<include>>
(Execute task) ..> (Control compute and input resources): <<include>>
(Evaluate dependencies) ..> (Place data): <<include>>
(Publish data) ..> (Place data): <<include>>
(Publish data) -> (Generate task)
:Managed data: - (Place data)

@enduml

@startuml
:Executor: -- (Execute task)

(Create local node handle)

(Register checkpointed entity)

@enduml