# This example demonstrates how to implement human-in-the-loop functionality with async agents, requiring user confirmation before executing tool operations.
"""🤝 Human-in-the-Loop: Adding User Confirmation to Tool CallsThis example shows how to implement human-in-the-loop functionality in your Agno tools.It shows how to:- Handle user confirmation during tool execution- Gracefully cancel operations based on user choiceSome practical applications:- Confirming sensitive operations before execution- Reviewing API calls before they're made- Validating data transformations- Approving automated actions in critical systemsRun `pip install openai httpx rich agno` to install dependencies."""import asyncioimport jsonimport httpxfrom agno.agent import Agentfrom agno.models.openai import OpenAIChatfrom agno.tools import toolfrom agno.utils import pprintfrom rich.console import Consolefrom rich.prompt import Promptconsole = Console()@tool(requires_confirmation=True)def get_top_hackernews_stories(num_stories: int) -> str: """Fetch top stories from Hacker News. Args: num_stories (int): Number of stories to retrieve Returns: str: JSON string containing story details """ # Fetch top story IDs response = httpx.get("https://hacker-news.firebaseio.com/v0/topstories.json") story_ids = response.json() # Yield story details all_stories = [] for story_id in story_ids[:num_stories]: story_response = httpx.get( f"https://hacker-news.firebaseio.com/v0/item/{story_id}.json" ) story = story_response.json() if "text" in story: story.pop("text", None) all_stories.append(story) return json.dumps(all_stories)agent = Agent( model=OpenAIChat(id="gpt-5-mini"), tools=[get_top_hackernews_stories], markdown=True,)run_response = asyncio.run(agent.arun("Fetch the top 2 hackernews stories"))if run_response.is_paused: for tool in run_response.tools_requiring_confirmation: # Ask for confirmation console.print( f"Tool name [bold blue]{tool.tool_name}({tool.tool_args})[/] requires confirmation." 
) message = ( Prompt.ask("Do you want to continue?", choices=["y", "n"], default="y") .strip() .lower() ) if message == "n": tool.confirmed = False else: # We update the tools in place tool.confirmed = Truerun_response = asyncio.run(agent.acontinue_run(run_response=run_response))# Or# run_response = asyncio.run(agent.acontinue_run(run_id=run_response.run_id))pprint.pprint_run_response(run_response)# Or for simple debug flow# asyncio.run(agent.aprint_response("Fetch the top 2 hackernews stories"))