Source code for flytekitplugins.dbt.task

import os

from flytekitplugins.dbt.error import DBTHandledError, DBTUnhandledError
from flytekitplugins.dbt.schema import (
    DBTFreshnessInput,
    DBTFreshnessOutput,
    DBTRunInput,
    DBTRunOutput,
    DBTTestInput,
    DBTTestOutput,
)
from flytekitplugins.dbt.util import run_cli

from flytekit import kwtypes
from flytekit.core.interface import Interface
from flytekit.core.python_function_task import PythonInstanceTask
from flytekit.loggers import logger

# Exit codes of the dbt CLI subprocess that the tasks in this module react to.
# SUCCESS is presumably dbt's "everything passed" code; it is not compared
# against anywhere in the visible code.
SUCCESS = 0
# Exit code that makes a task raise DBTHandledError, unless the task input
# sets ``ignore_handled_error``.
HANDLED_ERROR_CODE = 1
# Exit code that always makes a task raise DBTUnhandledError.
UNHANDLED_ERROR_CODE = 2


class DBTRun(PythonInstanceTask):
    """
    Execute DBT Run CLI command.

    The task will execute ``dbt run`` CLI command in a subprocess.
    Input from :class:`flytekitplugins.dbt.schema.DBTRunInput` will be converted into the corresponding CLI flags
    and stored in :class:`flytekitplugins.dbt.schema.DBTRunOutput`'s command.

    Parameters
    ----------
    name : str
        Task name.
    """

    def __init__(
        self,
        name: str,
        **kwargs,
    ):
        super().__init__(
            task_type="dbt-run",
            name=name,
            task_config=None,
            interface=Interface(inputs=kwtypes(input=DBTRunInput), outputs=kwtypes(output=DBTRunOutput)),
            **kwargs,
        )

    def execute(self, **kwargs) -> DBTRunOutput:
        """
        This method will be invoked to execute the task.

        Example
        -------
        ::

            dbt_run_task = DBTRun(name="test-task")

            @workflow
            def my_workflow() -> DBTRunOutput:
                return dbt_run_task(
                    input=DBTRunInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                    )
                )

        Parameters
        ----------
        input : DBTRunInput
            DBT run input.

        Returns
        -------
        DBTRunOutput
            DBT run output, carrying the executed command line, the exit code and the raw
            contents of ``run_results.json`` and ``manifest.json``.

        Raises
        ------
        DBTHandledError
            If the ``dbt run`` command returns ``exit code 1`` and the input does not set
            ``ignore_handled_error``.
        DBTUnhandledError
            If the ``dbt run`` command returns ``exit code 2``.
        OSError
            If ``run_results.json`` or ``manifest.json`` cannot be opened in the output directory.
        """
        task_input: DBTRunInput = kwargs["input"]

        args = task_input.to_args()
        cmd = ["dbt", "--log-format", "json", "run"] + args
        full_command = " ".join(cmd)

        logger.info(f"Executing command: {full_command}")
        exit_code, logs = run_cli(cmd)
        logger.info(f"dbt exited with return code {exit_code}")

        if exit_code == HANDLED_ERROR_CODE and not task_input.ignore_handled_error:
            raise DBTHandledError(f"handled error while executing {full_command}", logs)

        if exit_code == UNHANDLED_ERROR_CODE:
            raise DBTUnhandledError(f"unhandled error while executing {full_command}", logs)

        # NOTE: any other exit code (including an ignored handled error) falls through
        # to reading the artifacts below.
        output_dir = os.path.join(task_input.project_dir, task_input.output_path)

        # Read the artifacts with an explicit encoding so decoding does not depend on
        # the locale of the machine running the task.
        run_result_path = os.path.join(output_dir, "run_results.json")
        with open(run_result_path, encoding="utf-8") as file:
            run_result = file.read()

        # read manifest.json
        manifest_path = os.path.join(output_dir, "manifest.json")
        with open(manifest_path, encoding="utf-8") as file:
            manifest = file.read()

        return DBTRunOutput(
            command=full_command,
            exit_code=exit_code,
            raw_run_result=run_result,
            raw_manifest=manifest,
        )
class DBTTest(PythonInstanceTask):
    """
    Execute DBT Test CLI command.

    The task will execute ``dbt test`` CLI command in a subprocess.
    Input from :class:`flytekitplugins.dbt.schema.DBTTestInput` will be converted into the corresponding CLI flags
    and stored in :class:`flytekitplugins.dbt.schema.DBTTestOutput`'s command.

    Parameters
    ----------
    name : str
        Task name.
    """

    def __init__(
        self,
        name: str,
        **kwargs,
    ):
        super().__init__(
            task_type="dbt-test",
            name=name,
            task_config=None,
            interface=Interface(
                inputs={
                    "input": DBTTestInput,
                },
                outputs={"output": DBTTestOutput},
            ),
            **kwargs,
        )

    def execute(self, **kwargs) -> DBTTestOutput:
        """
        This method will be invoked to execute the task.

        Example
        -------
        ::

            dbt_test_task = DBTTest(name="test-task")

            @workflow
            def my_workflow() -> DBTTestOutput:
                # run all tests
                dbt_test_task(
                    input=DBTTestInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                    )
                )

                # run singular test only
                dbt_test_task(
                    input=DBTTestInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                        select=["test_type:singular"],
                    )
                )

                # run both singular and generic test
                return dbt_test_task(
                    input=DBTTestInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                        select=["test_type:singular", "test_type:generic"],
                    )
                )

        Parameters
        ----------
        input : DBTTestInput
            DBT test input

        Returns
        -------
        DBTTestOutput
            DBT test output, carrying the executed command line, the exit code and the raw
            contents of ``run_results.json`` and ``manifest.json``.

        Raises
        ------
        DBTHandledError
            If the ``dbt test`` command returns ``exit code 1`` and the input does not set
            ``ignore_handled_error``.
        DBTUnhandledError
            If the ``dbt test`` command returns ``exit code 2``.
        OSError
            If ``run_results.json`` or ``manifest.json`` cannot be opened in the output directory.
        """
        task_input: DBTTestInput = kwargs["input"]

        args = task_input.to_args()
        cmd = ["dbt", "--log-format", "json", "test"] + args
        full_command = " ".join(cmd)

        logger.info(f"Executing command: {full_command}")
        exit_code, logs = run_cli(cmd)
        logger.info(f"dbt exited with return code {exit_code}")

        if exit_code == HANDLED_ERROR_CODE and not task_input.ignore_handled_error:
            raise DBTHandledError(f"handled error while executing {full_command}", logs)

        if exit_code == UNHANDLED_ERROR_CODE:
            raise DBTUnhandledError(f"unhandled error while executing {full_command}", logs)

        # NOTE: any other exit code (including an ignored handled error) falls through
        # to reading the artifacts below.
        output_dir = os.path.join(task_input.project_dir, task_input.output_path)

        # Read the artifacts with an explicit encoding so decoding does not depend on
        # the locale of the machine running the task.
        run_result_path = os.path.join(output_dir, "run_results.json")
        with open(run_result_path, encoding="utf-8") as file:
            run_result = file.read()

        # read manifest.json
        manifest_path = os.path.join(output_dir, "manifest.json")
        with open(manifest_path, encoding="utf-8") as file:
            manifest = file.read()

        return DBTTestOutput(
            command=full_command,
            exit_code=exit_code,
            raw_run_result=run_result,
            raw_manifest=manifest,
        )
class DBTFreshness(PythonInstanceTask):
    """
    Execute DBT Source Freshness CLI command.

    The task will execute ``dbt source freshness`` CLI command in a subprocess.
    Input from :class:`flytekitplugins.dbt.schema.DBTFreshnessInput` will be converted into the corresponding CLI flags
    and stored in :class:`flytekitplugins.dbt.schema.DBTFreshnessOutput`'s command.

    Parameters
    ----------
    name : str
        Task name.
    """

    def __init__(
        self,
        name: str,
        **kwargs,
    ):
        super().__init__(
            task_type="dbt-freshness",
            name=name,
            task_config=None,
            interface=Interface(
                inputs={
                    "input": DBTFreshnessInput,
                },
                outputs={"output": DBTFreshnessOutput},
            ),
            **kwargs,
        )

    def execute(self, **kwargs) -> DBTFreshnessOutput:
        """
        This method will be invoked to execute the task.

        Example
        -------
        ::

            dbt_freshness_task = DBTFreshness(name="freshness-task")

            @workflow
            def my_workflow() -> DBTFreshnessOutput:
                # check freshness without any selector
                dbt_freshness_task(
                    input=DBTFreshnessInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                    )
                )

                # NOTE(review): the selector values below mirror the DBTTest example;
                # confirm they are meaningful for ``dbt source freshness``.

                # restrict the check with a single selector
                dbt_freshness_task(
                    input=DBTFreshnessInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                        select=["test_type:singular"],
                    )
                )

                # restrict the check with several selectors
                return dbt_freshness_task(
                    input=DBTFreshnessInput(
                        project_dir="tests/jaffle_shop",
                        profiles_dir="tests/jaffle_shop/profiles",
                        profile="jaffle_shop",
                        select=["test_type:singular", "test_type:generic"],
                    )
                )

        Parameters
        ----------
        input : DBTFreshnessInput
            DBT freshness input

        Returns
        -------
        DBTFreshnessOutput
            DBT freshness output, carrying the executed command line, the exit code and the raw
            contents of ``sources.json``.

        Raises
        ------
        DBTHandledError
            If the ``dbt source freshness`` command returns ``exit code 1`` and the input does not
            set ``ignore_handled_error``.
        DBTUnhandledError
            If the ``dbt source freshness`` command returns ``exit code 2``.
        OSError
            If ``sources.json`` cannot be opened in the output directory.
        """
        task_input: DBTFreshnessInput = kwargs["input"]

        args = task_input.to_args()
        cmd = ["dbt", "--log-format", "json", "source", "freshness"] + args
        full_command = " ".join(cmd)

        logger.info(f"Executing command: {full_command}")
        exit_code, logs = run_cli(cmd)
        logger.info(f"dbt exited with return code {exit_code}")

        if exit_code == HANDLED_ERROR_CODE and not task_input.ignore_handled_error:
            raise DBTHandledError(f"handled error while executing {full_command}", logs)

        if exit_code == UNHANDLED_ERROR_CODE:
            raise DBTUnhandledError(f"unhandled error while executing {full_command}", logs)

        # NOTE: any other exit code (including an ignored handled error) falls through
        # to reading the artifact below.
        output_dir = os.path.join(task_input.project_dir, task_input.output_path)

        # Read the artifact with an explicit encoding so decoding does not depend on
        # the locale of the machine running the task.
        sources_path = os.path.join(output_dir, "sources.json")
        with open(sources_path, encoding="utf-8") as file:
            sources = file.read()

        return DBTFreshnessOutput(command=full_command, exit_code=exit_code, raw_sources=sources)