Source code for flytekitplugins.dbt.schema
import json
from dataclasses import dataclass
from typing import List, Optional
from dataclasses_json import DataClassJsonMixin
[docs]
@dataclass
class BaseDBTInput(DataClassJsonMixin):
"""
Base class for DBT Task Input.
Attributes
----------
project_dir : str
Path to directory containing the DBT ``dbt_project.yml``.
profiles_dir : str
Path to directory containing the DBT ``profiles.yml``.
profile : str
Profile name to be used for the DBT task. It will override value in ``dbt_project.yml``.
target : str
Target to load for the given profile (default=None).
output_path : str
Path to directory where compiled files (e.g. models) will be written when running the task (default=target).
ignore_handled_error : bool
Ignore handled error (exit code = 1) returned by DBT, see https://docs.getdbt.com/reference/exit-codes (default=False).
flags : dict
Dictionary containing CLI flags to be added to the ``dbt run`` command (default=False).
"""
project_dir: str
profiles_dir: str
profile: str
target: str = None
output_path: str = "target"
ignore_handled_error: bool = False
flags: dict = None
[docs]
def to_args(self) -> List[str]:
"""
Convert the instance of BaseDBTInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""
args = []
args += ["--project-dir", self.project_dir]
args += ["--profiles-dir", self.profiles_dir]
args += ["--profile", self.profile]
if self.target is not None:
args += ["--target", self.target]
if self.flags is not None:
for flag, value in self.flags.items():
if not value:
continue
args.append(f"--{flag}")
if isinstance(value, bool):
continue
if isinstance(value, list):
args += value
continue
if isinstance(value, dict):
args.append(json.dumps(value))
continue
args.append(str(value))
return args
[docs]
@dataclass
class BaseDBTOutput(DataClassJsonMixin):
"""
Base class for output of DBT task.
Attributes
----------
command : str
Complete CLI command and flags that was executed by DBT Task.
exit_code : int
Exit code returned by DBT CLI.
"""
command: str
exit_code: int
[docs]
@dataclass
class DBTRunInput(BaseDBTInput):
"""
Input to DBT Run task.
Attributes
----------
select : List[str]
List of model to be executed (default=None).
exclude : List[str]
List of model to be excluded (default=None).
"""
select: Optional[List[str]] = None
exclude: Optional[List[str]] = None
[docs]
def to_args(self) -> List[str]:
"""
Convert the instance of BaseDBTInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""
args = BaseDBTInput.to_args(self)
if self.select is not None:
args += ["--select"] + self.select
if self.exclude is not None:
args += ["--exclude"] + self.exclude
return args
[docs]
@dataclass
class DBTRunOutput(BaseDBTOutput):
"""
Output of DBT run task.
Attributes
----------
raw_run_result : str
Raw value of DBT's ``run_result.json``.
raw_manifest : str
Raw value of DBT's ``manifest.json``.
"""
raw_run_result: str
raw_manifest: str
[docs]
@dataclass
class DBTTestInput(BaseDBTInput):
"""
Input to DBT Test task.
Attributes
----------
select : List[str]
List of model to be executed (default : None).
exclude : List[str]
List of model to be excluded (default : None).
"""
select: Optional[List[str]] = None
exclude: Optional[List[str]] = None
[docs]
def to_args(self) -> List[str]:
"""
Convert the instance of DBTTestInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""
args = BaseDBTInput.to_args(self)
if self.select is not None:
args += ["--select"] + self.select
if self.exclude is not None:
args += ["--exclude"] + self.exclude
return args
[docs]
@dataclass
class DBTTestOutput(BaseDBTOutput):
"""
Output of DBT test task.
Attributes
----------
raw_run_result : str
Raw value of DBT's ``run_result.json``.
raw_manifest : str
Raw value of DBT's ``manifest.json``.
"""
raw_run_result: str
raw_manifest: str
@dataclass
class DBTFreshnessInput(BaseDBTInput):
"""
Input to DBT Freshness task.
Attributes
----------
select : List[str]
List of model to be executed (default : None).
exclude : List[str]
List of model to be excluded (default : None).
"""
select: Optional[List[str]] = None
exclude: Optional[List[str]] = None
def to_args(self) -> List[str]:
"""
Convert the instance of DBTFreshnessInput into list of arguments.
Returns
-------
List[str]
List of arguments.
"""
args = BaseDBTInput.to_args(self)
if self.select is not None:
args += ["--select"] + self.select
if self.exclude is not None:
args += ["--exclude"] + self.exclude
return args
@dataclass
class DBTFreshnessOutput(BaseDBTOutput):
"""
Output of DBT Freshness task.
Attributes
----------
raw_sources : str
Raw value of DBT's ``sources.json``.
"""
raw_sources: str