State change hooks execute code in response to changes in flow or task run states, enabling you to define actions for specific state transitions in a workflow.
This guide provides examples of real-world use cases.
State change hooks enable you to customize messages sent when tasks transition between states, such as sending notifications containing sensitive information when tasks enter a Failed state. Let's run a client-side hook upon a flow run entering a Failed state.
```python
from prefect import flow
from prefect.blocks.core import Block
from prefect.settings import PREFECT_API_URL


def notify_slack(flow, flow_run, state):
    """Flow run state change hook that posts the run's details to Slack."""
    slack_webhook_block = Block.load("slack-webhook/my-slack-webhook")

    # The link below is built from PREFECT_API_URL; adjust it if your UI
    # is served from a different address.
    slack_webhook_block.notify(
        (
            f"Your job {flow_run.name} entered {state.name} "
            f"with message:\n\n"
            f"{state.message}\n\n"
            f"See <https://{PREFECT_API_URL.value()}/flow-runs/"
            f"flow-run/{flow_run.id}|the flow run in the UI>\n\n"
            f"Tags: {flow_run.tags}\n\n"
            f"Scheduled start: {flow_run.expected_start_time}"
        )
    )


@flow(on_failure=[notify_slack], retries=1)
def failing_flow():
    raise ValueError("oops!")


if __name__ == "__main__":
    failing_flow()
```
Note that because we've configured retries in this example, the on_failure hook will not run until all retries have completed, when the flow run enters a Failed state.
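To make the retry behavior concrete, here is a minimal toy model in plain Python. It is not Prefect's engine: `run_with_retries`, `always_fails`, and `record_failure` are hypothetical names invented for this sketch, and the hook signature is simplified. It only illustrates the ordering described above, where failure hooks fire once, after the last attempt has failed.

```python
from typing import Callable, List


def run_with_retries(
    fn: Callable[[], object],
    retries: int,
    on_failure: List[Callable[[int, Exception], None]],
) -> object:
    """Toy model: call fn up to retries + 1 times and fire the hooks
    only when the final attempt fails. Not Prefect's implementation."""
    attempt = 0
    while True:
        attempt += 1
        try:
            return fn()
        except Exception as exc:
            if attempt <= retries:
                continue  # a retry remains, so no hook runs yet
            for hook in on_failure:
                hook(attempt, exc)
            raise


events: List[str] = []


def always_fails() -> None:
    events.append("attempt")
    raise ValueError("oops!")


def record_failure(attempt: int, exc: Exception) -> None:
    events.append(f"hook after attempt {attempt}: {exc}")


try:
    run_with_retries(always_fails, retries=1, on_failure=[record_failure])
except ValueError:
    pass  # the final failure still propagates to the caller

print(events)
```

With `retries=1` the function is attempted twice, and the hook entry appears only after the second attempt.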
State change hooks can aid in managing infrastructure cleanup in scenarios where tasks spin up individual infrastructure resources independently of Prefect.
When a flow run crashes, tasks may exit abruptly, resulting in the potential omission of cleanup logic within the tasks.
State change hooks can be used to ensure infrastructure is properly cleaned up even when a flow run enters a Crashed state!
Let's create a hook that deletes a Cloud Run job if the flow run crashes.
```python
import os
from prefect import flow, task
from prefect.blocks.system import String
from prefect.client import get_client
import prefect.runtime


async def delete_cloud_run_job(flow, flow_run, state):
    """Flow run state change hook that deletes a Cloud Run Job if
    the flow run crashes."""

    # retrieve Cloud Run job name
    cloud_run_job_name = await String.load(name="crashing-flow-cloud-run-job")

    # delete Cloud Run job
    delete_job_command = (
        f"yes | gcloud beta run jobs delete "
        f"{cloud_run_job_name.value} --region us-central1"
    )
    os.system(delete_job_command)

    # clean up the Cloud Run job string block as well
    async with get_client() as client:
        block_document = await client.read_block_document_by_name(
            "crashing-flow-cloud-run-job", block_type_slug="string"
        )
        await client.delete_block_document(block_document.id)


@task
def my_task_that_crashes():
    raise SystemExit("Crashing on purpose!")


@flow(on_crashed=[delete_cloud_run_job])
def crashing_flow():
    """Save the flow run name (i.e. Cloud Run job name) as a
    String block. It then executes a task that ends up crashing."""
    flow_run_name = prefect.runtime.flow_run.name
    cloud_run_job_name = String(value=flow_run_name)
    cloud_run_job_name.save(name="crashing-flow-cloud-run-job", overwrite=True)

    my_task_that_crashes()


if __name__ == "__main__":
    crashing_flow()
```
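The hook above builds a shell string and pipes `yes` into it to answer the confirmation prompt. As one possible variation, the sketch below builds the same `gcloud` invocation as an argument list, which avoids shell quoting problems if the job name contains unusual characters. `build_delete_job_command` is a hypothetical helper introduced for illustration, and using gcloud's `--quiet` flag in place of `yes |` is an assumption about what fits your setup.

```python
import shlex
from typing import List


def build_delete_job_command(job_name: str, region: str = "us-central1") -> List[str]:
    """Return the gcloud invocation as an argument list (no shell involved)."""
    if not job_name:
        raise ValueError("job_name must not be empty")
    return [
        "gcloud", "beta", "run", "jobs", "delete", job_name,
        "--region", region,
        "--quiet",  # assumption: suppresses the prompt instead of piping `yes`
    ]


command = build_delete_job_command("my-flow-run")
# Show the command as it would appear on a shell prompt.
print(shlex.join(command))
```

Inside the hook, the resulting list could be passed to `subprocess.run(command, check=True)` so that a failed deletion raises an error rather than passing silently.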