--- jupytext: cell_metadata_filter: all formats: md:myst main_language: python notebook_metadata_filter: all text_representation: extension: .md format_name: myst format_version: 0.13 jupytext_version: 1.16.0 kernelspec: display_name: Python 3 language: python name: python3 --- +++ {"lines_to_next_cell": 0} # ChatGPT agent example usage ## Basic Example ChatGPT can be used in lots of cases, for example, sentiment analysis, language translation, SQL query generation, and text summarization. This example shows you how to run ChatGPT tasks in flyte. ```{code-cell} from typing import List import flytekit from flytekit import ImageSpec, Secret, dynamic, task, workflow from flytekitplugins.openai import ChatGPTTask ``` +++ {"lines_to_next_cell": 0} You have to specify your `name`, `openai_organization` and `chatgpt_config`. `name` is for Flyte and it should be unique. `openai_organization` is for the OpenAI API. You can find it [here](https://platform.openai.com/account/organization). `chatgpt_config` is for OpenAI chat completion. You can find it [here](https://platform.openai.com/docs/api-reference/chat/create). ```{code-cell} chatgpt_small_job = ChatGPTTask( name="3.5-turbo", openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7", chatgpt_config={ "model": "gpt-3.5-turbo", "temperature": 0.7, }, ) chatgpt_big_job = ChatGPTTask( name="gpt-4", openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7", chatgpt_config={ "model": "gpt-4", "temperature": 0.7, }, ) @workflow def my_chatgpt_job(message: str) -> str: message = chatgpt_small_job(message=message) message = chatgpt_big_job(message=message) return message ``` +++ {"lines_to_next_cell": 0} You can execute the workflow locally. ```{code-cell} :lines_to_next_cell: 2 if __name__ == "__main__": print(f"Running {__file__} main...") print(f"Running my_chatgpt_job(message='hi') {my_chatgpt_job(message='hi')}") ``` +++ {"lines_to_next_cell": 0} ## ChatGPT Summary Bot These examples show you a real use case of ChatGPT in the production mode. For more details, see the [FlyteChatGPT Summary Bot GitHub repository](https://github.com/Future-Outlier/FlyteChatGPTSummaryBot) and the [demo video](https://youtu.be/IS6gi4jR7h0?si=hWHZp5LyjDspiwfD). ### Summarize Flyte's latest GitHub releases to Slack ```{code-cell} :lines_to_next_cell: 2 image = ImageSpec( apt_packages=["git"], packages=[ "flytekitplugins-chatgpt", "requests", "slack_sdk", ], registry="ghcr.io/flyteorg", ) chatgpt_job = ChatGPTTask( name="3.5-turbo", openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7", chatgpt_config={ "model": "gpt-3.5-turbo", "temperature": 0.7, }, ) @task( container_image=image, secret_requests=[Secret(key="token", group="github-api")], ) def get_github_latest_release(owner: str = "flyteorg", repo: str = "flyte") -> str: import requests token = flytekit.current_context().secrets.get("github-api", "token") url = f"https://api.github.com/repos/{owner}/{repo}/releases/latest" headers = { "Authorization": f"token {token}", "Accept": "application/vnd.github.v3+json", } response = requests.get(url, headers=headers) message = ( "You are a Bot. Provide a summary of the latest Flyte Github releases for users on Slack." "Ensure the response fits within 4000 characters, suitable for a Slack message. " "Start the message with 'These are the latest Flyte GitHub releases'. " f"End the message with 'Check out the releases page here: https://github.com/{owner}/{repo}/releases'. " "Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n" f"Latest releases:\n{response.json()['body']}" ) return message @task( container_image=image, secret_requests=[Secret(key="token", group="slack-api")], ) def post_message_on_slack(message: str): if message == "": return from slack_sdk import WebClient token = flytekit.current_context().secrets.get("slack-api", "token") client = WebClient(token=token) client.chat_postMessage(channel="youtube-summary", text=message) @workflow def slack_wf(owner: str = "flyteorg", repo: str = "flyte", channel: str = "demo"): message = get_github_latest_release(owner=owner, repo=repo) message = chatgpt_job(message=message) post_message_on_slack(message=message) if __name__ == "__main__": slack_wf() ``` +++ {"lines_to_next_cell": 0} ### Summarize Flyte's latest YouTube Video to Slack ```{code-cell} image = ImageSpec( apt_packages=["git"], packages=[ "flytekitplugins-chatgpt", "scrapetube==2.5.1", "youtube_transcript_api==0.6.1", "slack_sdk==3.23.0", ], registry="ghcr.io/flyteorg", ) chatgpt_job = ChatGPTTask( name="3.5-turbo", openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7", chatgpt_config={ "model": "gpt-3.5-turbo", "temperature": 0.7, }, ) @task(container_image=image) def get_latest_video_transcript_chunks(channel_url: str) -> List[str]: import scrapetube from youtube_transcript_api import YouTubeTranscriptApi # fetch_latest_video_id video_generator = scrapetube.get_channel(channel_url=channel_url) latest_video = next(video_generator) video_id = latest_video["videoId"] # fetch_transcript transcript = YouTubeTranscriptApi.get_transcript(video_id) # chunk_transcript text_transcript = "\n".join([entry["text"] for entry in transcript]) return [text_transcript[i : i + 10000] for i in range(0, len(text_transcript), 10000)] @workflow def video_wf(channel_url: str): chunks = get_latest_video_transcript_chunks(channel_url=channel_url) dynamic_subwf(channel_url=channel_url, chunks=chunks) @task(container_image=image) def check_strs_len_less_than_num(msg1: str, msg2: str, num: int) -> bool: return len(msg1) + len(msg2) < num @task(container_image=image) def concatenate_str(msg1: str, msg2: str) -> str: return msg1 + msg2 + "\n" @task(container_image=image) def str_is_non_empty(msg: str) -> bool: return len(msg) == 0 @dynamic(container_image=image) def dynamic_subwf(channel_url: str, chunks: List[str]): post_message_on_slack( message=f"This is the latest video summary, checkout in Flyte's Youtube Channel!\n{channel_url}" ) summary_messages = [] for chunk in chunks: message = chatgpt_job( message=concatenate_str( msg1=( "Please provide a summary of the following portion of the transcript" " from the latest Flyte YouTube video. Note: This is only a segment" " of the entire transcript, which has been split into multiple parts." " The summary should be concise, not exceeding 4000 characters, and" " suitable for sharing on Slack." "Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n" "Transcript:\n" ), msg2=chunk, ) ) summary_messages.append(message) message = "" for summary_message in summary_messages: b = check_strs_len_less_than_num(msg1=message, msg2=summary_message, num=15000) if b.is_true: message = concatenate_str(msg1=message, msg2=summary_message) if b.is_false: message = chatgpt_job( message=concatenate_str( msg1=( "Please provide a concise summary of the following messages" " generated by ChatGPT. The summary should be suitable for sharing" " on Slack and not exceed 4000 characters." "Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n" "Transcript:\n" ), msg2=message, ) ) post_message_on_slack(message=message) message = summary_message b = str_is_non_empty(msg=message) if b.is_true: message = chatgpt_job( message=concatenate_str( msg1=( "Please provide a concise summary of the following messages" " generated by ChatGPT. The summary should be suitable for sharing" " on Slack and not exceed 4000 characters." "Note: Handling via the Slack API is not required. Format the response in bullet points.\n\n" "Transcript:\n" ), msg2=message, ) ) post_message_on_slack(message=message) if __name__ == "__main__": video_wf(channel_url="https://www.youtube.com/@flyteorg") ``` +++ {"lines_to_next_cell": 0} ### Summarize the latest MLOps trend from Medium to Twitter Note: This example only works in a local environment. ```{code-cell} chatgpt_job = ChatGPTTask( name="3.5-turbo", openai_organization="org-NayNG68kGnVXMJ8Ak4PMgQv7", chatgpt_config={ "model": "gpt-3.5-turbo", "temperature": 0.7, }, ) @task def get_weekly_articles_title(url: str = "https://medium.com/tag/flyte") -> str: from bs4 import BeautifulSoup from selenium import webdriver from selenium.webdriver.chrome.options import Options from selenium.webdriver.chrome.service import Service from webdriver_manager.chrome import ChromeDriverManager options = Options() options.add_argument("--headless") options.add_argument("--no-sandbox") options.add_argument("--disable-dev-shm-usage") driver = webdriver.Chrome(service=Service(ChromeDriverManager().install()), options=options) driver.get(url) page_source = driver.page_source driver.quit() soup = BeautifulSoup(page_source, "html.parser") texts = soup.stripped_strings all_text = " ".join(texts) message = ( f"You are a Bot. Provide a summary of the latest MLOps trend for users on Medium. " f"Your response should fit within 280 characters for a tweet, excluding the article's title. " f"Start the message with '''This is the trend of MLOps on Medium this week\n'''. Note: Tweet API handling is not required." f"```````" f"Article Title: {all_text}" ) return message @task( secret_requests=[ Secret(key="bearer_token", group="tweet-api"), Secret(key="consumer_key", group="tweet-api"), Secret(key="consumer_secret", group="tweet-api"), Secret(key="access_token", group="tweet-api"), Secret(key="access_token_secret", group="tweet-api"), ], ) def tweet(text: str): import tweepy TWEET_LENGTH = 280 BEARER_TOKEN = flytekit.current_context().secrets.get("tweet-api", "bearer_token") CONSUMER_KEY = flytekit.current_context().secrets.get("tweet-api", "consumer_key") CONSUMER_SECRET = flytekit.current_context().secrets.get("tweet-api", "consumer_secret") ACCESS_TOKEN = flytekit.current_context().secrets.get("tweet-api", "access_token") ACCESS_TOKEN_SECRET = flytekit.current_context().secrets.get("tweet-api", "access_token_secret") client = tweepy.Client( bearer_token=BEARER_TOKEN, consumer_key=CONSUMER_KEY, consumer_secret=CONSUMER_SECRET, access_token=ACCESS_TOKEN, access_token_secret=ACCESS_TOKEN_SECRET, ) if len(text) > TWEET_LENGTH: text = text[:TWEET_LENGTH] client.create_tweet(text=text) @workflow def tweet_wf(url: str = "https://medium.com/tag/flyte"): message = get_weekly_articles_title(url=url) message = chatgpt_job(message=message) tweet(text=message) if __name__ == "__main__": tweet_wf() ```