
Compute-agnostic pipelining software


Hello from magnus



Magnus is a simplified workflow definition language that helps in:

Along with its developer-friendly features, magnus also acts as an interface to production-grade concepts such as data catalogs, reproducibility, experiment tracking, and secure access to secrets.


What does it do?



Documentation

More details about the project and how to use it are available in the documentation.


Installation

The minimum Python version that magnus supports is 3.8.

pip install magnus

Please look at the installation guide for more information.


Example

Your application code. Use pydantic models as DTO.

Assumed to be present at functions.py

from pydantic import BaseModel

class InnerModel(BaseModel):
    """
    A pydantic model representing a group of related parameters.
    """

    foo: int
    bar: str


class Parameter(BaseModel):
    """
    A pydantic model representing the parameters of the whole pipeline.
    """

    x: int
    y: InnerModel


def return_parameter() -> Parameter:
    """
    The annotation of the return type of the function is not mandatory
    but it is a good practice.

    Returns:
        Parameter: The parameters that should be used in downstream steps.
    """
    # Return type of a function should be a pydantic model
    return Parameter(x=1, y=InnerModel(foo=10, bar="hello world"))


def display_parameter(x: int, y: InnerModel):
    """
    Annotating the arguments of the function is important for
    magnus to understand the type of parameters you want.

    Input args can be a pydantic model or the individual attributes.
    """
    print(x)
    # >>> prints 1
    print(y)
    # >>> prints foo=10 bar='hello world'

Application code using driver functions.

The code is runnable without any orchestration framework.

from functions import return_parameter, display_parameter

my_param = return_parameter()
display_parameter(my_param.x, my_param.y)
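
The driver above passes my_param.x and my_param.y by hand. An orchestrator has to rebuild typed arguments from stored, serialised parameters instead, which is why the annotations matter. The following is a minimal sketch of that idea and not magnus's actual implementation: bind_parameters is a hypothetical helper, and standard-library dataclasses stand in for the pydantic models so that the snippet has no dependencies.

```python
import inspect
from dataclasses import dataclass, is_dataclass


@dataclass
class InnerModel:
    # Assumption: a dataclass stand-in for the pydantic InnerModel above,
    # used only to keep this sketch dependency-free.
    foo: int
    bar: str


def display_parameter(x: int, y: InnerModel) -> str:
    # Returns a string instead of printing so the result is easy to inspect.
    return f"x={x}, y.foo={y.foo}, y.bar={y.bar}"


def bind_parameters(func, stored: dict) -> dict:
    """Hypothetical helper: build keyword arguments for func from a plain
    dict, using the argument annotations to rebuild nested objects."""
    kwargs = {}
    for name, param in inspect.signature(func).parameters.items():
        value = stored[name]
        annotation = param.annotation
        # A nested dict is turned back into the annotated model type.
        if is_dataclass(annotation) and isinstance(value, dict):
            value = annotation(**value)
        kwargs[name] = value
    return kwargs


# Parameters as they might look after a round trip through JSON.
stored_parameters = {"x": 1, "y": {"foo": 10, "bar": "hello world"}}
kwargs = bind_parameters(display_parameter, stored_parameters)
result = display_parameter(**kwargs)
print(result)
```

Without the annotation on y, the helper would have no way of knowing that the nested dict should become an InnerModel, and the function would receive a plain dict.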

Orchestration using magnus

Python SDK

Example present at: examples/python-tasks.py

Run it as: python examples/python-tasks.py

from magnus import Pipeline, Task

def main():
    step1 = Task(
        name="step1",
        command="examples.functions.return_parameter",
    )
    step2 = Task(
        name="step2",
        command="examples.functions.display_parameter",
        terminate_with_success=True,
    )

    step1 >> step2

    pipeline = Pipeline(
        start_at=step1,
        steps=[step1, step2],
        add_terminal_nodes=True,
    )

    pipeline.execute()


if __name__ == "__main__":
    main()

YAML

Example present at: examples/python-tasks.yaml

Execute via the cli: magnus execute -f examples/python-tasks.yaml

dag:
  description: |
    This is a simple pipeline that does 2 steps in sequence.
    In this example:
      1. First step: returns a "parameter" x as a Pydantic model
      2. Second step: Consumes that parameter and prints it

    This pipeline demonstrates one way to pass small data from one step to another.

  start_at: step 1
  steps:
    step 1:
      type: task
      command_type: python
      command: examples.functions.return_parameter
      next: step 2
    step 2:
      type: task
      command_type: python
      command: examples.functions.display_parameter
      next: success
    success:
      type: success
    fail:
      type: fail
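
The definition above is a linked structure: start_at names the first step, and each task names its successor in next until a node of type success or fail is reached. As a rough illustration of how such a definition can be read (this is not how magnus resolves it internally), the sketch below writes the dag section as a plain Python dict and follows the links. execution_order is a hypothetical helper.

```python
# The dag section of the YAML above, written as a plain Python dict.
dag = {
    "start_at": "step 1",
    "steps": {
        "step 1": {
            "type": "task",
            "command": "examples.functions.return_parameter",
            "next": "step 2",
        },
        "step 2": {
            "type": "task",
            "command": "examples.functions.display_parameter",
            "next": "success",
        },
        "success": {"type": "success"},
        "fail": {"type": "fail"},
    },
}


def execution_order(dag: dict) -> list:
    """Hypothetical helper: follow `next` from `start_at` until a terminal
    node, returning the names of the visited steps in order."""
    order = []
    current = dag["start_at"]
    while True:
        if current in order:
            raise ValueError(f"cycle detected at {current!r}")
        if current not in dag["steps"]:
            raise KeyError(f"unknown step {current!r}")
        order.append(current)
        node = dag["steps"][current]
        # success and fail nodes end the walk; they have no `next`.
        if node["type"] in ("success", "fail"):
            return order
        current = node["next"]


print(execution_order(dag))
```

The fail node does not appear in the result because it is only reached when a step errors, which this happy-path walk does not model.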

Transpile to argo workflows

No code change, just change the configuration.

executor:
  type: "argo"
  config:
    image: magnus:demo
    persistent_volumes:
      - name: magnus-volume
        mount_path: /mnt

run_log_store:
  type: file-system
  config:
    log_folder: /mnt/run_log_store

More details can be found in argo configuration.

Execute the code as: magnus execute -f examples/python-tasks.yaml -c examples/configs/argo-config.yaml

apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: magnus-dag-
  annotations: {}
  labels: {}
spec:
  activeDeadlineSeconds: 172800
  entrypoint: magnus-dag
  podGC:
    strategy: OnPodCompletion
  retryStrategy:
    limit: '0'
    retryPolicy: Always
    backoff:
      duration: '120'
      factor: 2
      maxDuration: '3600'
  serviceAccountName: default-editor
  templates:
    - name: magnus-dag
      failFast: true
      dag:
        tasks:
          - name: step-1-task-uvdp7h
            template: step-1-task-uvdp7h
            depends: ''
          - name: step-2-task-772vg3
            template: step-2-task-772vg3
            depends: step-1-task-uvdp7h.Succeeded
          - name: success-success-igzq2e
            template: success-success-igzq2e
            depends: step-2-task-772vg3.Succeeded
    - name: step-1-task-uvdp7h
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - step%1
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
    - name: step-2-task-772vg3
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - step%2
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
    - name: success-success-igzq2e
      container:
        image: magnus:demo
        command:
          - magnus
          - execute_single_node
          - '{{workflow.parameters.run_id}}'
          - success
          - --log-level
          - WARNING
          - --file
          - examples/python-tasks.yaml
          - --config-file
          - examples/configs/argo-config.yaml
        volumeMounts:
          - name: executor-0
            mountPath: /mnt
        imagePullPolicy: ''
        resources:
          limits:
            memory: 1Gi
            cpu: 250m
          requests:
            memory: 1Gi
            cpu: 250m
  templateDefaults:
    activeDeadlineSeconds: 7200
    timeout: 10800s
  arguments:
    parameters:
      - name: run_id
        value: '{{workflow.uid}}'
  volumes:
    - name: executor-0
      persistentVolumeClaim:
        claimName: magnus-volume

Pipelines can be:

Linear

A simple linear pipeline whose tasks are Python functions, notebooks, or shell scripts.

Parallel branches

Execute branches in parallel.

Loops or map

Execute a pipeline over an iterable parameter.

Arbitrary nesting

Any nesting of parallel within map and so on.
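
To make the difference between these shapes concrete, here is a plain-Python illustration of their semantics. It does not use the magnus API. run_parallel, run_map, branch_a and branch_b are hypothetical names, and a thread pool stands in for whatever compute the executor would provide.

```python
from concurrent.futures import ThreadPoolExecutor


def branch_a(x: int) -> int:
    return x + 1


def branch_b(x: int) -> int:
    return x * 10


def run_parallel(branches, value):
    """Parallel: run every branch on the same input, concurrently.
    Results are collected in the order the branches were given."""
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(branch, value) for branch in branches]
        return [future.result() for future in futures]


def run_map(pipeline, items):
    """Map: run the same pipeline once per item of an iterable parameter."""
    return [pipeline(item) for item in items]


# Parallel branches over one value.
parallel_result = run_parallel([branch_a, branch_b], 2)

# Map of a single-step pipeline over an iterable parameter.
map_result = run_map(branch_a, [1, 2, 3])

# Nesting: a parallel block executed inside a map.
nested_result = run_map(
    lambda item: run_parallel([branch_a, branch_b], item), [1, 2]
)

print(parallel_result, map_result, nested_result)
```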
