One-Line Summary: Modeling task steps as a directed acyclic graph (DAG) enables agents to identify parallelizable work and execute skills in optimal order.
Prerequisites: breaking-complex-tasks-into-steps.md, basic graph theory (nodes, edges, topological sort)
What Is a Dependency Graph for Skill Execution?
Think of a dependency graph like a recipe with timing constraints. You can chop vegetables while the oven preheats -- those tasks are independent. But you cannot frost the cake until it has baked and cooled -- those tasks form a chain. A skilled cook identifies these relationships and overlaps independent tasks to finish faster. An AI agent does the same with its tool calls.
A dependency graph represents each sub-task as a node and each "must happen before" relationship as a directed edge. When task B requires the output of task A, there is an edge from A to B. The resulting graph must be acyclic (no circular dependencies) -- a DAG. This structure tells the agent exactly which tasks can run in parallel, which must wait, and in what order the waiting tasks should execute.
Without a dependency graph, the agent defaults to sequential execution -- one step at a time, even when three or four steps could run simultaneously. For tasks involving multiple independent API calls, this difference can mean 3-4x slower execution. The graph transforms a linear plan into an optimized parallel schedule.
How It Works
Building the Dependency Graph
The agent constructs the graph by analyzing each step's inputs and outputs. If step C's input references step A's output, there is a dependency edge from A to C.
```python
from dataclasses import dataclass, field


@dataclass
class TaskNode:
    id: str
    tool: str
    input_spec: dict
    depends_on: list[str] = field(default_factory=list)
    output_key: str = ""


class DependencyGraph:
    def __init__(self):
        self.nodes: dict[str, TaskNode] = {}

    def add_task(self, node: TaskNode):
        self.nodes[node.id] = node

    def add_dependency(self, from_id: str, to_id: str):
        """from_id must complete before to_id can start."""
        self.nodes[to_id].depends_on.append(from_id)

    def validate(self) -> bool:
        """Check for cycles using DFS."""
        visited = set()
        in_stack = set()

        def dfs(node_id: str) -> bool:
            if node_id in in_stack:
                return False  # Cycle detected
            if node_id in visited:
                return True
            visited.add(node_id)
            in_stack.add(node_id)
            for dep in self.nodes[node_id].depends_on:
                if not dfs(dep):
                    return False
            in_stack.discard(node_id)
            return True

        return all(dfs(nid) for nid in self.nodes if nid not in visited)
```
Topological Sorting for Execution Order
Topological sorting produces an ordering where every task appears after all of its dependencies. More usefully for parallel execution, we compute the topological sort in layers (also called "levels" or "waves") -- each layer contains tasks that can run concurrently.
```python
from collections import deque


def topological_layers(graph: DependencyGraph) -> list[list[str]]:
    """Return tasks grouped into parallelizable layers."""
    # Each node's in-degree is simply how many dependencies it declares.
    in_degree = {nid: len(graph.nodes[nid].depends_on) for nid in graph.nodes}
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    layers = []
    while queue:
        layer = list(queue)
        layers.append(layer)
        queue.clear()
        for nid in layer:
            for candidate_id, candidate in graph.nodes.items():
                if nid in candidate.depends_on:
                    in_degree[candidate_id] -= 1
                    if in_degree[candidate_id] == 0:
                        queue.append(candidate_id)
    scheduled = sum(len(layer) for layer in layers)
    if scheduled != len(graph.nodes):
        raise ValueError("Cycle detected: not all tasks were scheduled")
    return layers
```
Fan-Out / Fan-In Pattern
The most common parallel pattern in agent execution is fan-out/fan-in: one task produces a list, multiple tasks process items from that list in parallel, and a final task aggregates the results.
```
          ┌── Search API A ──┐
          │                  │
Start ────┼── Search API B ──┼── Merge Results ── Format Output
          │                  │
          └── Search API C ──┘
             (fan-out)            (fan-in)
```
Here is a concrete implementation:
```python
import asyncio
from typing import Any, Callable

# Maps tool names to async callables; populated elsewhere by the agent runtime.
TOOL_REGISTRY: dict[str, Callable] = {}


async def execute_dag(graph: DependencyGraph) -> dict[str, Any]:
    """Execute a dependency graph with maximum parallelism."""
    results: dict[str, Any] = {}
    layers = topological_layers(graph)
    for layer in layers:
        # All tasks in a layer can run concurrently
        tasks = []
        for node_id in layer:
            node = graph.nodes[node_id]
            resolved_input = resolve_inputs(node.input_spec, results)
            tasks.append(execute_single(node_id, node.tool, resolved_input))
        layer_results = await asyncio.gather(*tasks, return_exceptions=True)
        for node_id, result in zip(layer, layer_results):
            if isinstance(result, Exception):
                results[node_id] = {"error": str(result)}
            else:
                results[node_id] = result
    return results


async def execute_single(node_id: str, tool: str, inputs: dict) -> Any:
    """Execute a single tool call."""
    tool_fn = TOOL_REGISTRY[tool]
    return await tool_fn(**inputs)


def resolve_inputs(input_spec: dict, results: dict) -> dict:
    """Replace $task_id references with actual results."""
    resolved = {}
    for key, value in input_spec.items():
        if isinstance(value, str) and value.startswith("$"):
            ref_id, *path = value[1:].split(".")
            obj = results[ref_id]
            for p in path:
                obj = obj[p] if isinstance(obj, dict) else getattr(obj, p)
            resolved[key] = obj
        else:
            resolved[key] = value
    return resolved
```
Real Example: Concurrent API Lookups
Consider an agent that needs to gather a company's stock price, recent news, and weather at their headquarters to generate a briefing. The stock and news lookups are independent, but the weather lookup requires knowing the HQ location (which comes from the company info).
```python
# Build the graph
g = DependencyGraph()
g.add_task(TaskNode(
    id="company_info",
    tool="web_search",
    input_spec={"query": "Acme Corp headquarters location"},
))
g.add_task(TaskNode(
    id="stock_price",
    tool="finance_api",
    input_spec={"symbol": "ACME"},
))
g.add_task(TaskNode(
    id="recent_news",
    tool="news_api",
    input_spec={"query": "Acme Corp"},
))
g.add_task(TaskNode(
    id="hq_weather",
    tool="weather_api",
    input_spec={"location": "$company_info.hq_city"},
    depends_on=["company_info"],
))
g.add_task(TaskNode(
    id="briefing",
    tool="llm_summarize",
    input_spec={
        "stock": "$stock_price",
        "news": "$recent_news",
        "weather": "$hq_weather",
        "info": "$company_info",
    },
    depends_on=["stock_price", "recent_news", "hq_weather"],
))

# Execution layers:
# Layer 0: [company_info, stock_price, recent_news] -- 3 tasks in parallel
# Layer 1: [hq_weather] -- needs company_info
# Layer 2: [briefing] -- needs all above
# Sequential: 5 steps × ~1s each = ~5s
# DAG-parallel: 3 layers × ~1s each = ~3s (40% faster)
```
Handling Failures in the DAG
When a node fails, all downstream nodes that depend on it must be marked as blocked. Independent branches continue executing normally.
```python
from collections import deque


def mark_blocked(graph: DependencyGraph, failed_id: str) -> set[str]:
    """Find all nodes transitively dependent on the failed node."""
    blocked = set()
    queue = deque([failed_id])
    while queue:
        current = queue.popleft()
        for nid, node in graph.nodes.items():
            if current in node.depends_on and nid not in blocked:
                blocked.add(nid)
                queue.append(nid)
    return blocked
```
Why It Matters
Execution Speed
For tasks with independent branches, DAG-based execution can reduce wall-clock time by 40-70%. An agent making 6 independent API calls sequentially takes 6 round trips. With a DAG, those 6 calls happen in one round trip.
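The effect is easy to see with a toy benchmark, sketched here using asyncio.sleep to stand in for API latency (fake_api_call is a hypothetical placeholder):

```python
import asyncio
import time

async def fake_api_call(i: int) -> str:
    await asyncio.sleep(0.05)  # stand-in for one ~50ms network round trip
    return f"result-{i}"

async def run_concurrently(n: int) -> tuple[list[str], float]:
    # Launch all n calls at once; total time is ~one round trip, not n of them.
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_api_call(i) for i in range(n)))
    return results, time.perf_counter() - start

results, elapsed = asyncio.run(run_concurrently(6))
print(f"{len(results)} calls in {elapsed:.2f}s")  # ~0.05s total, not ~0.30s
```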
Correctness Guarantees
The dependency graph makes execution order explicit and verifiable. There is no risk of a step running before its prerequisites are ready, which is a common bug in ad-hoc parallel execution code.
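This guarantee can be checked mechanically. A standalone miniature of the layered scheduling above (keyed by a plain dependency dict rather than the DependencyGraph class) can assert that every task lands in a strictly later layer than all of its dependencies:

```python
from collections import deque

def layers_from_deps(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group task ids into parallelizable layers; raise on a cycle."""
    in_degree = {nid: len(d) for nid, d in deps.items()}
    queue = deque(nid for nid, deg in in_degree.items() if deg == 0)
    layers: list[list[str]] = []
    while queue:
        layer = sorted(queue)
        layers.append(layer)
        queue.clear()
        for done in layer:
            for nid, d in deps.items():
                if done in d:
                    in_degree[nid] -= 1
                    if in_degree[nid] == 0:
                        queue.append(nid)
    if sum(len(layer) for layer in layers) != len(deps):
        raise ValueError("Cycle detected")
    return layers

deps = {"a": [], "b": ["a"], "c": ["a"], "d": ["b", "c"]}
layers = layers_from_deps(deps)
layer_of = {nid: i for i, layer in enumerate(layers) for nid in layer}
# Every task appears strictly after all of its dependencies.
assert all(layer_of[dep] < layer_of[nid] for nid, ds in deps.items() for dep in ds)
print(layers)  # [['a'], ['b', 'c'], ['d']]
```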
Key Technical Details
- Topological layer computation is O(V + E) where V is nodes and E is edges -- negligible overhead even for 100+ step plans
- In practice, most agent DAGs have 5-15 nodes and 2-4 layers
- asyncio.gather with return_exceptions=True prevents one failure from canceling parallel siblings
- Fan-out degree (number of parallel tasks) should be capped at 5-10 to avoid overwhelming rate-limited APIs
- The critical path (longest chain of dependent tasks) determines the minimum possible execution time regardless of parallelism
- Dynamic DAG modification (adding/removing nodes at runtime) supports adaptive replanning
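The critical-path bound can be computed directly with a short recursion; this is a sketch assuming unit-cost tasks and a plain dependency dict:

```python
from functools import lru_cache

def critical_path_length(deps: dict[str, list[str]]) -> int:
    """Length (in tasks) of the longest dependency chain, assuming unit cost."""
    @lru_cache(maxsize=None)
    def depth(nid: str) -> int:
        # A task's depth is one more than the deepest of its dependencies.
        return 1 + max((depth(dep) for dep in deps[nid]), default=0)
    return max(depth(nid) for nid in deps)

# The briefing example from above: three layers, so a critical path of 3.
deps = {
    "company_info": [], "stock_price": [], "recent_news": [],
    "hq_weather": ["company_info"],
    "briefing": ["stock_price", "recent_news", "hq_weather"],
}
print(critical_path_length(deps))  # 3
```

With non-uniform task durations, the same recursion works by replacing the constant 1 with each task's estimated cost.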
Common Misconceptions
"More parallelism is always better": Parallelism is limited by the critical path, API rate limits, and resource constraints. If three of your five parallel API calls hit the same rate-limited endpoint, they serialize anyway. The DAG shows theoretical parallelism; the executor must respect practical limits.
"DAGs cannot handle loops": Correct -- a DAG is acyclic by definition. For iterative processes (retry loops, pagination), the loop logic lives inside a single node. The node appears once in the DAG, but its internal implementation may iterate multiple times.
Connections to Other Concepts
- breaking-complex-tasks-into-steps.md -- Decomposition produces the nodes; dependency analysis produces the edges
- parallel-skill-execution.md -- The runtime execution of parallel DAG layers
- plan-then-execute-pattern.md -- Plans can be represented as DAGs rather than linear sequences
- adaptive-replanning.md -- Failed DAG nodes trigger replanning of downstream subgraphs
Further Reading
- Kahn, "Topological Sorting of Large Networks" (1962) -- The original algorithm for topological sort used in DAG scheduling
- LangGraph Documentation, "Branching and Merging" (2024) -- Practical guide to fan-out/fan-in in LangGraph
- Dean and Ghemawat, "MapReduce: Simplified Data Processing on Large Clusters" (2004) -- The fan-out/fan-in pattern at massive scale