@agent_app.command()asyncdefstart(# deprecated main argumentwork_queue:str=typer.Argument(None,show_default=False,help="DEPRECATED: A work queue name or ID",),work_queues:List[str]=typer.Option(None,"-q","--work-queue",help="One or more work queue names for the agent to pull from.",),work_queue_prefix:List[str]=typer.Option(None,"-m","--match",help=("Dynamically matches work queue names with the specified prefix for the"" agent to pull from,for example `dev-` will match all work queues with a"" name that starts with `dev-`"),),work_pool_name:str=typer.Option(None,"-p","--pool",help="A work pool name for the agent to pull from.",),hide_welcome:bool=typer.Option(False,"--hide-welcome"),api:str=SettingsOption(PREFECT_API_URL),run_once:bool=typer.Option(False,help="Run the agent loop once, instead of forever."),prefetch_seconds:int=SettingsOption(PREFECT_AGENT_PREFETCH_SECONDS),# deprecated tagstags:List[str]=typer.Option(None,"-t","--tag",help=("DEPRECATED: One or more optional tags that will be used to create a work"" queue. This option will be removed on 2023-02-23."),),limit:int=typer.Option(None,"-l","--limit",help="Maximum number of flow runs to start simultaneously.",),):""" Start an agent process to poll one or more work queues for flow runs. """work_queues=work_queuesor[]ifwork_queueisnotNone:# try to treat the work_queue as a UUIDtry:asyncwithget_client()asclient:q=awaitclient.read_work_queue(UUID(work_queue))work_queue=q.name# otherwise treat it as a string nameexcept(TypeError,ValueError):passwork_queues.append(work_queue)app.console.print(("Agents now support multiple work queues. Instead of passing a single"" argument, provide work queue names with the `-q` or `--work-queue`"f" flag: `prefect agent start -q {work_queue}`\n"),style="blue",)ifwork_pool_name:is_queues_paused=await_check_work_queues_paused(work_pool_name=work_pool_name,work_queues=work_queues,)ifis_queues_paused:queue_scope=("The 'default' work queue"ifnotwork_queueselse"Specified work queue(s)")app.console.print((f"{queue_scope} in the work pool {work_pool_name!r} is currently"" paused. This agent will not execute any flow runs until the work"" queue(s) are unpaused."),style="yellow",)ifnotwork_queuesandnottagsandnotwork_queue_prefixandnotwork_pool_name:exit_with_error("No work queues provided!",style="red")elifbool(work_queues)+bool(tags)+bool(work_queue_prefix)>1:exit_with_error("Only one of `work_queues`, `match`, or `tags` can be provided.",style="red",)ifwork_pool_nameandtags:exit_with_error("`tag` and `pool` options cannot be used together.",style="red")iftags:work_queue_name=f"Agent queue {'-'.join(sorted(tags))}"app.console.print(("`tags` are deprecated. For backwards-compatibility with old versions"" of Prefect, this agent will create a work queue named"f" `{work_queue_name}` that uses legacy tag-based matching. This option"" will be removed on 2023-02-23."),style="red",)asyncwithget_client()asclient:try:work_queue=awaitclient.read_work_queue_by_name(work_queue_name)ifwork_queue.filterisNone:# ensure the work queue has legacy (deprecated) tag-based behaviorawaitclient.update_work_queue(filter=dict(tags=tags))exceptObjectNotFound:# if the work queue doesn't already exist, we create it with tags# to enable legacy (deprecated) tag-matching behaviorawaitclient.create_work_queue(name=work_queue_name,tags=tags)work_queues=[work_queue_name]ifnothide_welcome:ifapi:app.console.print(f"Starting v{prefect.__version__} agent connected to {api}...")else:app.console.print(f"Starting v{prefect.__version__} agent with ephemeral API...")agent_process_id=os.getpid()setup_signal_handlers_agent(agent_process_id,"the Prefect agent",app.console.print)asyncwithPrefectAgent(work_queues=work_queues,work_queue_prefix=work_queue_prefix,work_pool_name=work_pool_name,prefetch_seconds=prefetch_seconds,limit=limit,)asagent:ifnothide_welcome:app.console.print(ascii_name)ifwork_pool_name:app.console.print("Agent started! Looking for work from "f"work pool '{work_pool_name}'...")elifwork_queue_prefix:app.console.print("Agent started! Looking for work from "f"queue(s) that start with the prefix: {work_queue_prefix}...")else:app.console.print("Agent started! Looking for work from "f"queue(s): {', '.join(work_queues)}...")asyncwithanyio.create_task_group()astg:tg.start_soon(partial(critical_service_loop,agent.get_and_submit_flow_runs,PREFECT_AGENT_QUERY_INTERVAL.value(),printer=app.console.print,run_once=run_once,jitter_range=0.3,backoff=4,# Up to ~1 minute interval during backoff))tg.start_soon(partial(critical_service_loop,agent.check_for_cancelled_flow_runs,PREFECT_AGENT_QUERY_INTERVAL.value()*2,printer=app.console.print,run_once=run_once,jitter_range=0.3,backoff=4,))app.console.print("Agent stopped!")