LlamaIndex
Цей контент ще не доступний вашою мовою.
Or: When Your AI Needs to Remember What Happened
Section titled “Or: When Your AI Needs to Remember What Happened”What You’ll Be Able to Do
Section titled “What You’ll Be Able to Do”- Design stateful AI workflows using LangGraph’s StateGraph and conditional routing.
- Implement cyclic execution paths to enable self-correcting agent behaviors.
- Diagnose infinite loops and state mutation errors in complex multi-agent architectures.
- Evaluate the trade-offs between linear chains and graph-based workflow orchestration.
Why This Module Matters
Section titled “Why This Module Matters”In a long-running AI workflow, a session can fail late in the process when a downstream dependency times out, even after substantial work has already been completed.
He refreshed the dashboard. Everything was gone. All 52 documents, all verification results, all compliance checks—the entire session had vanished. The customer would have to start over from scratch. The financial impact can be immediate: failed long-running sessions often increase abandonment and lost revenue.
Without persisted state, a workflow failure can wipe in-memory progress; with checkpointing, the system can resume from the most recent saved step instead of restarting from scratch. This module will teach you how to build exactly this kind of resilient, stateful AI architecture.
Did You Know?
Section titled “Did You Know?”Section 1: The Graph Mental Model and Linear Limitations
Section titled “Section 1: The Graph Mental Model and Linear Limitations”In previous modules, you mastered Chain-of-Thought and ReAct patterns. But real production workflows require more than a single reasoning trace. They need to remember state across many steps, branch conditionally, loop back if something fails, and coordinate multiple agents.
Think of standard linear chains like a highway with no exits. Once you start, you can only go forward. If you miss something, you have to drive to the end and start over from the beginning.
graph TD Input --> Chain1 Chain1 --> Chain2 Chain2 --> Chain3 Chain3 --> Output Chain3 -.-> ConditionBranch ConditionBranch --> PathA[Path A] ConditionBranch --> PathB[Path B]Real-world workflows, however, look more like a city street grid. You can loop back, take detours, or recover from wrong turns without restarting your journey. LangGraph allows you to construct these cyclic execution paths naturally.
graph TD Research --> Analyze Analyze --> Check Check -- not good enough --> Research Check --> Write Write --> Review Review -- revisions needed --> Write Review --> PublishPause and predict: If a complex AI agent pipeline is purely linear, what architectural workarounds would you need to implement to allow an agent to correct a hallucinated answer after reviewing it?
Why Graphs for AI Workflows?
Section titled “Why Graphs for AI Workflows?”At its core, a graph consists of nodes and edges connecting them.
graph LR A[Node A] --edge--> B[Node B] B --edge--> C[Node C] B --edge--> D[Node D]| Feature | Linear Chains | Graph Workflows |
|---|---|---|
| Conditional logic | Limited | Full branching |
| Cycles/loops | Not possible | Native support |
| State management | Pass through | Persistent state |
| Error recovery | Start over | Retry specific nodes |
| Human-in-the-loop | Awkward | Native support |
| Multi-agent | Sequential only | True parallelism |
Section 2: LangGraph Core Concepts
Section titled “Section 2: LangGraph Core Concepts”LangGraph revolves around a few central primitives: the StateGraph, Nodes, Edges, Reducers, and Checkpointers.
| Concept | Purpose |
|---|---|
| StateGraph | Container for workflow definition |
| Nodes | Processing functions that update state |
| Edges | Define flow between nodes |
| Reducers | How state updates are merged |
| Checkpointer | Enables pause/resume and persistence |
| Interrupts | Human-in-the-loop integration |
1. StateGraph
Section titled “1. StateGraph”The foundation is the StateGraph, which manages the state schema, nodes, and transition logic.
from langgraph.graph import StateGraph, END
# Define the state typefrom typing import TypedDict, List, Annotatedimport operator
class AgentState(TypedDict): messages: Annotated[List[str], operator.add] # Accumulates current_step: str result: str
# Create the graphgraph = StateGraph(AgentState)2. State Annotations
Section titled “2. State Annotations”State updates require precise instructions on how new data should merge with existing data.
from typing import Annotatedimport operator
class MyState(TypedDict): # This field gets REPLACED on each update current_value: str
# This field ACCUMULATES (list concatenation) history: Annotated[List[str], operator.add]
# This field uses custom reducer counter: Annotated[int, lambda a, b: a + b]Stop and think: Why is
operator.addcrucial for tracking an LLM’s conversation history? What happens to the AI’s context window if a field lacks this annotation?
3. Nodes and Edges
Section titled “3. Nodes and Edges”Nodes receive the current state, perform logic, and return a partial state update. Edges define the conditional or unconditional routes between these nodes.
def analyze_node(state: AgentState) -> dict: """Analyze the input and return findings.""" messages = state["messages"] # Do analysis... return { "current_step": "analyze", "messages": ["Analysis complete: found 3 issues"] }
# Add node to graphgraph.add_node("analyze", analyze_node)# Unconditional edge: always go from A to Bgraph.add_edge("node_a", "node_b")
# Conditional edge: choose based on statedef should_continue(state: AgentState) -> str: if state["result"] == "success": return "finish" else: return "retry"
graph.add_conditional_edges( "check", should_continue, { "finish": "output", "retry": "process" # Creates a cycle! })4. Compilation
Section titled “4. Compilation”Every graph requires explicit entry and exit points before it can be compiled into an executable application.
from langgraph.graph import START, END
# Set entry pointgraph.add_edge(START, "first_node")
# Set exit point (END is a special node)graph.add_edge("final_node", END)# Compile the graphapp = graph.compile()
# Run itresult = app.invoke({"messages": ["Hello"], "current_step": "start"})Section 3: Building Your First LangGraph Workflow
Section titled “Section 3: Building Your First LangGraph Workflow”Let’s build a practical document processing pipeline. The graph structure will dynamically route documents based on classification.
graph LR Parse --> Classify --> Route Route --> ProcessA[Process A] --> Output1[Output] Route --> ProcessB[Process B] --> Output2[Output]Step 1: Define State
Section titled “Step 1: Define State”from typing import TypedDict, List, Annotated, Literalimport operator
class DocumentState(TypedDict): # The input document document: str
# Classification result doc_type: Literal["invoice", "contract", "letter", "unknown"]
# Extracted data (accumulates across nodes) extracted_data: Annotated[List[dict], operator.add]
# Processing messages messages: Annotated[List[str], operator.add]
# Final output output: strStep 2: Define Nodes
Section titled “Step 2: Define Nodes”def parse_node(state: DocumentState) -> dict: """Parse the raw document.""" doc = state["document"] # Simulate parsing return { "messages": [f"Parsed document: {len(doc)} characters"] }
def classify_node(state: DocumentState) -> dict: """Classify the document type.""" doc = state["document"].lower()
if "invoice" in doc or "amount due" in doc: doc_type = "invoice" elif "agreement" in doc or "contract" in doc: doc_type = "contract" elif "dear" in doc or "sincerely" in doc: doc_type = "letter" else: doc_type = "unknown"
return { "doc_type": doc_type, "messages": [f"Classified as: {doc_type}"] }
def process_invoice(state: DocumentState) -> dict: """Extract invoice-specific data.""" return { "extracted_data": [{"type": "invoice", "amount": "$1,234.56"}], "messages": ["Extracted invoice data"] }
def process_contract(state: DocumentState) -> dict: """Extract contract-specific data.""" return { "extracted_data": [{"type": "contract", "parties": ["A", "B"]}], "messages": ["Extracted contract data"] }
def process_generic(state: DocumentState) -> dict: """Generic processing for other documents.""" return { "extracted_data": [{"type": "generic", "summary": "Document processed"}], "messages": ["Generic processing complete"] }
def output_node(state: DocumentState) -> dict: """Generate final output.""" data = state["extracted_data"] return { "output": f"Processed {len(data)} items: {data}", "messages": ["Output generated"] }Step 3: Define Routing Logic
Section titled “Step 3: Define Routing Logic”def route_by_type(state: DocumentState) -> str: """Route to appropriate processor based on document type.""" doc_type = state["doc_type"]
routing = { "invoice": "process_invoice", "contract": "process_contract", "letter": "process_generic", "unknown": "process_generic" }
return routing.get(doc_type, "process_generic")Step 4 & 5: Build and Run the Graph
Section titled “Step 4 & 5: Build and Run the Graph”from langgraph.graph import StateGraph, START, END
# Create graphworkflow = StateGraph(DocumentState)
# Add nodesworkflow.add_node("parse", parse_node)workflow.add_node("classify", classify_node)workflow.add_node("process_invoice", process_invoice)workflow.add_node("process_contract", process_contract)workflow.add_node("process_generic", process_generic)workflow.add_node("output", output_node)
# Add edgesworkflow.add_edge(START, "parse")workflow.add_edge("parse", "classify")
# Conditional routing after classificationworkflow.add_conditional_edges( "classify", route_by_type, { "process_invoice": "process_invoice", "process_contract": "process_contract", "process_generic": "process_generic" })
# All processors lead to outputworkflow.add_edge("process_invoice", "output")workflow.add_edge("process_contract", "output")workflow.add_edge("process_generic", "output")
# Output leads to ENDworkflow.add_edge("output", END)
# Compileapp = workflow.compile()# Test with an invoiceresult = app.invoke({ "document": "INVOICE #123\nAmount Due: $1,234.56\nDue Date: 2024-01-15", "extracted_data": [], "messages": []})
print("Document type:", result["doc_type"])print("Messages:", result["messages"])print("Output:", result["output"])Section 4: Cycles and Error Handling
Section titled “Section 4: Cycles and Error Handling”Cycles are the beating heart of robust AI. Consider a self-correcting writer agent that drafts, reviews, and refines content.
graph LR Draft --> Review Review --> Good{Good?} Good -- Yes --> Publish Good -- No --> Draftfrom typing import TypedDict, List, Annotatedimport operator
class WriterState(TypedDict): topic: str draft: str feedback: str revision_count: int is_approved: bool messages: Annotated[List[str], operator.add]
def draft_node(state: WriterState) -> dict: """Create or revise the draft.""" topic = state["topic"] feedback = state.get("feedback", "") count = state.get("revision_count", 0)
if count == 0: # Initial draft draft = f"# {topic}\n\nThis is the initial draft about {topic}." msg = "Created initial draft" else: # Revision based on feedback draft = f"# {topic}\n\nRevised draft (v{count+1}): Addressed feedback: {feedback}" msg = f"Revised draft (attempt {count + 1})"
return { "draft": draft, "revision_count": count + 1, "messages": [msg] }
def review_node(state: WriterState) -> dict: """Review the draft and provide feedback.""" draft = state["draft"] count = state["revision_count"]
# Simulate review (in real app, this would use an LLM) if count >= 3: # Accept after 3 attempts return { "is_approved": True, "feedback": "Looks good!", "messages": ["Review passed!"] } else: return { "is_approved": False, "feedback": f"Need more detail in section {count}", "messages": [f"Review failed: needs revision"] }
def publish_node(state: WriterState) -> dict: """Publish the approved draft.""" return { "messages": [f"Published after {state['revision_count']} revisions!"] }
def should_continue(state: WriterState) -> str: """Decide whether to revise or publish.""" if state["is_approved"]: return "publish" else: return "revise"
# Build the graphwriter = StateGraph(WriterState)
writer.add_node("draft", draft_node)writer.add_node("review", review_node)writer.add_node("publish", publish_node)
writer.add_edge(START, "draft")writer.add_edge("draft", "review")
writer.add_conditional_edges( "review", should_continue, { "revise": "draft", # CYCLE: go back to draft "publish": "publish" })
writer.add_edge("publish", END)
app = writer.compile()
# Run itresult = app.invoke({ "topic": "LangGraph Cycles", "revision_count": 0, "is_approved": False, "messages": []})
print("Final messages:", result["messages"])# Output shows the progression through multiple revisionsSafety First: Preventing Infinite Loops
Section titled “Safety First: Preventing Infinite Loops”When implementing cycles, missing safety checks will stall your application entirely. Defensively wrap conditions with retry limits:
def should_continue_safe(state: WriterState) -> str: """Continue with a maximum retry limit.""" MAX_RETRIES = 5
if state["is_approved"]: return "publish" elif state["revision_count"] >= MAX_RETRIES: return "publish" # Publish anyway after max retries else: return "revise"You should also isolate error-prone operations at the node level to ensure localized recovery:
def safe_node(state: MyState) -> dict: """Node with built-in error handling.""" try: # Risky operation result = call_external_api(state["input"]) return {"result": result, "error": None} except Exception as e: return {"result": None, "error": str(e)}
def route_on_error(state: MyState) -> str: """Route based on success/failure.""" if state.get("error"): return "handle_error" return "continue"Which works perfectly in tandem with the explicit Retry pattern:
class RetryState(TypedDict): input: str output: str attempts: int max_attempts: int success: bool
def process_with_retry(state: RetryState) -> dict: """Process with retry tracking.""" attempts = state.get("attempts", 0) + 1
try: # Your processing logic result = risky_operation(state["input"]) return { "output": result, "attempts": attempts, "success": True } except Exception as e: return { "output": str(e), "attempts": attempts, "success": False }
def should_retry(state: RetryState) -> str: """Decide whether to retry.""" if state["success"]: return "done" elif state["attempts"] < state["max_attempts"]: return "retry" else: return "failed"Integrating LLMs
Section titled “Integrating LLMs”The true power emerges when node operations are powered by LLMs.
from langchain_google_genai import ChatGoogleGenerativeAIfrom langchain_core.messages import HumanMessage, AIMessage, SystemMessage
class LLMAgentState(TypedDict): messages: Annotated[List[dict], operator.add] task: str result: str
def create_llm_node(system_prompt: str): """Factory function to create LLM nodes with different personas.""" llm = ChatGoogleGenerativeAI(model="gemini-2.0-flash-exp")
def node(state: LLMAgentState) -> dict: # Build message history messages = [SystemMessage(content=system_prompt)] messages.append(HumanMessage(content=state["task"]))
# Call LLM response = llm.invoke(messages)
return { "result": response.content, "messages": [{"role": "assistant", "content": response.content}] }
return node
# Create specialized nodesresearcher = create_llm_node( "You are a research assistant. Gather relevant information about the topic.")
writer = create_llm_node( "You are a skilled writer. Create engaging content based on the research.")
editor = create_llm_node( "You are a strict editor. Review and improve the writing. Be concise.")Section 5: Multi-Agent Orchestration & Parallelism
Section titled “Section 5: Multi-Agent Orchestration & Parallelism”When coordinating numerous specialized agents, the “Supervisor Pattern” is indispensable. A central node dictates routing while worker nodes handle isolated execution.
graph TD Supervisor[Supervisor] Supervisor --> Researcher Supervisor --> Analyst Supervisor --> Writer Researcher --> Supervisor Analyst --> Supervisor Writer --> Supervisor Supervisor --> FinalOutput[Final Output]from typing import Literal
class TeamState(TypedDict): task: str research: str analysis: str draft: str current_agent: str next_agent: Literal["researcher", "analyst", "writer", "done"] messages: Annotated[List[str], operator.add]
def supervisor_node(state: TeamState) -> dict: """Decide which agent should work next.""" if not state.get("research"): return {"next_agent": "researcher", "messages": ["Assigning to researcher"]} elif not state.get("analysis"): return {"next_agent": "analyst", "messages": ["Assigning to analyst"]} elif not state.get("draft"): return {"next_agent": "writer", "messages": ["Assigning to writer"]} else: return {"next_agent": "done", "messages": ["All work complete!"]}
def researcher_node(state: TeamState) -> dict: """Research agent gathers information.""" task = state["task"] # In real app, use LLM + tools return { "research": f"Research findings for: {task}", "current_agent": "researcher", "messages": ["Research complete"] }
def analyst_node(state: TeamState) -> dict: """Analyst processes research.""" research = state["research"] return { "analysis": f"Analysis of: {research}", "current_agent": "analyst", "messages": ["Analysis complete"] }
def writer_node(state: TeamState) -> dict: """Writer creates final content.""" analysis = state["analysis"] return { "draft": f"Final draft based on: {analysis}", "current_agent": "writer", "messages": ["Draft complete"] }
def route_to_agent(state: TeamState) -> str: """Route to the next agent.""" return state["next_agent"]
# Build the multi-agent graphteam = StateGraph(TeamState)
team.add_node("supervisor", supervisor_node)team.add_node("researcher", researcher_node)team.add_node("analyst", analyst_node)team.add_node("writer", writer_node)
team.add_edge(START, "supervisor")
team.add_conditional_edges( "supervisor", route_to_agent, { "researcher": "researcher", "analyst": "analyst", "writer": "writer", "done": END })
# Each agent reports back to supervisorteam.add_edge("researcher", "supervisor")team.add_edge("analyst", "supervisor")team.add_edge("writer", "supervisor")
app = team.compile()Fan-Out / Fan-In Parallelism
Section titled “Fan-Out / Fan-In Parallelism”Tasks devoid of interdependencies should run simultaneously.
from langgraph.graph import StateGraph
class ParallelState(TypedDict): input: str result_a: str result_b: str result_c: str final: str
# Three independent processorsdef process_a(state): return {"result_a": f"A: {state['input']}"}def process_b(state): return {"result_b": f"B: {state['input']}"}def process_c(state): return {"result_c": f"C: {state['input']}"}
def combine(state): return {"final": f"{state['result_a']} + {state['result_b']} + {state['result_c']}"}
graph = StateGraph(ParallelState)
graph.add_node("a", process_a)graph.add_node("b", process_b)graph.add_node("c", process_c)graph.add_node("combine", combine)
# Fan out from START to parallel nodesgraph.add_edge(START, "a")graph.add_edge(START, "b")graph.add_edge(START, "c")
# Fan in to combinegraph.add_edge("a", "combine")graph.add_edge("b", "combine")graph.add_edge("c", "combine")
graph.add_edge("combine", END)
app = graph.compile()# LangGraph will execute a, b, c in parallel!Section 6: Persistence, Subgraphs, and the Human-in-the-Loop
Section titled “Section 6: Persistence, Subgraphs, and the Human-in-the-Loop”For high-stakes tasks, integrating human oversight is paramount. Interrupt mechanisms temporarily suspend workflows.
from langgraph.checkpoint.memory import MemorySaver
class ApprovalState(TypedDict): proposal: str approved: bool feedback: str messages: Annotated[List[str], operator.add]
def create_proposal(state: ApprovalState) -> dict: return { "proposal": "I propose we invest $100K in AI infrastructure", "messages": ["Proposal created"] }
def await_approval(state: ApprovalState) -> dict: """This node will be interrupted for human input.""" # The interrupt happens here - human provides approved/feedback return { "messages": ["Awaiting approval..."] }
def execute_proposal(state: ApprovalState) -> dict: if state["approved"]: return {"messages": ["Proposal executed!"]} else: return {"messages": [f"Proposal rejected: {state['feedback']}"]}
# Build with checkpointingworkflow = StateGraph(ApprovalState)
workflow.add_node("propose", create_proposal)workflow.add_node("await", await_approval)workflow.add_node("execute", execute_proposal)
workflow.add_edge(START, "propose")workflow.add_edge("propose", "await")workflow.add_edge("await", "execute")workflow.add_edge("execute", END)
# Compile with checkpointer for interruptscheckpointer = MemorySaver()app = workflow.compile( checkpointer=checkpointer, interrupt_before=["execute"] # Interrupt before execution)Resuming the workflow simply requires feeding the human-amended state back in:
# Start the workflowconfig = {"configurable": {"thread_id": "proposal-1"}}
# Run until interruptresult = app.invoke({"approved": False, "messages": []}, config)print("Paused at:", result)
# Human reviews and provides approval# Update state with human inputapp.update_state( config, {"approved": True, "feedback": "Looks good!"})
# Continue executionfinal = app.invoke(None, config) # None continues from checkpointprint("Final:", final)Persistence and State Checkpoints
Section titled “Persistence and State Checkpoints”Checkpointers are essential for crash-proofing applications. Memory is fine for local testing, but you’ll need robust DB backends in production.
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()app = graph.compile(checkpointer=checkpointer)
# Each run is identified by thread_idconfig = {"configurable": {"thread_id": "my-session-1"}}result = app.invoke(input_state, config)
# Get historyhistory = list(app.get_state_history(config))for state in history: print(f"Step: {state.metadata}")from langgraph.checkpoint.sqlite import SqliteSaver
# Persistent storagecheckpointer = SqliteSaver.from_conn_string("workflows.db")app = graph.compile(checkpointer=checkpointer)
# Workflows survive restarts!from langgraph.checkpoint.postgres import PostgresSaver
checkpointer = PostgresSaver.from_conn_string( "postgresql://user:pass@host:5432/db")app = graph.compile(checkpointer=checkpointer)Streaming Operations
Section titled “Streaming Operations”LangGraph supports granular streaming. You can emit entire state updates or individual tokens generated directly by the LLM inside nodes.
# Stream all eventsfor event in app.stream(input_state, config, stream_mode="values"): print(f"State update: {event}")
# Stream specific node outputsfor event in app.stream(input_state, config, stream_mode="updates"): print(f"Node output: {event}")# Stream tokens from LLM calls within nodesasync for event in app.astream_events(input_state, config, version="v2"): if event["event"] == "on_llm_stream": print(event["data"]["chunk"].content, end="", flush=True)Composing Workflows (Subgraphs)
Section titled “Composing Workflows (Subgraphs)”Encapsulating complexity makes architectures reusable. Any compiled graph can become a node in a broader parent graph.
# Create a reusable subgraphresearch_graph = StateGraph(ResearchState)research_graph.add_node("search", search_node)research_graph.add_node("summarize", summarize_node)research_graph.add_edge(START, "search")research_graph.add_edge("search", "summarize")research_graph.add_edge("summarize", END)research_subgraph = research_graph.compile()
# Use in parent graphmain_graph = StateGraph(MainState)main_graph.add_node("research", research_subgraph) # Subgraph as node!main_graph.add_node("write", write_node)main_graph.add_edge(START, "research")main_graph.add_edge("research", "write")main_graph.add_edge("write", END)Architecture Design Patterns
Section titled “Architecture Design Patterns”Here are battle-tested topologies to memorize for high-performance agentic systems.
Pattern 1: Research-Write-Review Cycle
graph LR Research --> Write --> Review --> Approved{Approved?} Approved -- No --> WritePattern 2: Multi-Stage Validation
graph TD Input --> ValFormat[Validate Format] ValFormat --> ValContent[Validate Content] ValContent --> ValPolicy[Validate Policy] ValPolicy --> Process
ValFormat --> Reject1[Reject] ValContent --> Reject2[Reject] ValPolicy --> Reject3[Reject]Pattern 3: Hierarchical Agents
graph TD Manager --> LeadA[Team Lead A] Manager --> LeadB[Team Lead B] Manager --> LeadC[Team Lead C]
LeadA --> Worker1[Worker 1] LeadB --> Worker2[Worker 2] LeadC --> Worker3[Worker 3]Pattern 4: MapReduce for Parallel Processing
graph TD Splitter --> Proc1[Process 1] Splitter --> Proc2[Process 2] Splitter --> Proc3[Process 3]
Proc1 --> Reducer Proc2 --> Reducer Proc3 --> ReducerROI Calculation
Section titled “ROI Calculation”Why invest in the steeper learning curve of LangGraph over rudimentary Airflow scripts? The economic value speaks for itself.
Without LangGraph (ad-hoc solution):- Development time: 3 months (senior engineer)- Crash-related rework: 15% of runs need manual intervention- Human review integration: Additional 2 weeks- Debugging production issues: 10 hours/week
With LangGraph:- Development time: 1 month (same engineer)- Crash-related rework: <1% (auto-resume handles most)- Human review: Built-in interrupts- Debugging: State replay eliminates most investigation
Annual savings for a team running 10K workflows/month:- Engineering time: ~$100K- Operational overhead: ~$50K- Reduced failures: ~$30K (API costs from restarts)Total: ~$180K/year| Component | Without LangGraph | With LangGraph |
|---|---|---|
| State Management | Often requires substantial custom code | Built-in |
| Crash Recovery | Manual replay (lost time + tokens) | Automatic resume |
| Human Review | Custom tooling | Native interrupts |
| Debugging | Log analysis | State replay |
| Company | Before LangGraph | After LangGraph | Reduction |
|---|---|---|---|
| Example team | thousands of lines | a few hundred lines | substantial |
| Example team | thousands of lines | a few hundred lines | substantial |
| Example team | thousands of lines | a few hundred lines | substantial |
| Feature | Airflow/Prefect | LangGraph |
|---|---|---|
| Primary use | Data pipelines | AI workflows |
| Node types | Python functions | LLM-aware functions |
| State management | External | Built-in with reducers |
| Streaming | Support varies by platform and pattern | First-class support |
| Human-in-loop | Often requires extra orchestration | Native interrupt/resume |
| LLM integration | DIY | Native with LangChain |
Common Mistakes
Section titled “Common Mistakes”| Mistake | Why it happens | Fix / Remedy |
|---|---|---|
| Forgetting State Annotations | Defining messages: List[str] replaces the list on every node execution. | Use Annotated[List[str], operator.add] for accumulation. |
| Infinite Loops | Conditional edges lack fallback limits, trapping agents in endless validation. | Track a retry_count in state and exit forcefully when it exceeds a threshold. |
| Missing Routing Edge Cases | Conditional edge fails to account for a string variant, raising a KeyError. | Usually return a fallback/default route in routing functions for unexpected cases. |
| Stateful Nodes | Relying on global Python variables (global count) inside nodes corrupts concurrent runs. | Enforce pure functions. Move all tracking to the TypedDict graph state. |
| Bloated State Objects | Pushing massive 10MB payloads directly into state kills memory during checkpoints. | Persist references to external storage (document_url: str) instead of raw data. |
| Missing Graph Compilation | You must compile the graph before invoking it. | Always instantiate via app = graph.compile() before invoking. |
| Unpersisted Threads | Pausing for human-in-the-loop without attaching a checkpointer wipes the session. | Define checkpointer=MemorySaver() (or Postgres) during .compile(). |
Avoiding Pitfalls through Proper Design
Section titled “Avoiding Pitfalls through Proper Design”# BAD: Lists get replaced, not accumulatedclass BadState(TypedDict): messages: List[str] # Each node replaces the list!
# GOOD: Use annotation for accumulationclass GoodState(TypedDict): messages: Annotated[List[str], operator.add]# BAD: No exit conditiongraph.add_conditional_edges("check", should_continue, { "retry": "process", "done": "output"})# If should_continue always returns "retry", infinite loop!
# GOOD: Add max retriesdef should_continue_safe(state): if state["attempts"] >= MAX_RETRIES: return "done" # Force exit return "retry" if not state["success"] else "done"# BAD: Missing routedef route(state): if condition_a: return "a" elif condition_b: return "b" # What if neither? KeyError!
# GOOD: Always have a defaultdef route(state): if condition_a: return "a" elif condition_b: return "b" else: return "default"# BAD: Node maintains internal statecounter = 0def bad_node(state): global counter counter += 1 # This persists across invocations! return {"count": counter}
# GOOD: All state in the graph statedef good_node(state): count = state.get("count", 0) + 1 return {"count": count}# BAD: Storing large data in stateclass BadState(TypedDict): full_document: str # 10MB document in every state snapshot!
# GOOD: Store references, load when neededclass GoodState(TypedDict): document_id: str # Reference to external storageGolden Best Practices
Section titled “Golden Best Practices”# Think about:# - What needs to persist across nodes?# - What should accumulate vs replace?# - What's the minimal state needed?
class WellDesignedState(TypedDict): # Core data input: str output: str
# Progress tracking (accumulates) steps_completed: Annotated[List[str], operator.add]
# Metadata (replaces) current_phase: str last_error: Optional[str]# Node should be deterministic given same statedef pure_node(state: MyState) -> dict: # Only use state input # No external side effects # Return consistent output return {"result": process(state["input"])}# Create reusable componentsvalidation_subgraph = create_validation_graph()processing_subgraph = create_processing_graph()
# Compose in main graphmain_graph.add_node("validate", validation_subgraph)main_graph.add_node("process", processing_subgraph)# Prevent runaway cyclesconfig = { "configurable": {"thread_id": "x"}, "recursion_limit": 50 # Max steps}result = app.invoke(state, config)# Visualize your graph to catch issuesfrom IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))Hands-On Exercise: Building a Multi-Agent Analyst
Section titled “Hands-On Exercise: Building a Multi-Agent Analyst”Objective: Implement a supervisor-managed research system that handles topic searches, evaluates depth, and loops if criteria aren’t met.
Step 1: Environment Setup Create a new directory and virtual environment:
mkdir langgraph-lab && cd langgraph-labpython3 -m venv venvsource venv/bin/activatepip install langgraph langchain-google-genai typing-extensionsStep 2: Define Your TypedDict State
Create a python script agent.py. Add a state dictionary that tracks the requested topic, a research_log (using operator.add to accumulate discoveries), and a loop_count integer to cap search attempts.
Step 3: Construct Worker Nodes Implement two Python functions:
researcher_node: Appends a mocked observation toresearch_log.reviewer_node: Evaluates if theresearch_loghas 2 or more entries. If so, updates a booleanis_comprehensiveflag to True.
Step 4: Implement Routing
Write a routing function taking the current state. If loop_count > 3, return "max_reached". If is_comprehensive is True, return "done". Otherwise, return "research".
Step 5: Graph Compilation & Execution
Instantiate a StateGraph, link the nodes, setup the conditional edge pointing out of the reviewer node, and execute .invoke().
View the Full Solution Code
import operatorfrom typing import TypedDict, List, Annotatedfrom langgraph.graph import StateGraph, START, END
# Step 2: State Definitionclass ResearchState(TypedDict): topic: str research_log: Annotated[List[str], operator.add] loop_count: int is_comprehensive: bool
# Step 3: Node Implementationdef researcher_node(state: ResearchState) -> dict: current_count = state.get("loop_count", 0) return { "research_log": [f"Found insight part {current_count + 1}."], "loop_count": current_count + 1 }
def reviewer_node(state: ResearchState) -> dict: logs = state.get("research_log", []) return { "is_comprehensive": len(logs) >= 2 }
# Step 4: Routing Logicdef routing_logic(state: ResearchState) -> str: if state.get("loop_count", 0) > 3: return "max_reached" if state.get("is_comprehensive"): return "done" return "research"
# Step 5: Compilationworkflow = StateGraph(ResearchState)workflow.add_node("researcher", researcher_node)workflow.add_node("reviewer", reviewer_node)
workflow.add_edge(START, "researcher")workflow.add_edge("researcher", "reviewer")workflow.add_conditional_edges( "reviewer", routing_logic, { "done": END, "max_reached": END, "research": "researcher" })
app = workflow.compile()output = app.invoke({"topic": "Quantum Computing", "research_log": [], "loop_count": 0})print("Final State:", output)Success Checklist
Section titled “Success Checklist”- Nodes accurately append to the
research_log. - Execution completes gracefully after hitting the minimum log requirement.
- Changing the
reviewer_nodelogic to require 5 items effectively triggers the max loop cutoff.
1. A developer notices their LangGraph workflow is replacing the message history instead of appending new messages. What is the architectural flaw?
The developer likely assigned `messages: List[str]` in their `TypedDict` instead of applying LangGraph's accumulation annotation. Without `Annotated[List[str], operator.add]`, LangGraph falls back to its default reducer behavior. This default behavior overwrites the existing state key entirely with the newly returned dictionary value. To resolve this, the state definition must explicitly instruct the framework to merge list contents.2. You need to implement a fallback mechanism where a failed API call is retried up to three times before halting. How does a graph accomplish this?
You must model the error and the retry logic explicitly as state data properties rather than relying on standard try-catch blocks alone. By incorporating an `attempts: int` counter and `success: bool` flag into your State, you gain granular control over execution flow. You can then write a routing function that inspects the outcome and iterates the cycle until `attempts >= 3`. At that point, the function must forcefully return a fallback edge to break the cycle.3. A team is designing a system where an AI agent proposes a trade, but a human must approve it before execution. How should this be implemented natively in LangGraph?
The framework handles this effortlessly using the specialized checkpointer interrupt feature. The team should initialize their orchestrator by calling `app.compile(checkpointer=MemorySaver(), interrupt_before=["execute"])`. This halts execution immediately before the trade execution node runs, preserving the current state safely in memory. Once human feedback is secured, it is injected back via `.update_state()`, allowing the system to proceed seamlessly.4. An architect wants three distinct specialized agents to analyze a document simultaneously and then combine their insights. Which graph configuration achieves this?
The architect should fan-out the execution by connecting the START node directly to all three worker nodes simultaneously via unconditional edges. Next, all three worker nodes must direct their downstream edges into a single downstream combination node. Because LangGraph automatically runs independent nodes in parallel, it naturally processes the three tasks concurrently. This approach eliminates latency bottlenecks before finally synchronizing the distinct outputs.5. A graph enters an infinite loop between a "review" node and a "draft" node. How can this architectural flaw be mitigated safely?
A runaway cycle typically happens when a conditional edge lacks an explicit architectural cap or fallback route. To mitigate this safely, introduce a `recursion_limit` directly in the execution configuration object. Additionally, you should track the total revision count inside the node state. This allows your routing logic to force an exit or transition to a generic "publish" edge once that count hits a sensible upper limit.6. During a high-traffic period, your container crashes halfway through a multi-agent debate. Without starting over, how can LangGraph resume from the exact point of failure?
LangGraph achieves system resiliency using persistent Checkpointers backed by external databases. If the graph was compiled with a `SqliteSaver` or `PostgresSaver`, the orchestrator simply reinvokes the workflow targeting the exact `thread_id` contained within the configuration object. The application then reconstructs the last fully processed state segment automatically. From there, it picks up execution at the subsequent unexecuted node without dropping any prior context.Module 18 Complete! Progress: 21/56 modules (38%)
Next: Module 1.5 - Building AI Agents — Move from framework components to full agent architectures, orchestration patterns, and production-ready design tradeoffs.
Sources
Section titled “Sources”- LangGraph GitHub Repository — This is the allowlisted upstream home for the framework and its high-level positioning.
- LangGraph Releases — Useful for checking official release chronology and version history.