Using Raw Containers#

This example demonstrates how to use arbitrary containers written in 5 different languages, all orchestrated seamlessly by flytekit. Flyte mounts an input data volume, where all the data needed by the container is available, and an output data volume, to which the container writes all the data that should be stored.

The data is written as separate files, one per input variable. Each file contains the value serialized as a string. Refer to the raw protocol to understand how to leverage this.

import logging

from flytekit import ContainerTask, kwtypes, task, workflow

# Module-level logger named after the module (the standard `logging` convention),
# so it fits into the dotted logger hierarchy instead of being keyed by a file path.
logger = logging.getLogger(__name__)

Container Tasks#

A flytekit.ContainerTask denotes an arbitrary container. In the following example, the first task is assigned to the Python variable calculate_ellipse_area_shell and is given the task name ellipse-area-metadata-shell. This name has to be unique in the entire project. Users can specify:

  • input_data_dir -> where inputs will be written to

  • output_data_dir -> where Flyte will expect the outputs to exist.

inputs and outputs specify the interface of the task; each should be an ordered dictionary of typed variables (built here with kwtypes).

# Shell flavour of the ellipse-area task. The inputs `a` and `b` are written
# under /var/inputs, and Flyte expects `area` and `metadata` under /var/outputs.
calculate_ellipse_area_shell = ContainerTask(
    name="ellipse-area-metadata-shell",
    image="ghcr.io/flyteorg/rawcontainers-shell:v1",
    command=["./calculate-ellipse-area.sh", "/var/inputs", "/var/outputs"],
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
)

# Python flavour of the ellipse-area task. The inputs `a` and `b` are written
# under /var/inputs, and Flyte expects `area` and `metadata` under /var/outputs.
calculate_ellipse_area_python = ContainerTask(
    name="ellipse-area-metadata-python",
    image="ghcr.io/flyteorg/rawcontainers-python:v1",
    command=["python", "calculate-ellipse-area.py", "/var/inputs", "/var/outputs"],
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
)

# R flavour of the ellipse-area task. The inputs `a` and `b` are written
# under /var/inputs, and Flyte expects `area` and `metadata` under /var/outputs.
calculate_ellipse_area_r = ContainerTask(
    name="ellipse-area-metadata-r",
    image="ghcr.io/flyteorg/rawcontainers-r:v1",
    command=[
        "Rscript",
        "--vanilla",
        "calculate-ellipse-area.R",
        "/var/inputs",
        "/var/outputs",
    ],
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
)

# Haskell flavour of the ellipse-area task. The inputs `a` and `b` are written
# under /var/inputs, and Flyte expects `area` and `metadata` under /var/outputs.
calculate_ellipse_area_haskell = ContainerTask(
    name="ellipse-area-metadata-haskell",
    image="ghcr.io/flyteorg/rawcontainers-haskell:v1",
    command=["./calculate-ellipse-area", "/var/inputs", "/var/outputs"],
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
)

# Julia flavour of the ellipse-area task. The inputs `a` and `b` are written
# under /var/inputs, and Flyte expects `area` and `metadata` under /var/outputs.
calculate_ellipse_area_julia = ContainerTask(
    name="ellipse-area-metadata-julia",
    image="ghcr.io/flyteorg/rawcontainers-julia:v1",
    command=["julia", "calculate-ellipse-area.jl", "/var/inputs", "/var/outputs"],
    inputs=kwtypes(a=float, b=float),
    outputs=kwtypes(area=float, metadata=str),
    input_data_dir="/var/inputs",
    output_data_dir="/var/outputs",
)


@task
def report_all_calculated_areas(
    area_shell: float,
    metadata_shell: str,
    area_python: float,
    metadata_python: str,
    area_r: float,
    metadata_r: str,
    area_haskell: float,
    metadata_haskell: str,
    area_julia: float,
    metadata_julia: str,
):
    """Log the area and metadata produced by each language's container task.

    One INFO record is emitted per language, in the order shell, python, r,
    haskell, julia. Nothing is returned.
    """
    summaries = (
        f"shell: area={area_shell}, metadata={metadata_shell}",
        f"python: area={area_python}, metadata={metadata_python}",
        f"r: area={area_r}, metadata={metadata_r}",
        f"haskell: area={area_haskell}, metadata={metadata_haskell}",
        f"julia: area={area_julia}, metadata={metadata_julia}",
    )
    for summary in summaries:
        logger.info(summary)

As can be seen in this example, ContainerTasks can be interacted with like normal python functions, whose inputs correspond to the declared input variables. All data returned by the container tasks is passed to report_all_calculated_areas, a regular Flyte task that logs it.

@workflow
def wf(a: float, b: float):
    """Run every ellipse-area container task on the same inputs and log all results.

    `a` and `b` are forwarded unchanged to each container task. Each task's
    two declared outputs (`area`, `metadata`) are unpacked and handed to
    `report_all_calculated_areas`.
    """
    # Calculate area in all languages
    area_shell, metadata_shell = calculate_ellipse_area_shell(a=a, b=b)
    area_python, metadata_python = calculate_ellipse_area_python(a=a, b=b)
    area_r, metadata_r = calculate_ellipse_area_r(a=a, b=b)
    area_haskell, metadata_haskell = calculate_ellipse_area_haskell(a=a, b=b)
    area_julia, metadata_julia = calculate_ellipse_area_julia(a=a, b=b)

    # Report on all results in a single task to simplify comparison
    report_all_calculated_areas(
        area_shell=area_shell,
        metadata_shell=metadata_shell,
        area_python=area_python,
        metadata_python=metadata_python,
        area_r=area_r,
        metadata_r=metadata_r,
        area_haskell=area_haskell,
        metadata_haskell=metadata_haskell,
        area_julia=area_julia,
        metadata_julia=metadata_julia,
    )

Note

Raw containers cannot be run locally at the moment.

Scripts#

The contents of each script mentioned above:

calculate-ellipse-area.sh#

#! /usr/bin/env sh
#
# Compute the area of an ellipse for a Flyte raw container.
#
# Usage: calculate-ellipse-area.sh INPUT_DIR OUTPUT_DIR
#   INPUT_DIR   directory containing the files `a` and `b`
#   OUTPUT_DIR  directory that receives the files `area` and `metadata`

# Abort on a failed command or an unset positional parameter instead of
# carrying on and writing a bogus result.
set -eu

# Quote the directory arguments so paths with spaces or glob characters
# are not split or expanded.
a=$(cat "$1/a")
b=$(cat "$1/b")

# In `bc -l`, a(1) is arctan(1) = pi/4, so 4*a(1) evaluates to pi.
echo "4*a(1) * $a * $b" | bc -l | tee "$2/area"

echo "[from shell rawcontainer]" | tee "$2/metadata"

calculate-ellipse-area.py#

import math
import sys


def read_input(input_dir, v):
    """Return the contents of the file named `v` inside `input_dir`, parsed as a float."""
    source = f"{input_dir}/{v}"
    with open(source, "r") as handle:
        raw = handle.read()
    return float(raw)


def write_output(output_dir, output_file, v):
    """Write `str(v)` to the file `output_file` inside `output_dir`, replacing its content."""
    target = f"{output_dir}/{output_file}"
    text = str(v)
    with open(target, "w") as handle:
        handle.write(text)


def calculate_area(a, b):
    """Return the ellipse area `pi * a * b`."""
    area = math.pi * a * b
    return area


def main(input_dir, output_dir):
    """Read `a` and `b` from `input_dir`, then write `area` and `metadata` to `output_dir`."""
    first = read_input(input_dir, "a")
    second = read_input(input_dir, "b")

    # Written in insertion order: the area first, then the metadata tag.
    outputs = {
        "area": calculate_area(first, second),
        "metadata": "[from python rawcontainer]",
    }
    for output_file, value in outputs.items():
        write_output(output_dir, output_file, value)


if __name__ == "__main__":
    # Positional arguments: the input directory, then the output directory.
    input_dir, output_dir = sys.argv[1], sys.argv[2]
    main(input_dir, output_dir)

calculate-ellipse-area.R#

library(readr)

# Positional arguments: input directory, then output directory.
cli_args <- commandArgs(trailingOnly = TRUE)
input_dir <- cli_args[1]
output_dir <- cli_args[2]

# Build "<dir>/<name>" for a file inside one of the data directories.
data_path <- function(dir, name) sprintf("%s/%s", dir, name)

# Each input is stored in a file named after the variable.
a <- as.double(read_lines(data_path(input_dir, "a")))
b <- as.double(read_lines(data_path(input_dir, "b")))

area <- pi * a * b
print(area)

writeLines(as.character(area), data_path(output_dir, "area"))
writeLines("[from R rawcontainer]", data_path(output_dir, "metadata"))

calculate-ellipse-area.hs#

import System.IO
import System.Environment
import Text.Read
import Text.Printf

-- | Area of an ellipse, computed as @pi * a * b@ in single precision.
calculateEllipseArea :: Float -> Float -> Float
calculateEllipseArea a b = scaled * b
  where
    scaled = pi * a

-- | Entry point: the first argument is the input directory (files @a@ and @b@),
-- the second is the output directory (files @area@ and @metadata@).
main :: IO ()
main = do
  argv <- getArgs
  let inputDir = argv !! 0
      outputDir = argv !! 1

  rawA <- readFile (inputDir ++ "/a")
  rawB <- readFile (inputDir ++ "/b")

  let area = calculateEllipseArea (read rawA :: Float) (read rawB :: Float)

  writeFile (outputDir ++ "/area") (show area)
  writeFile (outputDir ++ "/metadata") "[from haskell rawcontainer]"

calculate-ellipse-area.jl#

using Printf

"""
    calculate_area(a, b)

Return `π * a * b`, the area of an ellipse with semi-axes `a` and `b`.
"""
calculate_area(a, b) = π * a * b

"""
    read_input(input_dir, v)

Read the file named `v` inside `input_dir` and return its contents parsed as a `Float64`.

`v` may be a `String` or a `Char`; it is converted with `string` before the path is built.
Whitespace around the number (such as a trailing newline) is stripped before parsing.
"""
function read_input(input_dir, v)
    path = joinpath(input_dir, string(v))
    # The file holds a single scalar, so `parse` is applied directly rather than broadcast.
    return parse(Float64, strip(read(path, String)))
end

"""
    write_output(output_dir, output_file, v)

Write `string(v)` to the file `output_file` inside `output_dir`, replacing any existing
content, and return the number of bytes written.
"""
function write_output(output_dir, output_file, v)
    destination = @sprintf("%s/%s", output_dir, output_file)
    return write(destination, string(v))
end

"""
    main(input_dir, output_dir)

Read `a` and `b` from `input_dir`, compute the ellipse area, and write the `area` and
`metadata` files to `output_dir`.
"""
function main(input_dir, output_dir)
    # Input names are passed as strings, like the output names below.
    a = read_input(input_dir, "a")
    b = read_input(input_dir, "b")

    area = calculate_area(a, b)

    write_output(output_dir, "area", area)
    write_output(output_dir, "metadata", "[from julia rawcontainer]")
end

# `ARGS` is the global array holding the command-line arguments passed to the script.
# Julia arrays are 1-indexed, so the first argument is ARGS[1].
input_dir, output_dir = ARGS[1], ARGS[2]

main(input_dir, output_dir)

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery