Skip to content

Self-triggering workflow issues #16

Description

@poautran

In GitLab by @woutdenolf on May 10, 2024, 13:22 GMT+2:

Reported by @edgar

The example below exposes three issues:

  • the "loop" node is not recognized as a start node (issue with ewokscore.graph.analysis.start_nodes); see https://gitlab.esrf.fr/workflow/ewoks/ewokscore/-/merge_requests/236
  • we sometimes get "the pool is closed" errors (issue in pypushflow?)
  • "save" cannot have required parameters because it fails when triggered by "config" (ewoksppf input merge actor issue?)
from ewokscore import Task
from ewoks import execute_graph


class ConfigTask(Task, output_names=["config"]):
    """Task without inputs that publishes an empty ``config`` dictionary."""

    def run(self):
        # Trace the execution on stdout so the run order is visible.
        print("config")
        self.outputs.config = dict()


class LoopTask(
    Task,
    input_names=["i", "n"],
    optional_input_names=["config"],
    output_names=["i", "keep_looping"],
):
    """Counter task: emits ``i + 1`` and whether that is still below ``n``.

    The optional ``config`` input is declared but never read by ``run``.
    """

    def run(self):
        incremented = self.inputs.i + 1
        self.outputs.i = incremented
        # True while the incremented counter has not reached ``n`` yet.
        self.outputs.keep_looping = incremented < self.inputs.n
        print("loop", incremented)


class SaveTask(
    Task,
    input_names=[],
    optional_input_names=["i", "config"],
    output_names=["result"],
):
    """Append input ``i`` to the module-level ``FILE`` list when it is provided.

    Every input is optional. ``result`` is ``True`` when a value was appended
    and ``False`` when ``i`` was missing and the save was skipped.
    """

    def run(self):
        if not self.missing_inputs.i:
            value = self.inputs.i
            print("save", value)
            FILE.append(value)
            self.outputs.result = True
            return

        # No ``i`` received: nothing to save.
        print("skip save")
        self.outputs.result = False


# Workflow nodes: one per task class defined in this script.
nodes = [
    dict(
        id="config",
        task_type="class",
        task_identifier="__main__.ConfigTask",
    ),
    dict(
        id="loop",
        task_type="class",
        task_identifier="__main__.LoopTask",
        # Default inputs: counter "i" starts at 0, bound "n" is 10.
        default_inputs=[
            dict(name="i", value=0),
            dict(name="n", value=10),
        ],
    ),
    dict(
        id="save",
        task_type="class",
        task_identifier="__main__.SaveTask",
    ),
]


# Workflow links between the nodes.
links = [
    # Self-link: "loop" feeds its output "i" back into its own input "i",
    # conditioned on its "keep_looping" output being True.
    dict(
        source="loop",
        target="loop",
        data_mapping=[dict(source_output="i", target_input="i")],
        conditions=[dict(source_output="keep_looping", value=True)],
    ),
    # "loop" passes its output "i" on to "save".
    dict(
        source="loop",
        target="save",
        data_mapping=[dict(source_output="i", target_input="i")],
    ),
    # "config" passes its output "config" on to "save".
    dict(
        source="config",
        target="save",
        data_mapping=[dict(source_output="config", target_input="config")],
    ),
]


if True:
    # Workaround toggle: "loop" cannot be a start node (see ewokscore graph
    # analysis), so it is linked from "config" as well.
    links.append(
        dict(
            source="config",
            target="loop",
            data_mapping=[dict(source_output="config", target_input="config")],
        )
    )


import logging

# Uncomment to get debug output while reproducing the issue:
# logging.basicConfig(level=logging.DEBUG)

# In-memory list that SaveTask appends to; it must exist before execution.
FILE = []

# Assemble the workflow description from the nodes and links defined above.
workflow = dict(graph=dict(id="testworkflow"), nodes=nodes, links=links)

# Execute with the "ppf" engine and pool_type="thread".
result = execute_graph(
    workflow,
    engine="ppf",
    scaling_workers=False,
    pool_type="thread",
)

# Show which values were "saved" during the run.
print(FILE)

Assignees: noordhee

Migrated from GitLab: https://gitlab.esrf.fr/workflow/ewoks/ewoksppf/-/issues/16

Metadata

Metadata

Assignees

No one assigned

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions