BLASTX Example#
This demonstration will utilize BLASTX to search for a nucleotide sequence within a local protein database.
Import the necessary libraries.
from pathlib import Path
from typing import NamedTuple
import matplotlib.pyplot as plt
import pandas as pd
import requests
from flytekit import conditional, kwtypes, task, workflow
from flytekit.extras.tasks.shell import OutputLocation, ShellTask
from flytekit.types.file import FlyteFile, PNGImageFile
Download the data from GitHub.
Note
When running code on the demo cluster, make sure data is included in the Docker image. Uncomment copy data command in the Dockerfile.
def download_dataset():
Path("kitasatospora").mkdir(exist_ok=True)
r = requests.get("https://api.github.com/repos/flyteorg/flytesnacks/contents/blast/kitasatospora?ref=datasets")
for each_file in r.json():
download_url = each_file["download_url"]
file_name = f"kitasatospora/{Path(download_url).name}"
if not Path(file_name).exists():
r_file = requests.get(each_file["download_url"])
open(file_name, "wb").write(r_file.content)
A ShellTask
allows you to run commands on the shell.
In this example, we use a ShellTask
to create and execute the BLASTX command.
Start by specifying the location of the BLAST output file.
Then, define variables that hold the paths to the input query sequence file, the database we are searching against, and the output file for BLAST.
Finally, generate and run the BLASTX command.
Both the standard output (stdout) and standard error (stderr) are captured and saved in the stdout
variable.
The {inputs}
and {outputs}
are placeholders for input and output values, respectively.
blastx_on_shell = ShellTask(
name="blastx",
debug=True,
script="""
mkdir -p {inputs.outdir}
query={inputs.datadir}/{inputs.query}
db={inputs.datadir}/{inputs.db}
blastout={inputs.outdir}/{inputs.blast_output}
blastx -out $blastout -outfmt 6 -query $query -db $db >> {outputs.stdout} 2>&1
""",
inputs=kwtypes(datadir=str, query=str, outdir=str, blast_output=str, db=str),
output_locs=[
OutputLocation(var="stdout", var_type=FlyteFile, location="stdout.txt"),
OutputLocation(
var="blastout",
var_type=FlyteFile,
location="{inputs.outdir}/{inputs.blast_output}",
),
],
)
Note
The outfmt=6
option requests BLASTX to generate a tab-separated plain text file, which is convenient for automated processing.
If the command runs successfully, there should be no standard output or error (stdout and stderr should be empty).
Next, define a task to load the BLASTX output. The task returns a pandas DataFrame and a plot. The file containing the BLASTX results is referred to as blastout.
BLASTXOutput = NamedTuple("blastx_output", result=pd.DataFrame, plot=PNGImageFile)
@task
def blastx_output(blastout: FlyteFile) -> BLASTXOutput:
# read BLASTX output
result = pd.read_csv(blastout, sep="\t", header=None)
# define column headers
headers = [
"query",
"subject",
"pc_identity",
"aln_length",
"mismatches",
"gaps_opened",
"query_start",
"query_end",
"subject_start",
"subject_end",
"e_value",
"bitscore",
]
# assign headers
result.columns = headers
# create a scatterplot
result.plot.scatter("pc_identity", "e_value")
plt.title("E value vs %identity")
plot = "plot.png"
plt.savefig(plot)
return BLASTXOutput(result=result.head(), plot=plot)
Verify that the BLASTX run was successful by checking if the standard output and error are empty.
@task
def is_batchx_success(stdout: FlyteFile) -> bool:
if open(stdout).read():
return False
else:
return True
Create a workflow that calls the previously defined tasks. A conditional statement is used to check the success of the BLASTX command.
@workflow
def blast_wf(
datadir: str = "kitasatospora",
outdir: str = "output",
query: str = "k_sp_CB01950_penicillin.fasta",
db: str = "kitasatospora_proteins.faa",
blast_output: str = "AMK19_00175_blastx_kitasatospora.tab",
) -> BLASTXOutput:
stdout, blastout = blastx_on_shell(datadir=datadir, outdir=outdir, query=query, db=db, blast_output=blast_output)
result = is_batchx_success(stdout=stdout)
final_result, plot = (
conditional("blastx_output")
.if_(result.is_true())
.then(blastx_output(blastout=blastout))
.else_()
.fail("BLASTX failed")
)
return BLASTXOutput(result=final_result, plot=plot)
Run the workflow locally.
if __name__ == "__main__":
print("Downloading dataset...")
download_dataset()
print("Running BLASTX...")
print(f"BLASTX result: {blast_wf()}")