Skip to content

LlamaIndex

Or: When Your AI Needs to Remember What Happened

Section titled “Or: When Your AI Needs to Remember What Happened”
  • Design stateful AI workflows using LangGraph’s StateGraph and conditional routing.
  • Implement cyclic execution paths to enable self-correcting agent behaviors.
  • Diagnose infinite loops and state mutation errors in complex multi-agent architectures.
  • Evaluate the trade-offs between linear chains and graph-based workflow orchestration.

In a long-running AI workflow, a session can fail late in the process when a downstream dependency times out, even after substantial work has already been completed.

He refreshed the dashboard. Everything was gone. All 52 documents, all verification results, all compliance checks—the entire session had vanished. The customer would have to start over from scratch. The financial impact can be immediate: failed long-running sessions often increase abandonment and lost revenue.

Without persisted state, a workflow failure can wipe in-memory progress; with checkpointing, the system can resume from the most recent saved step instead of restarting from scratch. This module will teach you how to build exactly this kind of resilient, stateful AI architecture.

Section 1: The Graph Mental Model and Linear Limitations

Section titled “Section 1: The Graph Mental Model and Linear Limitations”

In previous modules, you mastered Chain-of-Thought and ReAct patterns. But real production workflows require more than a single reasoning trace. They need to remember state across many steps, branch conditionally, loop back if something fails, and coordinate multiple agents.

Think of standard linear chains like a highway with no exits. Once you start, you can only go forward. If you miss something, you have to drive to the end and start over from the beginning.

graph TD
Input --> Chain1
Chain1 --> Chain2
Chain2 --> Chain3
Chain3 --> Output
Chain3 -.-> ConditionBranch
ConditionBranch --> PathA[Path A]
ConditionBranch --> PathB[Path B]

Real-world workflows, however, look more like a city street grid. You can loop back, take detours, or recover from wrong turns without restarting your journey. LangGraph allows you to construct these cyclic execution paths naturally.

graph TD
Research --> Analyze
Analyze --> Check
Check -- not good enough --> Research
Check --> Write
Write --> Review
Review -- revisions needed --> Write
Review --> Publish

Pause and predict: If a complex AI agent pipeline is purely linear, what architectural workarounds would you need to implement to allow an agent to correct a hallucinated answer after reviewing it?

At its core, a graph consists of nodes and edges connecting them.

graph LR
A[Node A] --edge--> B[Node B]
B --edge--> C[Node C]
B --edge--> D[Node D]
FeatureLinear ChainsGraph Workflows
Conditional logicLimitedFull branching
Cycles/loopsNot possibleNative support
State managementPass throughPersistent state
Error recoveryStart overRetry specific nodes
Human-in-the-loopAwkwardNative support
Multi-agentSequential onlyTrue parallelism

LangGraph revolves around a few central primitives: the StateGraph, Nodes, Edges, Reducers, and Checkpointers.

ConceptPurpose
StateGraphContainer for workflow definition
NodesProcessing functions that update state
EdgesDefine flow between nodes
ReducersHow state updates are merged
CheckpointerEnables pause/resume and persistence
InterruptsHuman-in-the-loop integration

The foundation is the StateGraph, which manages the state schema, nodes, and transition logic.

from langgraph.graph import StateGraph, END
# Define the state type
from typing import TypedDict, List, Annotated
import operator
class AgentState(TypedDict):
messages: Annotated[List[str], operator.add] # Accumulates
current_step: str
result: str
# Create the graph
graph = StateGraph(AgentState)

State updates require precise instructions on how new data should merge with existing data.

from typing import Annotated
import operator
class MyState(TypedDict):
# This field gets REPLACED on each update
current_value: str
# This field ACCUMULATES (list concatenation)
history: Annotated[List[str], operator.add]
# This field uses custom reducer
counter: Annotated[int, lambda a, b: a + b]

Stop and think: Why is operator.add crucial for tracking an LLM’s conversation history? What happens to the AI’s context window if a field lacks this annotation?

Nodes receive the current state, perform logic, and return a partial state update. Edges define the conditional or unconditional routes between these nodes.

def analyze_node(state: AgentState) -> dict:
"""Analyze the input and return findings."""
messages = state["messages"]
# Do analysis...
return {
"current_step": "analyze",
"messages": ["Analysis complete: found 3 issues"]
}
# Add node to graph
graph.add_node("analyze", analyze_node)
# Unconditional edge: always go from A to B
graph.add_edge("node_a", "node_b")
# Conditional edge: choose based on state
def should_continue(state: AgentState) -> str:
if state["result"] == "success":
return "finish"
else:
return "retry"
graph.add_conditional_edges(
"check",
should_continue,
{
"finish": "output",
"retry": "process" # Creates a cycle!
}
)

Every graph requires explicit entry and exit points before it can be compiled into an executable application.

from langgraph.graph import START, END
# Set entry point
graph.add_edge(START, "first_node")
# Set exit point (END is a special node)
graph.add_edge("final_node", END)
# Compile the graph
app = graph.compile()
# Run it
result = app.invoke({"messages": ["Hello"], "current_step": "start"})

Section 3: Building Your First LangGraph Workflow

Section titled “Section 3: Building Your First LangGraph Workflow”

Let’s build a practical document processing pipeline. The graph structure will dynamically route documents based on classification.

graph LR
Parse --> Classify --> Route
Route --> ProcessA[Process A] --> Output1[Output]
Route --> ProcessB[Process B] --> Output2[Output]
from typing import TypedDict, List, Annotated, Literal
import operator
class DocumentState(TypedDict):
# The input document
document: str
# Classification result
doc_type: Literal["invoice", "contract", "letter", "unknown"]
# Extracted data (accumulates across nodes)
extracted_data: Annotated[List[dict], operator.add]
# Processing messages
messages: Annotated[List[str], operator.add]
# Final output
output: str
def parse_node(state: DocumentState) -> dict:
"""Parse the raw document."""
doc = state["document"]
# Simulate parsing
return {
"messages": [f"Parsed document: {len(doc)} characters"]
}
def classify_node(state: DocumentState) -> dict:
"""Classify the document type."""
doc = state["document"].lower()
if "invoice" in doc or "amount due" in doc:
doc_type = "invoice"
elif "agreement" in doc or "contract" in doc:
doc_type = "contract"
elif "dear" in doc or "sincerely" in doc:
doc_type = "letter"
else:
doc_type = "unknown"
return {
"doc_type": doc_type,
"messages": [f"Classified as: {doc_type}"]
}
def process_invoice(state: DocumentState) -> dict:
"""Extract invoice-specific data."""
return {
"extracted_data": [{"type": "invoice", "amount": "$1,234.56"}],
"messages": ["Extracted invoice data"]
}
def process_contract(state: DocumentState) -> dict:
"""Extract contract-specific data."""
return {
"extracted_data": [{"type": "contract", "parties": ["A", "B"]}],
"messages": ["Extracted contract data"]
}
def process_generic(state: DocumentState) -> dict:
"""Generic processing for other documents."""
return {
"extracted_data": [{"type": "generic", "summary": "Document processed"}],
"messages": ["Generic processing complete"]
}
def output_node(state: DocumentState) -> dict:
"""Generate final output."""
data = state["extracted_data"]
return {
"output": f"Processed {len(data)} items: {data}",
"messages": ["Output generated"]
}
def route_by_type(state: DocumentState) -> str:
"""Route to appropriate processor based on document type."""
doc_type = state["doc_type"]
routing = {
"invoice": "process_invoice",
"contract": "process_contract",
"letter": "process_generic",
"unknown": "process_generic"
}
return routing.get(doc_type, "process_generic")
from langgraph.graph import StateGraph, START, END
# Create graph
workflow = StateGraph(DocumentState)
# Add nodes
workflow.add_node("parse", parse_node)
workflow.add_node("classify", classify_node)
workflow.add_node("process_invoice", process_invoice)
workflow.add_node("process_contract", process_contract)
workflow.add_node("process_generic", process_generic)
workflow.add_node("output", output_node)
# Add edges
workflow.add_edge(START, "parse")
workflow.add_edge("parse", "classify")
# Conditional routing after classification
workflow.add_conditional_edges(
"classify",
route_by_type,
{
"process_invoice": "process_invoice",
"process_contract": "process_contract",
"process_generic": "process_generic"
}
)
# All processors lead to output
workflow.add_edge("process_invoice", "output")
workflow.add_edge("process_contract", "output")
workflow.add_edge("process_generic", "output")
# Output leads to END
workflow.add_edge("output", END)
# Compile
app = workflow.compile()
# Test with an invoice
result = app.invoke({
"document": "INVOICE #123\nAmount Due: $1,234.56\nDue Date: 2024-01-15",
"extracted_data": [],
"messages": []
})
print("Document type:", result["doc_type"])
print("Messages:", result["messages"])
print("Output:", result["output"])

Cycles are the beating heart of robust AI. Consider a self-correcting writer agent that drafts, reviews, and refines content.

graph LR
Draft --> Review
Review --> Good{Good?}
Good -- Yes --> Publish
Good -- No --> Draft
from typing import TypedDict, List, Annotated
import operator
class WriterState(TypedDict):
topic: str
draft: str
feedback: str
revision_count: int
is_approved: bool
messages: Annotated[List[str], operator.add]
def draft_node(state: WriterState) -> dict:
"""Create or revise the draft."""
topic = state["topic"]
feedback = state.get("feedback", "")
count = state.get("revision_count", 0)
if count == 0:
# Initial draft
draft = f"# {topic}\n\nThis is the initial draft about {topic}."
msg = "Created initial draft"
else:
# Revision based on feedback
draft = f"# {topic}\n\nRevised draft (v{count+1}): Addressed feedback: {feedback}"
msg = f"Revised draft (attempt {count + 1})"
return {
"draft": draft,
"revision_count": count + 1,
"messages": [msg]
}
def review_node(state: WriterState) -> dict:
"""Review the draft and provide feedback."""
draft = state["draft"]
count = state["revision_count"]
# Simulate review (in real app, this would use an LLM)
if count >= 3: # Accept after 3 attempts
return {
"is_approved": True,
"feedback": "Looks good!",
"messages": ["Review passed!"]
}
else:
return {
"is_approved": False,
"feedback": f"Need more detail in section {count}",
"messages": [f"Review failed: needs revision"]
}
def publish_node(state: WriterState) -> dict:
"""Publish the approved draft."""
return {
"messages": [f"Published after {state['revision_count']} revisions!"]
}
def should_continue(state: WriterState) -> str:
"""Decide whether to revise or publish."""
if state["is_approved"]:
return "publish"
else:
return "revise"
# Build the graph
writer = StateGraph(WriterState)
writer.add_node("draft", draft_node)
writer.add_node("review", review_node)
writer.add_node("publish", publish_node)
writer.add_edge(START, "draft")
writer.add_edge("draft", "review")
writer.add_conditional_edges(
"review",
should_continue,
{
"revise": "draft", # CYCLE: go back to draft
"publish": "publish"
}
)
writer.add_edge("publish", END)
app = writer.compile()
# Run it
result = app.invoke({
"topic": "LangGraph Cycles",
"revision_count": 0,
"is_approved": False,
"messages": []
})
print("Final messages:", result["messages"])
# Output shows the progression through multiple revisions

When implementing cycles, missing safety checks will stall your application entirely. Defensively wrap conditions with retry limits:

def should_continue_safe(state: WriterState) -> str:
"""Continue with a maximum retry limit."""
MAX_RETRIES = 5
if state["is_approved"]:
return "publish"
elif state["revision_count"] >= MAX_RETRIES:
return "publish" # Publish anyway after max retries
else:
return "revise"

You should also isolate error-prone operations at the node level to ensure localized recovery:

def safe_node(state: MyState) -> dict:
"""Node with built-in error handling."""
try:
# Risky operation
result = call_external_api(state["input"])
return {"result": result, "error": None}
except Exception as e:
return {"result": None, "error": str(e)}
def route_on_error(state: MyState) -> str:
"""Route based on success/failure."""
if state.get("error"):
return "handle_error"
return "continue"

Which works perfectly in tandem with the explicit Retry pattern:

class RetryState(TypedDict):
input: str
output: str
attempts: int
max_attempts: int
success: bool
def process_with_retry(state: RetryState) -> dict:
"""Process with retry tracking."""
attempts = state.get("attempts", 0) + 1
try:
# Your processing logic
result = risky_operation(state["input"])
return {
"output": result,
"attempts": attempts,
"success": True
}
except Exception as e:
return {
"output": str(e),
"attempts": attempts,
"success": False
}
def should_retry(state: RetryState) -> str:
"""Decide whether to retry."""
if state["success"]:
return "done"
elif state["attempts"] < state["max_attempts"]:
return "retry"
else:
return "failed"

The true power emerges when node operations are powered by LLMs.

from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class LLMAgentState(TypedDict):
messages: Annotated[List[dict], operator.add]
task: str
result: str
def create_llm_node(system_prompt: str):
"""Factory function to create LLM nodes with different personas."""
llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp")
def node(state: LLMAgentState) -> dict:
# Build message history
messages = [SystemMessage(content=system_prompt)]
messages.append(HumanMessage(content=state["task"]))
# Call LLM
response = llm.invoke(messages)
return {
"result": response.content,
"messages": [{"role": "assistant", "content": response.content}]
}
return node
# Create specialized nodes
researcher = create_llm_node(
"You are a research assistant. Gather relevant information about the topic."
)
writer = create_llm_node(
"You are a skilled writer. Create engaging content based on the research."
)
editor = create_llm_node(
"You are a strict editor. Review and improve the writing. Be concise."
)

Section 5: Multi-Agent Orchestration & Parallelism

Section titled “Section 5: Multi-Agent Orchestration & Parallelism”

When coordinating numerous specialized agents, the “Supervisor Pattern” is indispensable. A central node dictates routing while worker nodes handle isolated execution.

graph TD
Supervisor[Supervisor]
Supervisor --> Researcher
Supervisor --> Analyst
Supervisor --> Writer
Researcher --> Supervisor
Analyst --> Supervisor
Writer --> Supervisor
Supervisor --> FinalOutput[Final Output]
from typing import Literal
class TeamState(TypedDict):
task: str
research: str
analysis: str
draft: str
current_agent: str
next_agent: Literal["researcher", "analyst", "writer", "done"]
messages: Annotated[List[str], operator.add]
def supervisor_node(state: TeamState) -> dict:
"""Decide which agent should work next."""
if not state.get("research"):
return {"next_agent": "researcher", "messages": ["Assigning to researcher"]}
elif not state.get("analysis"):
return {"next_agent": "analyst", "messages": ["Assigning to analyst"]}
elif not state.get("draft"):
return {"next_agent": "writer", "messages": ["Assigning to writer"]}
else:
return {"next_agent": "done", "messages": ["All work complete!"]}
def researcher_node(state: TeamState) -> dict:
"""Research agent gathers information."""
task = state["task"]
# In real app, use LLM + tools
return {
"research": f"Research findings for: {task}",
"current_agent": "researcher",
"messages": ["Research complete"]
}
def analyst_node(state: TeamState) -> dict:
"""Analyst processes research."""
research = state["research"]
return {
"analysis": f"Analysis of: {research}",
"current_agent": "analyst",
"messages": ["Analysis complete"]
}
def writer_node(state: TeamState) -> dict:
"""Writer creates final content."""
analysis = state["analysis"]
return {
"draft": f"Final draft based on: {analysis}",
"current_agent": "writer",
"messages": ["Draft complete"]
}
def route_to_agent(state: TeamState) -> str:
"""Route to the next agent."""
return state["next_agent"]
# Build the multi-agent graph
team = StateGraph(TeamState)
team.add_node("supervisor", supervisor_node)
team.add_node("researcher", researcher_node)
team.add_node("analyst", analyst_node)
team.add_node("writer", writer_node)
team.add_edge(START, "supervisor")
team.add_conditional_edges(
"supervisor",
route_to_agent,
{
"researcher": "researcher",
"analyst": "analyst",
"writer": "writer",
"done": END
}
)
# Each agent reports back to supervisor
team.add_edge("researcher", "supervisor")
team.add_edge("analyst", "supervisor")
team.add_edge("writer", "supervisor")
app = team.compile()

Tasks devoid of interdependencies should run simultaneously.

from langgraph.graph import StateGraph
class ParallelState(TypedDict):
input: str
result_a: str
result_b: str
result_c: str
final: str
# Three independent processors
def process_a(state): return {"result_a": f"A: {state['input']}"}
def process_b(state): return {"result_b": f"B: {state['input']}"}
def process_c(state): return {"result_c": f"C: {state['input']}"}
def combine(state):
return {"final": f"{state['result_a']} + {state['result_b']} + {state['result_c']}"}
graph = StateGraph(ParallelState)
graph.add_node("a", process_a)
graph.add_node("b", process_b)
graph.add_node("c", process_c)
graph.add_node("combine", combine)
# Fan out from START to parallel nodes
graph.add_edge(START, "a")
graph.add_edge(START, "b")
graph.add_edge(START, "c")
# Fan in to combine
graph.add_edge("a", "combine")
graph.add_edge("b", "combine")
graph.add_edge("c", "combine")
graph.add_edge("combine", END)
app = graph.compile()
# LangGraph will execute a, b, c in parallel!

Section 6: Persistence, Subgraphs, and the Human-in-the-Loop

Section titled “Section 6: Persistence, Subgraphs, and the Human-in-the-Loop”

For high-stakes tasks, integrating human oversight is paramount. Interrupt mechanisms temporarily suspend workflows.

from langgraph.checkpoint.memory import MemorySaver
class ApprovalState(TypedDict):
proposal: str
approved: bool
feedback: str
messages: Annotated[List[str], operator.add]
def create_proposal(state: ApprovalState) -> dict:
return {
"proposal": "I propose we invest $100K in AI infrastructure",
"messages": ["Proposal created"]
}
def await_approval(state: ApprovalState) -> dict:
"""This node will be interrupted for human input."""
# The interrupt happens here - human provides approved/feedback
return {
"messages": ["Awaiting approval..."]
}
def execute_proposal(state: ApprovalState) -> dict:
if state["approved"]:
return {"messages": ["Proposal executed!"]}
else:
return {"messages": [f"Proposal rejected: {state['feedback']}"]}
# Build with checkpointing
workflow = StateGraph(ApprovalState)
workflow.add_node("propose", create_proposal)
workflow.add_node("await", await_approval)
workflow.add_node("execute", execute_proposal)
workflow.add_edge(START, "propose")
workflow.add_edge("propose", "await")
workflow.add_edge("await", "execute")
workflow.add_edge("execute", END)
# Compile with checkpointer for interrupts
checkpointer = MemorySaver()
app = workflow.compile(
checkpointer=checkpointer,
interrupt_before=["execute"] # Interrupt before execution
)

Resuming the workflow simply requires feeding the human-amended state back in:

# Start the workflow
config = {"configurable": {"thread_id": "proposal-1"}}
# Run until interrupt
result = app.invoke({"approved": False, "messages": []}, config)
print("Paused at:", result)
# Human reviews and provides approval
# Update state with human input
app.update_state(
config,
{"approved": True, "feedback": "Looks good!"}
)
# Continue execution
final = app.invoke(None, config) # None continues from checkpoint
print("Final:", final)

Checkpointers are essential for crash-proofing applications. Memory is fine for local testing, but you’ll need robust DB backends in production.

from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
app = graph.compile(checkpointer=checkpointer)
# Each run is identified by thread_id
config = {"configurable": {"thread_id": "my-session-1"}}
result = app.invoke(input_state, config)
# Get history
history = list(app.get_state_history(config))
for state in history:
print(f"Step: {state.metadata}")
from langgraph.checkpoint.sqlite import SqliteSaver
# Persistent storage
checkpointer = SqliteSaver.from_conn_string("workflows.db")
app = graph.compile(checkpointer=checkpointer)
# Workflows survive restarts!
from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string(
"postgresql://user:pass@host:5432/db"
)
app = graph.compile(checkpointer=checkpointer)

LangGraph supports granular streaming. You can emit entire state updates or individual tokens generated directly by the LLM inside nodes.

# Stream all events
for event in app.stream(input_state, config, stream_mode="values"):
print(f"State update: {event}")
# Stream specific node outputs
for event in app.stream(input_state, config, stream_mode="updates"):
print(f"Node output: {event}")
# Stream tokens from LLM calls within nodes
async for event in app.astream_events(input_state, config, version="v2"):
if event["event"] == "on_llm_stream":
print(event["data"]["chunk"].content, end="", flush=True)

Encapsulating complexity makes architectures reusable. Any compiled graph can become a node in a broader parent graph.

# Create a reusable subgraph
research_graph = StateGraph(ResearchState)
research_graph.add_node("search", search_node)
research_graph.add_node("summarize", summarize_node)
research_graph.add_edge(START, "search")
research_graph.add_edge("search", "summarize")
research_graph.add_edge("summarize", END)
research_subgraph = research_graph.compile()
# Use in parent graph
main_graph = StateGraph(MainState)
main_graph.add_node("research", research_subgraph) # Subgraph as node!
main_graph.add_node("write", write_node)
main_graph.add_edge(START, "research")
main_graph.add_edge("research", "write")
main_graph.add_edge("write", END)

Here are battle-tested topologies to memorize for high-performance agentic systems.

Pattern 1: Research-Write-Review Cycle

graph LR
Research --> Write --> Review --> Approved{Approved?}
Approved -- No --> Write

Pattern 2: Multi-Stage Validation

graph TD
Input --> ValFormat[Validate Format]
ValFormat --> ValContent[Validate Content]
ValContent --> ValPolicy[Validate Policy]
ValPolicy --> Process
ValFormat --> Reject1[Reject]
ValContent --> Reject2[Reject]
ValPolicy --> Reject3[Reject]

Pattern 3: Hierarchical Agents

graph TD
Manager --> LeadA[Team Lead A]
Manager --> LeadB[Team Lead B]
Manager --> LeadC[Team Lead C]
LeadA --> Worker1[Worker 1]
LeadB --> Worker2[Worker 2]
LeadC --> Worker3[Worker 3]

Pattern 4: MapReduce for Parallel Processing

graph TD
Splitter --> Proc1[Process 1]
Splitter --> Proc2[Process 2]
Splitter --> Proc3[Process 3]
Proc1 --> Reducer
Proc2 --> Reducer
Proc3 --> Reducer

Why invest in the steeper learning curve of LangGraph over rudimentary Airflow scripts? The economic value speaks for itself.

Without LangGraph (ad-hoc solution):
- Development time: 3 months (senior engineer)
- Crash-related rework: 15% of runs need manual intervention
- Human review integration: Additional 2 weeks
- Debugging production issues: 10 hours/week
With LangGraph:
- Development time: 1 month (same engineer)
- Crash-related rework: <1% (auto-resume handles most)
- Human review: Built-in interrupts
- Debugging: State replay eliminates most investigation
Annual savings for a team running 10K workflows/month:
- Engineering time: ~$100K
- Operational overhead: ~$50K
- Reduced failures: ~$30K (API costs from restarts)
Total: ~$180K/year
ComponentWithout LangGraphWith LangGraph
State ManagementOften requires substantial custom codeBuilt-in
Crash RecoveryManual replay (lost time + tokens)Automatic resume
Human ReviewCustom toolingNative interrupts
DebuggingLog analysisState replay
CompanyBefore LangGraphAfter LangGraphReduction
Example teamthousands of linesa few hundred linessubstantial
Example teamthousands of linesa few hundred linessubstantial
Example teamthousands of linesa few hundred linessubstantial
FeatureAirflow/PrefectLangGraph
Primary useData pipelinesAI workflows
Node typesPython functionsLLM-aware functions
State managementExternalBuilt-in with reducers
StreamingSupport varies by platform and patternFirst-class support
Human-in-loopOften requires extra orchestrationNative interrupt/resume
LLM integrationDIYNative with LangChain
MistakeWhy it happensFix / Remedy
Forgetting State AnnotationsDefining messages: List[str] replaces the list on every node execution.Use Annotated[List[str], operator.add] for accumulation.
Infinite LoopsConditional edges lack fallback limits, trapping agents in endless validation.Track a retry_count in state and exit forcefully when it exceeds a threshold.
Missing Routing Edge CasesConditional edge fails to account for a string variant, raising a KeyError.Usually return a fallback/default route in routing functions for unexpected cases.
Stateful NodesRelying on global Python variables (global count) inside nodes corrupts concurrent runs.Enforce pure functions. Move all tracking to the TypedDict graph state.
Bloated State ObjectsPushing massive 10MB payloads directly into state kills memory during checkpoints.Persist references to external storage (document_url: str) instead of raw data.
Missing Graph CompilationYou must compile the graph before invoking it.Always instantiate via app = graph.compile() before invoking.
Unpersisted ThreadsPausing for human-in-the-loop without attaching a checkpointer wipes the session.Define checkpointer=MemorySaver() (or Postgres) during .compile().
# BAD: Lists get replaced, not accumulated
class BadState(TypedDict):
messages: List[str] # Each node replaces the list!
# GOOD: Use annotation for accumulation
class GoodState(TypedDict):
messages: Annotated[List[str], operator.add]
# BAD: No exit condition
graph.add_conditional_edges("check", should_continue, {
"retry": "process",
"done": "output"
})
# If should_continue always returns "retry", infinite loop!
# GOOD: Add max retries
def should_continue_safe(state):
if state["attempts"] >= MAX_RETRIES:
return "done" # Force exit
return "retry" if not state["success"] else "done"
# BAD: Missing route
def route(state):
if condition_a:
return "a"
elif condition_b:
return "b"
# What if neither? KeyError!
# GOOD: Always have a default
def route(state):
if condition_a:
return "a"
elif condition_b:
return "b"
else:
return "default"
# BAD: Node maintains internal state
counter = 0
def bad_node(state):
global counter
counter += 1 # This persists across invocations!
return {"count": counter}
# GOOD: All state in the graph state
def good_node(state):
count = state.get("count", 0) + 1
return {"count": count}
# BAD: Storing large data in state
class BadState(TypedDict):
full_document: str # 10MB document in every state snapshot!
# GOOD: Store references, load when needed
class GoodState(TypedDict):
document_id: str # Reference to external storage
# Think about:
# - What needs to persist across nodes?
# - What should accumulate vs replace?
# - What's the minimal state needed?
class WellDesignedState(TypedDict):
# Core data
input: str
output: str
# Progress tracking (accumulates)
steps_completed: Annotated[List[str], operator.add]
# Metadata (replaces)
current_phase: str
last_error: Optional[str]
# Node should be deterministic given same state
def pure_node(state: MyState) -> dict:
# Only use state input
# No external side effects
# Return consistent output
return {"result": process(state["input"])}
# Create reusable components
validation_subgraph = create_validation_graph()
processing_subgraph = create_processing_graph()
# Compose in main graph
main_graph.add_node("validate", validation_subgraph)
main_graph.add_node("process", processing_subgraph)
# Prevent runaway cycles
config = {
"configurable": {"thread_id": "x"},
"recursion_limit": 50 # Max steps
}
result = app.invoke(state, config)
# Visualize your graph to catch issues
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))

Hands-On Exercise: Building a Multi-Agent Analyst

Section titled “Hands-On Exercise: Building a Multi-Agent Analyst”

Objective: Implement a supervisor-managed research system that handles topic searches, evaluates depth, and loops if criteria aren’t met.

Step 1: Environment Setup Create a new directory and virtual environment:

Terminal window
mkdir langgraph-lab && cd langgraph-lab
python3 -m venv venv
source venv/bin/activate
pip install langgraph langchain-google-genai typing-extensions

Step 2: Define Your TypedDict State Create a python script agent.py. Add a state dictionary that tracks the requested topic, a research_log (using operator.add to accumulate discoveries), and a loop_count integer to cap search attempts.

Step 3: Construct Worker Nodes Implement two Python functions:

  1. researcher_node: Appends a mocked observation to research_log.
  2. reviewer_node: Evaluates if the research_log has 2 or more entries. If so, updates a boolean is_comprehensive flag to True.

Step 4: Implement Routing Write a routing function taking the current state. If loop_count > 3, return "max_reached". If is_comprehensive is True, return "done". Otherwise, return "research".

Step 5: Graph Compilation & Execution Instantiate a StateGraph, link the nodes, setup the conditional edge pointing out of the reviewer node, and execute .invoke().

View the Full Solution Code
import operator
from typing import TypedDict, List, Annotated
from langgraph.graph import StateGraph, START, END
# Step 2: State Definition
class ResearchState(TypedDict):
topic: str
research_log: Annotated[List[str], operator.add]
loop_count: int
is_comprehensive: bool
# Step 3: Node Implementation
def researcher_node(state: ResearchState) -> dict:
current_count = state.get("loop_count", 0)
return {
"research_log": [f"Found insight part {current_count + 1}."],
"loop_count": current_count + 1
}
def reviewer_node(state: ResearchState) -> dict:
logs = state.get("research_log", [])
return {
"is_comprehensive": len(logs) >= 2
}
# Step 4: Routing Logic
def routing_logic(state: ResearchState) -> str:
if state.get("loop_count", 0) > 3:
return "max_reached"
if state.get("is_comprehensive"):
return "done"
return "research"
# Step 5: Compilation
workflow = StateGraph(ResearchState)
workflow.add_node("researcher", researcher_node)
workflow.add_node("reviewer", reviewer_node)
workflow.add_edge(START, "researcher")
workflow.add_edge("researcher", "reviewer")
workflow.add_conditional_edges(
"reviewer",
routing_logic,
{
"done": END,
"max_reached": END,
"research": "researcher"
}
)
app = workflow.compile()
output = app.invoke({"topic": "Quantum Computing", "research_log": [], "loop_count": 0})
print("Final State:", output)
  • Nodes accurately append to the research_log.
  • Execution completes gracefully after hitting the minimum log requirement.
  • Changing the reviewer_node logic to require 5 items effectively triggers the max loop cutoff.
1. A developer notices their LangGraph workflow is replacing the message history instead of appending new messages. What is the architectural flaw? The developer likely assigned `messages: List[str]` in their `TypedDict` instead of applying LangGraph's accumulation annotation. Without `Annotated[List[str], operator.add]`, LangGraph falls back to its default reducer behavior. This default behavior overwrites the existing state key entirely with the newly returned dictionary value. To resolve this, the state definition must explicitly instruct the framework to merge list contents.
2. You need to implement a fallback mechanism where a failed API call is retried up to three times before halting. How does a graph accomplish this? You must model the error and the retry logic explicitly as state data properties rather than relying on standard try-catch blocks alone. By incorporating an `attempts: int` counter and `success: bool` flag into your State, you gain granular control over execution flow. You can then write a routing function that inspects the outcome and iterates the cycle until `attempts >= 3`. At that point, the function must forcefully return a fallback edge to break the cycle.
3. A team is designing a system where an AI agent proposes a trade, but a human must approve it before execution. How should this be implemented natively in LangGraph? The framework handles this effortlessly using the specialized checkpointer interrupt feature. The team should initialize their orchestrator by calling `app.compile(checkpointer=MemorySaver(), interrupt_before=["execute"])`. This halts execution immediately before the trade execution node runs, preserving the current state safely in memory. Once human feedback is secured, it is injected back via `.update_state()`, allowing the system to proceed seamlessly.
4. An architect wants three distinct specialized agents to analyze a document simultaneously and then combine their insights. Which graph configuration achieves this? The architect should fan-out the execution by connecting the START node directly to all three worker nodes simultaneously via unconditional edges. Next, all three worker nodes must direct their downstream edges into a single downstream combination node. Because LangGraph automatically runs independent nodes in parallel, it naturally processes the three tasks concurrently. This approach eliminates latency bottlenecks before finally synchronizing the distinct outputs.
5. A graph enters an infinite loop between a "review" node and a "draft" node. How can this architectural flaw be mitigated safely? A runaway cycle typically happens when a conditional edge lacks an explicit architectural cap or fallback route. To mitigate this safely, introduce a `recursion_limit` directly in the execution configuration object. Additionally, you should track the total revision count inside the node state. This allows your routing logic to force an exit or transition to a generic "publish" edge once that count hits a sensible upper limit.
6. During a high-traffic period, your container crashes halfway through a multi-agent debate. Without starting over, how can LangGraph resume from the exact point of failure? LangGraph achieves system resiliency using persistent Checkpointers backed by external databases. If the graph was compiled with a `SqliteSaver` or `PostgresSaver`, the orchestrator simply reinvokes the workflow targeting the exact `thread_id` contained within the configuration object. The application then reconstructs the last fully processed state segment automatically. From there, it picks up execution at the subsequent unexecuted node without dropping any prior context.

Module 18 Complete! Progress: 21/56 modules (38%)

Next: Module 1.5 - Building AI Agents — Move from framework components to full agent architectures, orchestration patterns, and production-ready design tradeoffs.