classContextModel(BaseModel):""" A base model for context data that forbids mutation and extra data while providing a context manager """# The context variable for storing data must be defined by the child class__var__:ContextVar_token:Token=PrivateAttr(None)classConfig:# allow_mutation = Falsearbitrary_types_allowed=Trueextra="forbid"def__enter__(self):ifself._tokenisnotNone:raiseRuntimeError("Context already entered. Context enter calls cannot be nested.")self._token=self.__var__.set(self)returnselfdef__exit__(self,*_):ifnotself._token:raiseRuntimeError("Asymmetric use of context. Context exit called without an enter.")self.__var__.reset(self._token)self._token=None@classmethoddefget(cls:Type[T])->Optional[T]:returncls.__var__.get(None)defcopy(self,**kwargs):""" Duplicate the context model, optionally choosing which fields to include, exclude, or change. Attributes: include: Fields to include in new model. exclude: Fields to exclude from new model, as with values this takes precedence over include. update: Values to change/add in the new model. Note: the data is not validated before creating the new model - you should trust this data. deep: Set to `True` to make a deep copy of the model. Returns: A new model instance. """# Remove the token on copy to avoid re-entrance errorsnew=super().copy(**kwargs)new._token=Nonereturnnew
defcopy(self,**kwargs):""" Duplicate the context model, optionally choosing which fields to include, exclude, or change. Attributes: include: Fields to include in new model. exclude: Fields to exclude from new model, as with values this takes precedence over include. update: Values to change/add in the new model. Note: the data is not validated before creating the new model - you should trust this data. deep: Set to `True` to make a deep copy of the model. Returns: A new model instance. """# Remove the token on copy to avoid re-entrance errorsnew=super().copy(**kwargs)new._token=Nonereturnnew
classEngineContext(RunContext):""" The context for a flow run. Data in this context is only available from within a flow run function. Attributes: flow: The flow instance associated with the run flow_run: The API metadata for the flow run task_runner: The task runner instance being used for the flow run task_run_futures: A list of futures for task runs submitted within this flow run task_run_states: A list of states for task runs created within this flow run task_run_results: A mapping of result ids to task run states for this flow run flow_run_states: A list of states for flow runs created within this flow run sync_portal: A blocking portal for sync task/flow runs in an async flow timeout_scope: The cancellation scope for flow level timeouts """flow:Optional["Flow"]=Noneflow_run:Optional[FlowRun]=Noneautonomous_task_run:Optional[TaskRun]=Nonetask_runner:BaseTaskRunnerlog_prints:bool=Falseparameters:Optional[Dict[str,Any]]=None# Result handlingresult_factory:ResultFactory# Counter for task calls allowing uniquetask_run_dynamic_keys:Dict[str,int]=Field(default_factory=dict)# Counter for flow pausesobserved_flow_pauses:Dict[str,int]=Field(default_factory=dict)# Tracking for objects created by this flow runtask_run_futures:List[PrefectFuture]=Field(default_factory=list)task_run_states:List[State]=Field(default_factory=list)task_run_results:Dict[int,State]=Field(default_factory=dict)flow_run_states:List[State]=Field(default_factory=list)# The synchronous portal is only created for async flows for creating engine calls# from synchronous task and subflow callssync_portal:Optional[anyio.abc.BlockingPortal]=Nonetimeout_scope:Optional[anyio.abc.CancelScope]=None# Task group that can be used for background tasks during the flow runbackground_tasks:anyio.abc.TaskGroup# Events worker to emit events to Prefect Cloudevents:Optional[EventsWorker]=None__var__=ContextVar("flow_run")
classPrefectObjectRegistry(ContextModel):""" A context that acts as a registry for all Prefect objects that are registered during load and execution. Attributes: start_time: The time the object registry was created. block_code_execution: If set, flow calls will be ignored. capture_failures: If set, failures during __init__ will be silenced and tracked. """start_time:DateTimeTZ=Field(default_factory=lambda:pendulum.now("UTC"))_instance_registry:Dict[Type[T],List[T]]=PrivateAttr(default_factory=lambda:defaultdict(list))# Failures will be a tuple of (exception, instance, args, kwargs)_instance_init_failures:Dict[Type[T],List[Tuple[Exception,T,Tuple,Dict]]]=PrivateAttr(default_factory=lambda:defaultdict(list))block_code_execution:bool=Falsecapture_failures:bool=False__var__=ContextVar("object_registry")defget_instances(self,type_:Type[T])->List[T]:instances=[]forregistered_type,type_instancesinself._instance_registry.items():iftype_inregistered_type.mro():instances.extend(type_instances)returninstancesdefget_instance_failures(self,type_:Type[T])->List[Tuple[Exception,T,Tuple,Dict]]:failures=[]fortype__intype_.mro():failures.extend(self._instance_init_failures[type__])returnfailuresdefregister_instance(self,object):# TODO: Consider using a 'Set' to avoid duplicate entriesself._instance_registry[type(object)].append(object)defregister_init_failure(self,exc:Exception,object:Any,init_args:Tuple,init_kwargs:Dict):self._instance_init_failures[type(object)].append((exc,object,init_args,init_kwargs))@classmethoddefregister_instances(cls,type_:Type[T])->Type[T]:""" Decorator for a class that adds registration to the `PrefectObjectRegistry` on initialization of instances. """original_init=type_.__init__def__register_init__(__self__:T,*args:Any,**kwargs:Any)->None:registry=cls.get()try:original_init(__self__,*args,**kwargs)exceptExceptionasexc:ifnotregistryornotregistry.capture_failures:raiseelse:registry.register_init_failure(exc,__self__,args,kwargs)else:ifregistry:registry.register_instance(__self__)update_wrapper(__register_init__,original_init)type_.__init__=__register_init__returntype_
@classmethoddefregister_instances(cls,type_:Type[T])->Type[T]:""" Decorator for a class that adds registration to the `PrefectObjectRegistry` on initialization of instances. """original_init=type_.__init__def__register_init__(__self__:T,*args:Any,**kwargs:Any)->None:registry=cls.get()try:original_init(__self__,*args,**kwargs)exceptExceptionasexc:ifnotregistryornotregistry.capture_failures:raiseelse:registry.register_init_failure(exc,__self__,args,kwargs)else:ifregistry:registry.register_instance(__self__)update_wrapper(__register_init__,original_init)type_.__init__=__register_init__returntype_
The Prefect client instance being used for API communication
Source code in src/prefect/context.py
204205206207208209210211212213214215216
classRunContext(ContextModel):""" The base context for a flow or task run. Data in this context will always be available when `get_run_context` is called. Attributes: start_time: The time the run context was entered client: The Prefect client instance being used for API communication """start_time:DateTimeTZ=Field(default_factory=lambda:pendulum.now("UTC"))input_keyset:Optional[Dict[str,Dict[str,str]]]=Noneclient:Union[PrefectClient,SyncPrefectClient]
classSettingsContext(ContextModel):""" The context for a Prefect settings. This allows for safe concurrent access and modification of settings. Attributes: profile: The profile that is in use. settings: The complete settings model. """profile:Profilesettings:Settings__var__=ContextVar("settings")def__hash__(self)->int:returnhash(self.settings)def__enter__(self):""" Upon entrance, we ensure the home directory for the profile exists. """return_value=super().__enter__()try:prefect_home=Path(self.settings.value_of(PREFECT_HOME))prefect_home.mkdir(mode=0o0700,exist_ok=True)exceptOSError:warnings.warn(("Failed to create the Prefect home directory at "f"{self.settings.value_of(PREFECT_HOME)}"),stacklevel=2,)returnreturn_value@classmethoddefget(cls)->"SettingsContext":# Return the global context instead of `None` if no context existsreturnsuper().get()orGLOBAL_SETTINGS_CONTEXT
classTagsContext(ContextModel):""" The context for `prefect.tags` management. Attributes: current_tags: A set of current tags in the context """current_tags:Set[str]=Field(default_factory=set)@classmethoddefget(cls)->"TagsContext":# Return an empty `TagsContext` instead of `None` if no context existsreturncls.__var__.get(TagsContext())__var__=ContextVar("tags")
classTaskRunContext(RunContext):""" The context for a task run. Data in this context is only available from within a task run function. Attributes: task: The task instance associated with the task run task_run: The API metadata for this task run """task:"Task"task_run:TaskRunlog_prints:bool=Falseparameters:Dict[str,Any]# Result handlingresult_factory:ResultFactory__var__=ContextVar("task_run")
defget_run_context()->Union[FlowRunContext,TaskRunContext]:""" Get the current run context from within a task or flow function. Returns: A `FlowRunContext` or `TaskRunContext` depending on the function type. Raises RuntimeError: If called outside of a flow or task run. """task_run_ctx=TaskRunContext.get()iftask_run_ctx:returntask_run_ctxflow_run_ctx=FlowRunContext.get()ifflow_run_ctx:returnflow_run_ctxraiseMissingContextError("No run context available. You are not in a flow or task run context.")
Get the current settings context which contains profile information and the
settings that are being used.
Generally, the settings that are being used are a combination of values from the
profile and environment. See prefect.context.use_profile for more details.
Source code in src/prefect/context.py
382383384385386387388389390391392393394395
defget_settings_context()->SettingsContext:""" Get the current settings context which contains profile information and the settings that are being used. Generally, the settings that are being used are a combination of values from the profile and environment. See `prefect.context.use_profile` for more details. """settings_ctx=SettingsContext.get()ifnotsettings_ctx:raiseMissingContextError("No settings context found.")returnsettings_ctx
Return a fresh registry with instances populated from execution of a script.
Source code in src/prefect/context.py
455456457458459460461462463464465466467468469
defregistry_from_script(path:str,block_code_execution:bool=True,capture_failures:bool=True,)->PrefectObjectRegistry:""" Return a fresh registry with instances populated from execution of a script. """withPrefectObjectRegistry(block_code_execution=block_code_execution,capture_failures=capture_failures,)asregistry:load_script_as_module(path)returnregistry
Return the settings context that will exist as the root context for the module.
The profile to use is determined with the following precedence
- Command line via 'prefect --profile '
- Environment variable via 'PREFECT_PROFILE'
- Profiles file via the 'active' key
defroot_settings_context():""" Return the settings context that will exist as the root context for the module. The profile to use is determined with the following precedence - Command line via 'prefect --profile <name>' - Environment variable via 'PREFECT_PROFILE' - Profiles file via the 'active' key """profiles=prefect.settings.load_profiles()active_name=profiles.active_nameprofile_source="in the profiles file"if"PREFECT_PROFILE"inos.environ:active_name=os.environ["PREFECT_PROFILE"]profile_source="by environment variable"if(sys.argv[0].endswith("/prefect")andlen(sys.argv)>=3andsys.argv[1]=="--profile"):active_name=sys.argv[2]profile_source="by command line argument"ifactive_namenotinprofiles.names:print((f"WARNING: Active profile {active_name!r} set {profile_source} not ""found. The default profile will be used instead. "),file=sys.stderr,)active_name="default"withuse_profile(profiles[active_name],# Override environment variables if the profile was set by the CLIoverride_environment_variables=profile_source=="by command line argument",)assettings_context:returnsettings_context
@contextmanagerdeftags(*new_tags:str)->Generator[Set[str],None,None]:""" Context manager to add tags to flow and task run calls. Tags are always combined with any existing tags. Yields: The current set of tags Examples: >>> from prefect import tags, task, flow >>> @task >>> def my_task(): >>> pass Run a task with tags >>> @flow >>> def my_flow(): >>> with tags("a", "b"): >>> my_task() # has tags: a, b Run a flow with tags >>> @flow >>> def my_flow(): >>> pass >>> with tags("a", "b"): >>> my_flow() # has tags: a, b Run a task with nested tag contexts >>> @flow >>> def my_flow(): >>> with tags("a", "b"): >>> with tags("c", "d"): >>> my_task() # has tags: a, b, c, d >>> my_task() # has tags: a, b Inspect the current tags >>> @flow >>> def my_flow(): >>> with tags("c", "d"): >>> with tags("e", "f") as current_tags: >>> print(current_tags) >>> with tags("a", "b"): >>> my_flow() {"a", "b", "c", "d", "e", "f"} """current_tags=TagsContext.get().current_tagsnew_tags=current_tags.union(new_tags)withTagsContext(current_tags=new_tags):yieldnew_tags
The name of the profile to load or an instance of a Profile.
required
override_environment_variable
If set, variables in the profile will take
precedence over current environment variables. By default, environment
variables will override profile settings.
required
include_current_context
bool
If set, the new settings will be constructed
with the current settings context as a base. If not set, the use_base settings
will be loaded from the environment and defaults.
@contextmanagerdefuse_profile(profile:Union[Profile,str],override_environment_variables:bool=False,include_current_context:bool=True,):""" Switch to a profile for the duration of this context. Profile contexts are confined to an async context in a single thread. Args: profile: The name of the profile to load or an instance of a Profile. override_environment_variable: If set, variables in the profile will take precedence over current environment variables. By default, environment variables will override profile settings. include_current_context: If set, the new settings will be constructed with the current settings context as a base. If not set, the use_base settings will be loaded from the environment and defaults. Yields: The created `SettingsContext` object """ifisinstance(profile,str):profiles=prefect.settings.load_profiles()profile=profiles[profile]ifnotisinstance(profile,Profile):raiseTypeError(f"Unexpected type {type(profile).__name__!r} for `profile`. ""Expected 'str' or 'Profile'.")# Create a copy of the profiles settings as we will mutate itprofile_settings=profile.settings.copy()existing_context=SettingsContext.get()ifexisting_contextandinclude_current_context:settings=existing_context.settingselse:settings=prefect.settings.get_settings_from_env()ifnotoverride_environment_variables:forkeyinos.environ:ifkeyinprefect.settings.SETTING_VARIABLES:profile_settings.pop(prefect.settings.SETTING_VARIABLES[key],None)new_settings=settings.copy_with_update(updates=profile_settings)withSettingsContext(profile=profile,settings=new_settings)asctx:yieldctx