class AbstractAppriseNotificationBlock(NotificationBlock, ABC):
    """
    An abstract class for sending notifications using Apprise.
    """

    # Notification type forwarded to Apprise on every `notify` call.
    notify_type: Literal[
        "prefect_default", "info", "success", "warning", "failure"
    ] = Field(
        default=PREFECT_NOTIFY_TYPE_DEFAULT,
        description=(
            "The type of notification being performed; the prefect_default is a"
            " plain notification that does not attach an image."
        ),
    )

    def __init__(self, *args, **kwargs):
        """Register Prefect's default notify type with Apprise, then initialize.

        The registration happens before the parent initializer runs and is
        skipped when the type is already present in `apprise.NOTIFY_TYPES`.
        """
        import apprise

        already_registered = PREFECT_NOTIFY_TYPE_DEFAULT in apprise.NOTIFY_TYPES
        if not already_registered:
            apprise.NOTIFY_TYPES += (PREFECT_NOTIFY_TYPE_DEFAULT,)

        super().__init__(*args, **kwargs)

    def _start_apprise_client(self, url: SecretStr):
        """Create this block's Apprise client and add `url` as its target.

        Args:
            url: The Apprise URL to notify; its secret value is unwrapped
                only when handed to the client.
        """
        from apprise import Apprise, AppriseAsset

        # Brand the client so that notifications are labelled as coming from
        # Prefect on every messaging platform.
        branding = {
            "app_id": "Prefect Notifications",
            "app_desc": "Prefect Notifications",
            "app_url": "https://prefect.io",
        }

        self._apprise_client = Apprise(asset=AppriseAsset(**branding))
        self._apprise_client.add(url.get_secret_value())

    def block_initialization(self) -> None:
        """Start the Apprise client using the block's own `url` attribute."""
        self._start_apprise_client(self.url)

    @sync_compatible
    @instrument_instance_method_call
    async def notify(
        self,
        body: str,
        subject: Optional[str] = None,
    ):
        """Send a notification through the Apprise client.

        Args:
            body: The message body.
            subject: Optional title of the message.

        Raises:
            NotificationError: If Apprise reports a failure and
                `_raise_on_failure` is set; the captured Apprise debug log
                is attached to the error.
        """
        # Capture Apprise's debug logging while sending so that a failure
        # can be reported together with the log output.
        with LogEavesdropper("apprise", level=logging.DEBUG) as eavesdropper:
            delivered = await self._apprise_client.async_notify(
                body=body,
                title=subject,
                notify_type=self.notify_type,
            )

        if delivered or not self._raise_on_failure:
            return

        raise NotificationError(log=eavesdropper.text())
A base class for sending notifications using Apprise, through webhook URLs.
Source code in src/prefect/blocks/notifications.py
798081828384858687888990
class AppriseNotificationBlock(AbstractAppriseNotificationBlock, ABC):
    """
    A base class for sending notifications using Apprise, through webhook URLs.
    """

    _documentation_url = "https://docs.prefect.io/ui/notifications/"

    # Required; stored as a secret because the URL is used directly as the
    # Apprise notification target.
    url: SecretStr = Field(
        default=...,
        title="Webhook URL",
        description="Incoming webhook URL used to send notifications.",
        examples=["https://hooks.example.com/XXX"],
    )
Enables sending notifications via any custom webhook.
Any nested string parameter that contains {{key}} will have that placeholder replaced with the matching value from the context or secrets.
Context values include: subject, body and name.
Examples:
Load a saved custom webhook and send a message:
from prefect.blocks.notifications import CustomWebhookNotificationBlock

custom_webhook_block = CustomWebhookNotificationBlock.load("BLOCK_NAME")
custom_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class CustomWebhookNotificationBlock(NotificationBlock):
    """
    Enables sending notifications via any custom webhook.

    All nested string param contains `{{key}}` will be substituted with value from context/secrets.

    Context values include: `subject`, `body` and `name`.

    Examples:
        Load a saved custom webhook and send a message:
        ```python
        from prefect.blocks.notifications import CustomWebhookNotificationBlock

        custom_webhook_block = CustomWebhookNotificationBlock.load("BLOCK_NAME")

        custom_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _block_type_name = "Custom Webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/c7247cb359eb6cf276734d4b1fbf00fb8930e89e-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.CustomWebhookNotificationBlock"

    name: str = Field(title="Name", description="Name of the webhook.")

    url: str = Field(
        title="Webhook URL",
        description="The webhook URL.",
        examples=["https://hooks.slack.com/XXX"],
    )

    method: Literal["GET", "POST", "PUT", "PATCH", "DELETE"] = Field(
        default="POST", description="The webhook request method. Defaults to `POST`."
    )

    params: Optional[Dict[str, str]] = Field(
        default=None, title="Query Params", description="Custom query params."
    )
    json_data: Optional[dict] = Field(
        default=None,
        title="JSON Data",
        description="Send json data as payload.",
        examples=[
            '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":'
            ' "{{tokenFromSecrets}}"}'
        ],
    )
    form_data: Optional[Dict[str, str]] = Field(
        default=None,
        title="Form Data",
        description=(
            "Send form data as payload. Should not be used together with _JSON Data_."
        ),
        examples=[
            '{"text": "{{subject}}\\n{{body}}", "title": "{{name}}", "token":'
            ' "{{tokenFromSecrets}}"}'
        ],
    )

    headers: Optional[Dict[str, str]] = Field(None, description="Custom headers.")
    cookies: Optional[Dict[str, str]] = Field(None, description="Custom cookies.")

    timeout: float = Field(
        default=10, description="Request timeout in seconds. Defaults to 10."
    )

    secrets: SecretDict = Field(
        default_factory=lambda: SecretDict(dict()),
        title="Custom Secret Values",
        description="A dictionary of secret values to be substituted in other configs.",
        examples=['{"tokenFromSecrets":"SomeSecretToken"}'],
    )

    def _build_request_args(self, body: str, subject: Optional[str]):
        """Build kwargs for httpx.AsyncClient.request

        Args:
            body: The message body, substituted for `{{body}}`.
            subject: The message subject, substituted for `{{subject}}`;
                the literal string 'null' is used when it is None.

        Returns:
            The result of `apply_values` on the request template (method,
            url, params, data, json, headers, cookies, timeout).
        """
        # Build a fresh mapping instead of calling `.update()` on the dict
        # returned by `get_secret_value()`: that object may be the block's
        # stored secrets, and mutating it would write the message subject and
        # body into the secrets. Context keys are listed last so that, as
        # before, they take precedence over secrets with the same name.
        values = {
            **self.secrets.get_secret_value(),
            # use 'null' when subject is None
            "subject": "null" if subject is None else subject,
            "body": body,
            "name": self.name,
        }
        # do substitution
        return apply_values(
            {
                "method": self.method,
                "url": self.url,
                "params": self.params,
                "data": self.form_data,
                "json": self.json_data,
                "headers": self.headers,
                "cookies": self.cookies,
                "timeout": self.timeout,
            },
            values,
        )

    def block_initialization(self) -> None:
        """Validate the block configuration.

        Raises:
            ValueError: If both `form_data` and `json_data` are provided.
            KeyError: If a template uses a placeholder that is neither a
                context key (`subject`, `body`, `name`) nor a secret key.
        """
        # check form_data and json_data
        if self.form_data is not None and self.json_data is not None:
            raise ValueError("both `Form Data` and `JSON Data` provided")
        allowed_keys = {"subject", "body", "name"}.union(
            self.secrets.get_secret_value().keys()
        )
        # test the templates now so that an error is raised early
        for name in ["url", "params", "form_data", "json_data", "headers", "cookies"]:
            template = getattr(self, name)
            if template is None:
                continue
            # check for placeholders not in predefined keys and secrets
            placeholders = find_placeholders(template)
            for placeholder in placeholders:
                if placeholder.name not in allowed_keys:
                    raise KeyError(f"{name}/{placeholder}")

    @sync_compatible
    @instrument_instance_method_call
    async def notify(self, body: str, subject: Optional[str] = None):
        """Send the webhook request built from the block's templates.

        Args:
            body: The message body.
            subject: Optional message subject.

        Raises:
            httpx.HTTPStatusError: Raised by `raise_for_status()` when the
                response has an error status code.
        """
        import httpx

        request_args = self._build_request_args(body, subject)
        # Cookies are set on the client rather than passed per request.
        cookies = request_args.pop("cookies", None)
        # make request with httpx
        client = httpx.AsyncClient(
            headers={"user-agent": "Prefect Notifications"}, cookies=cookies
        )
        async with client:
            resp = await client.request(**request_args)
        resp.raise_for_status()
from prefect.blocks.notifications import DiscordWebhook

discord_webhook_block = DiscordWebhook.load("BLOCK_NAME")
discord_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class DiscordWebhook(AbstractAppriseNotificationBlock):
    """
    Enables sending notifications via a provided Discord webhook.
    See [Apprise notify_Discord docs](https://github.com/caronc/apprise/wiki/Notify_Discord) # noqa

    Examples:
        Load a saved Discord webhook and send a message:
        ```python
        from prefect.blocks.notifications import DiscordWebhook

        discord_webhook_block = DiscordWebhook.load("BLOCK_NAME")

        discord_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via a provided Discord webhook."
    _block_type_name = "Discord Webhook"
    _block_type_slug = "discord-webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/9e94976c80ef925b66d24e5d14f0d47baa6b8f88-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.DiscordWebhook"

    webhook_id: SecretStr = Field(
        default=...,
        description=(
            "The first part of 2 tokens provided to you after creating a"
            " incoming-webhook."
        ),
    )

    webhook_token: SecretStr = Field(
        default=...,
        description=(
            "The second part of 2 tokens provided to you after creating a"
            " incoming-webhook."
        ),
    )

    botname: Optional[str] = Field(
        title="Bot name",
        default=None,
        description=(
            "Identify the name of the bot that should issue the message. If one isn't"
            " specified then the default is to just use your account (associated with"
            " the incoming-webhook)."
        ),
    )

    tts: bool = Field(
        default=False,
        description="Whether to enable Text-To-Speech.",
    )

    include_image: bool = Field(
        default=False,
        description=(
            "Whether to include an image in-line with the message describing the"
            " notification type."
        ),
    )

    avatar: bool = Field(
        default=False,
        description="Whether to override the default discord avatar icon.",
    )

    # "Not set" is represented by None, matching the Optional[str]
    # annotation (the previous default of False was not a valid value for
    # this field's declared type).
    avatar_url: Optional[str] = Field(
        title="Avatar URL",
        default=None,
        description=(
            "Over-ride the default discord avatar icon URL. By default this is not set"
            " and Apprise chooses the URL dynamically based on the type of message"
            " (info, success, warning, or error)."
        ),
    )

    def block_initialization(self) -> None:
        """Build the Discord Apprise URL from the fields and start the client."""
        from apprise.plugins.discord import NotifyDiscord

        # Let Apprise's own Discord plugin assemble the URL; wrap it in a
        # SecretStr since it embeds the webhook id and token.
        url = SecretStr(
            NotifyDiscord(
                webhook_id=self.webhook_id.get_secret_value(),
                webhook_token=self.webhook_token.get_secret_value(),
                botname=self.botname,
                tts=self.tts,
                include_image=self.include_image,
                avatar=self.avatar,
                avatar_url=self.avatar_url,
            ).url()
        )
        self._start_apprise_client(url)
Load a saved Mattermost webhook and send a message:
from prefect.blocks.notifications import MattermostWebhook

mattermost_webhook_block = MattermostWebhook.load("BLOCK_NAME")
mattermost_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class MattermostWebhook(AbstractAppriseNotificationBlock):
    """
    Enables sending notifications via a provided Mattermost webhook.
    See [Apprise notify_Mattermost docs](https://github.com/caronc/apprise/wiki/Notify_Mattermost) # noqa


    Examples:
        Load a saved Mattermost webhook and send a message:
        ```python
        from prefect.blocks.notifications import MattermostWebhook

        mattermost_webhook_block = MattermostWebhook.load("BLOCK_NAME")

        mattermost_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via a provided Mattermost webhook."
    _block_type_name = "Mattermost Webhook"
    _block_type_slug = "mattermost-webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/1350a147130bf82cbc799a5f868d2c0116207736-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.MattermostWebhook"

    # --- Server location -------------------------------------------------
    hostname: str = Field(
        default=...,
        description="The hostname of your Mattermost server.",
        examples=["Mattermost.example.com"],
    )

    # --- Credentials -----------------------------------------------------
    token: SecretStr = Field(
        default=...,
        description="The token associated with your Mattermost webhook.",
    )

    # --- Message presentation and routing --------------------------------
    botname: Optional[str] = Field(
        title="Bot name",
        default=None,
        description="The name of the bot that will send the message.",
    )

    channels: Optional[List[str]] = Field(
        default=None,
        description="The channel(s) you wish to notify.",
    )

    include_image: bool = Field(
        default=False,
        description="Whether to include the Apprise status image in the message.",
    )

    # --- Optional URL components -----------------------------------------
    path: Optional[str] = Field(
        default=None,
        description="An optional sub-path specification to append to the hostname.",
    )

    port: int = Field(
        default=8065,
        description="The port of your Mattermost server.",
    )

    def block_initialization(self) -> None:
        """Build the Mattermost Apprise URL from the fields and start the client."""
        from apprise.plugins.mattermost import NotifyMattermost

        # The Apprise plugin is used only to assemble the notification URL.
        notifier = NotifyMattermost(
            token=self.token.get_secret_value(),
            fullpath=self.path,
            host=self.hostname,
            botname=self.botname,
            channels=self.channels,
            include_image=self.include_image,
            port=self.port,
        )
        # The URL embeds the token, so keep it wrapped as a secret.
        self._start_apprise_client(SecretStr(notifier.url()))
Enables sending notifications via a provided Microsoft Teams webhook.
Examples:
Load a saved Teams webhook and send a message:
from prefect.blocks.notifications import MicrosoftTeamsWebhook

teams_webhook_block = MicrosoftTeamsWebhook.load("BLOCK_NAME")
teams_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class MicrosoftTeamsWebhook(AppriseNotificationBlock):
    """
    Enables sending notifications via a provided Microsoft Teams webhook.

    Examples:
        Load a saved Teams webhook and send a message:
        ```python
        from prefect.blocks.notifications import MicrosoftTeamsWebhook
        teams_webhook_block = MicrosoftTeamsWebhook.load("BLOCK_NAME")
        teams_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _block_type_name = "Microsoft Teams Webhook"
    _block_type_slug = "ms-teams-webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/817efe008a57f0a24f3587414714b563e5e23658-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.MicrosoftTeamsWebhook"

    # Redeclares the inherited `url` field with Teams-specific wording and
    # example; it remains required.
    url: SecretStr = Field(
        default=...,
        title="Webhook URL",
        description="The Teams incoming webhook URL used to send notifications.",
        examples=[
            "https://your-org.webhook.office.com/webhookb2/XXX/IncomingWebhook/YYY/ZZZ"
        ],
    )
Enables sending notifications via a provided Opsgenie webhook.
See Apprise notify_opsgenie docs
for more info on formatting the URL.
Examples:
Load a saved Opsgenie webhook and send a message:
from prefect.blocks.notifications import OpsgenieWebhook

opsgenie_webhook_block = OpsgenieWebhook.load("BLOCK_NAME")
opsgenie_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class OpsgenieWebhook(AbstractAppriseNotificationBlock):
    """
    Enables sending notifications via a provided Opsgenie webhook.
    See [Apprise notify_opsgenie docs](https://github.com/caronc/apprise/wiki/Notify_opsgenie)
    for more info on formatting the URL.

    Examples:
        Load a saved Opsgenie webhook and send a message:
        ```python
        from prefect.blocks.notifications import OpsgenieWebhook
        opsgenie_webhook_block = OpsgenieWebhook.load("BLOCK_NAME")
        opsgenie_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via a provided Opsgenie webhook."

    _block_type_name = "Opsgenie Webhook"
    _block_type_slug = "opsgenie-webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d8b5bc6244ae6cd83b62ec42f10d96e14d6e9113-280x280.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.OpsgenieWebhook"

    apikey: SecretStr = Field(
        default=...,
        title="API Key",
        description="The API Key associated with your Opsgenie account.",
    )

    target_user: Optional[List] = Field(
        default=None, description="The user(s) you wish to notify."
    )

    target_team: Optional[List] = Field(
        default=None, description="The team(s) you wish to notify."
    )

    target_schedule: Optional[List] = Field(
        default=None, description="The schedule(s) you wish to notify."
    )

    target_escalation: Optional[List] = Field(
        default=None, description="The escalation(s) you wish to notify."
    )

    region_name: Literal["us", "eu"] = Field(
        default="us", description="The 2-character region code."
    )

    batch: bool = Field(
        default=False,
        description="Notify all targets in batches (instead of individually).",
    )

    tags: Optional[List] = Field(
        default=None,
        description=(
            "A comma-separated list of tags you can associate with your Opsgenie"
            " message."
        ),
        examples=['["tag1", "tag2"]'],
    )

    # NOTE(review): the annotation is Optional[str] but the default is the
    # int 3. Both are left unchanged here to keep the field's accepted
    # inputs and default value backward-compatible - confirm which type is
    # intended.
    priority: Optional[str] = Field(
        default=3,
        description=(
            "The priority to associate with the message. It is on a scale between 1"
            " (LOW) and 5 (EMERGENCY)."
        ),
    )

    alias: Optional[str] = Field(
        default=None, description="The alias to associate with the message."
    )

    entity: Optional[str] = Field(
        default=None, description="The entity to associate with the message."
    )

    details: Optional[Dict[str, str]] = Field(
        default=None,
        description="Additional details composed of key/values pairs.",
        examples=['{"key1": "value1", "key2": "value2"}'],
    )

    def block_initialization(self) -> None:
        """Build the Opsgenie Apprise URL from the fields and start the client.

        Each kind of target is tagged with a one-character prefix before
        being handed to the Apprise plugin: `@` for users, `#` for teams,
        `*` for schedules and `^` for escalations.
        """
        from apprise.plugins.opsgenie import NotifyOpsgenie

        prefixed_groups = (
            ("@", self.target_user),
            ("#", self.target_team),
            ("*", self.target_schedule),
            ("^", self.target_escalation),
        )

        # Preserve the user / team / schedule / escalation ordering and skip
        # groups that are unset or empty.
        targets = []
        for prefix, names in prefixed_groups:
            if names:
                targets.extend(f"{prefix}{x}" for x in names)

        # The URL embeds the API key, so keep it wrapped as a secret.
        url = SecretStr(
            NotifyOpsgenie(
                apikey=self.apikey.get_secret_value(),
                targets=targets,
                region_name=self.region_name,
                details=self.details,
                priority=self.priority,
                alias=self.alias,
                entity=self.entity,
                batch=self.batch,
                tags=self.tags,
            ).url()
        )
        self._start_apprise_client(url)
Enables sending notifications via a provided PagerDuty webhook.
See Apprise notify_pagerduty docs
for more info on formatting the URL.
Examples:
Load a saved PagerDuty webhook and send a message:
from prefect.blocks.notifications import PagerDutyWebHook

pagerduty_webhook_block = PagerDutyWebHook.load("BLOCK_NAME")
pagerduty_webhook_block.notify("Hello from Prefect!")
Source code in src/prefect/blocks/notifications.py
class PagerDutyWebHook(AbstractAppriseNotificationBlock):
    """
    Enables sending notifications via a provided PagerDuty webhook.
    See [Apprise notify_pagerduty docs](https://github.com/caronc/apprise/wiki/Notify_pagerduty)
    for more info on formatting the URL.

    Examples:
        Load a saved PagerDuty webhook and send a message:
        ```python
        from prefect.blocks.notifications import PagerDutyWebHook
        pagerduty_webhook_block = PagerDutyWebHook.load("BLOCK_NAME")
        pagerduty_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via a provided PagerDuty webhook."

    _block_type_name = "Pager Duty Webhook"
    _block_type_slug = "pager-duty-webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/8dbf37d17089c1ce531708eac2e510801f7b3aee-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.PagerDutyWebHook"

    # `prefect_default` is deliberately excluded (and cannot be the default):
    # NotifyPagerDuty's PAGERDUTY_SEVERITY_MAP only defines the notify types
    # listed here as keys.
    notify_type: Literal["info", "success", "warning", "failure"] = Field(
        default="info", description="The severity of the notification."
    )

    # --- Credentials -----------------------------------------------------
    integration_key: SecretStr = Field(
        default=...,
        description=(
            "This can be found on the Events API V2 integration's detail page,"
            " and is also referred to as a Routing Key. This must be provided"
            " alongside `api_key`, but will error if provided alongside `url`."
        ),
    )

    api_key: SecretStr = Field(
        default=...,
        title="API Key",
        description=(
            "This can be found under Integrations. This must be provided"
            " alongside `integration_key`, but will error if provided alongside"
            " `url`."
        ),
    )

    # --- Payload fields --------------------------------------------------
    source: Optional[str] = Field(
        default="Prefect", description="The source string as part of the payload."
    )

    component: str = Field(
        default="Notification",
        description="The component string as part of the payload.",
    )

    group: Optional[str] = Field(
        default=None, description="The group string as part of the payload."
    )

    class_id: Optional[str] = Field(
        default=None,
        title="Class ID",
        description="The class string as part of the payload.",
    )

    region_name: Literal["us", "eu"] = Field(
        default="us", description="The region name."
    )

    clickable_url: Optional[AnyHttpUrl] = Field(
        default=None,
        title="Clickable URL",
        description="A clickable URL to associate with the notice.",
    )

    include_image: bool = Field(
        default=True,
        description="Associate the notification status via a represented icon.",
    )

    custom_details: Optional[Dict[str, str]] = Field(
        default=None,
        description="Additional details to include as part of the payload.",
        examples=['{"disk_space_left": "145GB"}'],
    )

    def block_initialization(self) -> None:
        """Build the PagerDuty Apprise URL from the fields and start the client."""
        from apprise.plugins.pagerduty import NotifyPagerDuty

        # The Apprise plugin is used only to assemble the notification URL.
        # Note the field-to-keyword renames: `clickable_url` -> `click` and
        # `custom_details` -> `details`.
        notifier = NotifyPagerDuty(
            apikey=self.api_key.get_secret_value(),
            integrationkey=self.integration_key.get_secret_value(),
            source=self.source,
            component=self.component,
            group=self.group,
            class_id=self.class_id,
            region_name=self.region_name,
            click=self.clickable_url,
            include_image=self.include_image,
            details=self.custom_details,
        )
        # The URL embeds both keys, so keep it wrapped as a secret.
        self._start_apprise_client(SecretStr(notifier.url()))
class SendgridEmail(AbstractAppriseNotificationBlock):
    """
    Enables sending notifications via any sendgrid account.
    See [Apprise Notify_sendgrid docs](https://github.com/caronc/apprise/wiki/Notify_Sendgrid)

    Examples:
        Load a saved Sendgrid and send a email message:
        ```python
        from prefect.blocks.notifications import SendgridEmail

        sendgrid_block = SendgridEmail.load("BLOCK_NAME")

        sendgrid_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via Sendgrid email service."
    _block_type_name = "Sendgrid Email"
    _block_type_slug = "sendgrid-email"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/82bc6ed16ca42a2252a5512c72233a253b8a58eb-250x250.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.SendgridEmail"

    api_key: SecretStr = Field(
        default=...,
        title="API Key",
        description="The API Key associated with your sendgrid account.",
    )

    sender_email: str = Field(
        title="Sender email id",
        description="The sender email id.",
        examples=["test-support@gmail.com"],
    )

    to_emails: List[str] = Field(
        default=...,
        title="Recipient emails",
        description="Email ids of all recipients.",
        examples=['"recipient1@gmail.com"'],
    )

    def block_initialization(self) -> None:
        """Build the SendGrid Apprise URL from the fields and start the client."""
        from apprise.plugins.sendgrid import NotifySendGrid

        # Let Apprise's SendGrid plugin assemble the URL; wrap it in a
        # SecretStr since it embeds the API key.
        url = SecretStr(
            NotifySendGrid(
                apikey=self.api_key.get_secret_value(),
                from_email=self.sender_email,
                targets=self.to_emails,
            ).url()
        )

        self._start_apprise_client(url)
class SlackWebhook(AppriseNotificationBlock):
    """
    Enables sending notifications via a provided Slack webhook.

    Examples:
        Load a saved Slack webhook and send a message:
        ```python
        from prefect.blocks.notifications import SlackWebhook

        slack_webhook_block = SlackWebhook.load("BLOCK_NAME")
        slack_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _block_type_name = "Slack Webhook"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/c1965ecbf8704ee1ea20d77786de9a41ce1087d1-500x500.png"
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.SlackWebhook"

    # Redeclares the inherited `url` field with Slack-specific wording and
    # example; it remains required.
    url: SecretStr = Field(
        default=...,
        title="Webhook URL",
        description="Slack incoming webhook URL used to send notifications.",
        examples=["https://hooks.slack.com/XXX"],
    )
class TwilioSMS(AbstractAppriseNotificationBlock):
    """Enables sending notifications via Twilio SMS.
    Find more on sending Twilio SMS messages in the [docs](https://www.twilio.com/docs/sms).

    Examples:
        Load a saved `TwilioSMS` block and send a message:
        ```python
        from prefect.blocks.notifications import TwilioSMS
        twilio_webhook_block = TwilioSMS.load("BLOCK_NAME")
        twilio_webhook_block.notify("Hello from Prefect!")
        ```
    """

    _description = "Enables sending notifications via Twilio SMS."
    _block_type_name = "Twilio SMS"
    _block_type_slug = "twilio-sms"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/8bd8777999f82112c09b9c8d57083ac75a4a0d65-250x250.png"  # noqa
    _documentation_url = "https://docs.prefect.io/api-ref/prefect/blocks/notifications/#prefect.blocks.notifications.TwilioSMS"

    # --- Credentials -----------------------------------------------------
    account_sid: str = Field(
        default=...,
        description=(
            "The Twilio Account SID - it can be found on the homepage of the"
            " Twilio console."
        ),
    )

    auth_token: SecretStr = Field(
        default=...,
        description=(
            "The Twilio Authentication Token - it can be found on the homepage"
            " of the Twilio console."
        ),
    )

    # --- Phone numbers ---------------------------------------------------
    from_phone_number: str = Field(
        default=...,
        description="The valid Twilio phone number to send the message from.",
        examples=["18001234567"],
    )

    to_phone_numbers: List[str] = Field(
        default=...,
        description="A list of valid Twilio phone number(s) to send the message to.",
        # The example is intentionally a bare string rather than a nested
        # list because of how the UI displays examples; an actual value in
        # code should be a list such as ["18004242424"].
        examples=["18004242424"],
    )

    def block_initialization(self) -> None:
        """Build the Twilio Apprise URL from the fields and start the client."""
        from apprise.plugins.twilio import NotifyTwilio

        # The Apprise plugin is used only to assemble the notification URL.
        # Note the field-to-keyword renames: `from_phone_number` -> `source`
        # and `to_phone_numbers` -> `targets`.
        notifier = NotifyTwilio(
            account_sid=self.account_sid,
            auth_token=self.auth_token.get_secret_value(),
            source=self.from_phone_number,
            targets=self.to_phone_numbers,
        )
        # The URL embeds the auth token, so keep it wrapped as a secret.
        self._start_apprise_client(SecretStr(notifier.url()))