Notifications#

When a workflow is completed, users can be notified through:

The content of these notifications is configurable at the platform level.

Code Example#

When a workflow reaches a specified terminal workflow execution phase, the flytekit.Email, flytekit.PagerDuty, or flytekit.Slack objects can be used in the construction of a flytekit.LaunchPlan.

Consider the following example workflow:

from flytekit import Email, LaunchPlan, task, workflow, WorkflowExecutionPhase


@task
def double_int_and_print(a: int) -> str:
    return str(a * 2)


@workflow
def int_doubler_wf(a: int) -> str:
    doubled = double_int_and_print(a=a)
    return doubled

Here are three scenarios that can help deepen your understanding of how notifications work:

  1. Launch Plan triggers email notifications when the workflow execution reaches the SUCCEEDED phase.

int_doubler_wf_lp = LaunchPlan.get_or_create(
    name="int_doubler_wf",
    workflow=int_doubler_wf,
    default_inputs={"a": 4},
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["admin@example.com"],
        )
    ],
)
  1. Notifications shine when used for scheduled workflows to alert for failures.

from datetime import timedelta

from flytekit import FixedRate, PagerDuty

int_doubler_wf_scheduled_lp = LaunchPlan.get_or_create(
    name="int_doubler_wf_scheduled",
    workflow=int_doubler_wf,
    default_inputs={"a": 4},
    notifications=[
        PagerDuty(
            phases=[WorkflowExecutionPhase.FAILED, WorkflowExecutionPhase.TIMED_OUT],
            recipients_email=["abc@pagerduty.com"],
        )
    ],
    schedule=FixedRate(duration=timedelta(days=1)),
)
  1. Notifications can be combined with different permutations of terminal phases and recipient targets.

from flytekit import Slack

wacky_int_doubler_lp = LaunchPlan.get_or_create(
    name="wacky_int_doubler",
    workflow=int_doubler_wf,
    default_inputs={"a": 4},
    notifications=[
        Email(
            phases=[WorkflowExecutionPhase.FAILED],
            recipients_email=["me@example.com", "you@example.com"],
        ),
        Email(
            phases=[WorkflowExecutionPhase.SUCCEEDED],
            recipients_email=["myboss@example.com"],
        ),
        Slack(
            phases=[
                WorkflowExecutionPhase.SUCCEEDED,
                WorkflowExecutionPhase.ABORTED,
                WorkflowExecutionPhase.TIMED_OUT,
            ],
            recipients_email=["myteam@slack.com"],
        ),
    ],
)

Future work#

Work is ongoing to support a generic event egress system that can be used to publish events for tasks, workflows, and workflow nodes. When this is complete, generic event subscribers can asynchronously process these events for a rich and fully customizable experience.

Platform Configuration Changes#

The notifications top-level portion of the Flyteadmin config specifies how to handle notifications.

As in schedules, the handling of notifications is composed of two parts— one part handles enqueuing notifications asynchronously. The other part handles processing pending notifications and sends out emails and alerts.

This is only supported for Flyte instances running on AWS.

Config#

To publish notifications, you’ll need to set up an SNS topic.

To process notifications, you’ll need to set up an AWS SQS queue to consume notification events. This queue must be configured as a subscription to your SNS topic you created above.

To publish notifications, you’ll need a verified SES email address which will be used to send notification emails and alerts using email APIs.

The role you use to run Flyteadmin must have permissions to read and write to your SNS topic and SQS queue.

Let’s look into the following config section and explain what each value represents:

notifications:
  type: "aws"
  region: "us-east-1"
  publisher:
    topicName: "arn:aws:sns:us-east-1:{{ YOUR ACCOUNT ID }}:{{ YOUR TOPIC }}"
  processor:
    queueName: "{{ YOUR QUEUE NAME }}"
    accountId: "{{ YOUR ACCOUNT ID }}"
  emailer:
    subject: "Notice: Execution \"{{ workflow.name }}\" has {{ phase }} in \"{{ domain }}\"."
    sender:  "flyte-notifications@company.com"
    body: >
       Execution \"{{ workflow.name }} [{{ name }}]\" has {{ phase }} in \"{{ domain }}\". View details at
       <a href=\http://flyte.company.com/console/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}>
       http://flyte.company.com/console/projects/{{ project }}/domains/{{ domain }}/executions/{{ name }}</a>. {{ error }}
  • type: AWS is the only cloud back-end supported for executing scheduled workflows; hence "aws" is the only valid value. By default, the no-op executor is used.

  • region: Specifies the region AWS clients should use when creating SNS and SQS clients.

  • publisher: Handles pushing notification events to your SNS topic.
    • topicName: This is the arn of your SNS topic.

  • processor: Handles recording notification events and enqueueing them to be processed asynchronously.
    • queueName: Name of the SQS queue which will capture pending notification events.

    • accountId: AWS account id.

  • emailer: Encloses config details for sending and formatting emails used as notifications.
    • subject: Configurable subject line used in notification emails.

    • sender: Your verified SES email sender.

    • body: Configurable email body used in notifications.

The complete set of parameters that can be used for email templating are checked in here.

Total running time of the script: ( 0 minutes 0.000 seconds)

Gallery generated by Sphinx-Gallery