AI Agents with LangGraph

Nov 12 2024 · Python 3.12, LangGraph 0.2.x, JupyterLab 4.2.4

Lesson 04: Enhancing Agent Capabilities

Human-in-the-Loop Demo

Episode complete

Play next episode

Next

Heads up... You’re accessing parts of this content for free, with some sections shown as obfuscated text.

Heads up... You’re accessing parts of this content for free, with some sections shown as obfuscated text.

Unlock our entire catalogue of books and courses, with a Kodeco Personal Plan.

Unlock now

Open the empty human.ipynb notebook in the Starter project. Then add the needed imports for the demo as well as a State class:

from typing import TypedDict, Literal
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import MemorySaver
from IPython.display import Image, display

class State(TypedDict):
  should_launch: bool
def start_engines(state):
  print("Starting engines...")
  return state

def launch_spaceship(state):
  print("Launching spaceship!")
  return state
def human_review(state):
  print("Perform human review")
  return state

def route_chooser(state) -> Literal["launch_spaceship", END]:
  if state["should_launch"]:
    return "launch_spaceship"
  else:
    print("Launch aborted.")
    return END
graph = StateGraph(State)

graph.add_node("start_engines", start_engines)
graph.add_node("human_review", human_review)
graph.add_node("launch_spaceship", launch_spaceship)

graph.add_edge(START, "start_engines")
graph.add_edge("start_engines", "human_review")
graph.add_conditional_edges("human_review", route_chooser)
graph.add_edge("launch_spaceship", END)
memory = MemorySaver()
app = graph.compile(checkpointer=memory, interrupt_before=["human_review"])
display(Image(app.get_graph().draw_mermaid_png()))
initial_input = {"should_launch": False}
thread = {"configurable": {"thread_id": "1"}}
result = app.invoke(initial_input, thread)
app.get_state(thread).next
user_input = input("Do you confirm the launch? (yes/no): ")
should_launch = user_input == "yes"
app.get_state(thread).values
app.update_state(thread, {"should_launch": should_launch})
app.get_state(thread).values
result = app.invoke(None, thread)
app.get_state(thread).next
See forum comments
Cinema mode Download course materials from Github
Previous: Human-in-the-Loop Next: Localizer Project