Our Executor was working. Tasks were being prioritized and assigned. But we noticed a troubling pattern: projects would get stuck. One task would be completed, but the next one, which depended on the first, would never start. It was like a relay race where the first runner finished their leg, but there was no one there to take the baton.
Initially, we had hypothesized that implicit coordination through the database (the "Shared State" pattern) would be sufficient: Agent A finishes the task, the state changes to `completed`, and Agent B sees the change and starts.
This worked for simple, linear workflows. But it failed miserably in more complex scenarios.
Our architecture was missing an explicit mechanism for collaboration and knowledge transfer.
Inspired by OpenAI SDK primitives, we created our own concept of a Handoff. A Handoff is not just a task assignment; it's a formal, context-rich handover between two agents.
Reference code: backend/database.py (the create_handoff function)
A Handoff is a specific object in our database that contains:
| Handoff Field | Description | Strategic Purpose |
|---|---|---|
| `source_agent_id` | The agent who completed the work. | Traceability. |
| `target_agent_id` | The agent who should receive the work. | Explicit assignment. |
| `task_id` | The new task created as part of the handoff. | Links the handover to concrete action. |
| `context_summary` | An AI-generated summary from the source agent along the lines of: "I did X, and the most important thing you need to know for your next task is Y." | The heart of the solution: it solves the context transfer problem. |
| `relevant_artifacts` | A list of IDs of deliverables or assets produced by the source agent. | Gives the target agent direct links to the materials they need to work on. |
Workflow with Handoffs:
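As an illustration of the flow, here is a minimal sketch of what happens when a source agent finishes its work. The exact signature of `create_handoff` lives in backend/database.py, so treat the arguments and helper functions below (such as `summarize_work` and `create_task`) as assumptions rather than the project's real API:

```python
async def complete_task_and_handoff(source_agent, completed_task, target_agent_id: str):
    """Illustrative flow: summarize the finished work and hand it to the next agent."""
    # 1. Ask the source agent (or an LLM call) for a context summary of what it just did
    context_summary = await source_agent.summarize_work(completed_task)  # assumed helper

    # 2. Create the follow-up task the target agent will execute
    next_task = await create_task(  # assumed DB helper
        workspace_id=completed_task["workspace_id"],
        name=f"Follow-up of: {completed_task['name']}",
        assigned_agent_id=target_agent_id,
    )

    # 3. Record the formal, context-rich handover (create_handoff is in backend/database.py)
    await create_handoff(
        source_agent_id=source_agent.id,
        target_agent_id=target_agent_id,
        task_id=next_task["id"],
        context_summary=context_summary,
        relevant_artifacts=completed_task.get("artifact_ids", []),  # assumed field name
    )
```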
To ensure this system worked, we created a specific test.
Reference code: tests/test_tools_and_handoffs.py
This test didn't verify a single output, but an entire collaboration sequence: that a `Handoff` object is created in the database, and that the `context_summary` field of that Handoff contains an intelligent, non-empty summary.
The lessons were clear. Relying on an implicit mechanism like shared state for collaboration is a recipe for failure in complex systems. The `context_summary` is a form of "short-term memory" passed between agents: a specific insight for the next task, complementing the workspace's long-term memory. And the handoff logic lives in `database.py`, making it a reusable system capability.
We learned that effective collaboration between AI agents, just like between humans, requires explicit communication and efficient context transfer. The Handoff system provided exactly this.
✓ Don't rely solely on shared state. For complex workflows, you need explicit communication mechanisms between agents.
✓ Context is king. The most valuable part of a handover isn't the result, but the context summary that enables the next agent to be immediately productive.
✓ Design for collaboration. Think of your system not as a series of tasks, but as a network of collaborators. How do they pass information? How do they ensure work doesn't fall through the cracks?
Chapter Conclusion
With an orchestrator for strategic management and a handoff system for tactical collaboration, our "team" of agents was starting to look like a real team.
But who was deciding the composition of this team? Up to that point, we were manually defining the roles. To achieve true autonomy and scalability, we needed to delegate this responsibility to AI as well. It was time to create our AI Recruiter.
Initially, our priority system was trivial: a simple `if/else` based on a `priority` field ("high", "medium", "low") in the database. It worked for about a day.
We quickly realized that the true priority of a task isn't a static value; it depends on the dynamic context of the project. A low-priority task can suddenly become critical if it's blocking ten other tasks.
This was our first real application of Pillar #2 (AI-Driven, zero hard-coding) at the orchestration level. We replaced the `if/else` logic with a function we call `_calculate_ai_driven_base_priority`.
Reference code: backend/executor.py
```python
def _calculate_ai_driven_base_priority(task_data: dict, context: dict) -> int:
    """
    Uses an AI model to calculate the strategic priority of a task.
    """
    prompt = f"""
    Analyze the following task and project context. Assign a priority score from 0 to 1000.

    TASK: {task_data.get('name')}
    DESCRIPTION: {task_data.get('description')}

    PROJECT CONTEXT:
    - Current Objective: {context.get('current_goal')}
    - Blocked Tasks Waiting: {context.get('blocked_tasks_count')}
    - Task Age (days): {context.get('task_age_days')}

    Consider:
    - Tasks that unblock other tasks are more important.
    - Older tasks should have higher priority.
    - Tasks directly connected to the current objective are critical.

    Respond only with a JSON object: {{"priority_score": <integer>}}
    """
    # ... logic to call AI and parse response ...
    return ai_response.get("priority_score", 100)
```
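The elided "call AI and parse response" step can be done with any LLM client. As a hedged illustration (the client library and model name here are our assumptions, not necessarily what backend/executor.py actually uses), it might look like this:

```python
import json
from openai import OpenAI  # assumption: the OpenAI Python client is used

client = OpenAI()

def _ask_ai_for_priority(prompt: str) -> dict:
    """Illustrative only: send the prompt and parse the JSON answer."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # hypothetical model choice
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},  # ask for a JSON-only reply
    )
    try:
        return json.loads(response.choices[0].message.content)
    except json.JSONDecodeError:
        # Fall back to a neutral score if the model answers off-format
        return {"priority_score": 100}
```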
This transformed our Executor from a simple queue manager into a true AI Project Manager, capable of making strategic decisions about where to allocate team resources.
With the introduction of agents capable of creating other tasks, we unleashed a monster we hadn't anticipated: the infinite loop of task creation.
Disaster Logbook (July 26th):
```
INFO: Agent A created Task B.
INFO: Agent B created Task C.
INFO: Agent C created Task D.
... (after 20 minutes)
ERROR: Workspace a352c927... has 5,000+ pending tasks. Halting operations.
```
An agent, in a clumsy attempt to "decompose the problem", kept creating sub-tasks of sub-tasks, blocking the entire system.
The solution was twofold. At its core, we added a `delegation_depth` field to each task's `context_data`: if a task was created by another task, its depth increased by 1, and we set a maximum limit (e.g., 5 levels) to prevent infinite recursion.
This experience taught us a fundamental lesson about managing autonomous systems: autonomy without limits leads to chaos. It's necessary to implement safety "fuses" that protect the system from itself.
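To make that "fuse" concrete, here is a minimal sketch of such a depth guard. The `create_subtask` helper and the way the task payload is shaped are our assumptions; only the `delegation_depth` field and the limit mirror what the text describes:

```python
MAX_DELEGATION_DEPTH = 5  # safety fuse: beyond this, task creation is refused

def create_subtask(parent_task: dict, new_task_payload: dict) -> dict:
    """Create a child task, inheriting and incrementing the delegation depth (illustrative)."""
    parent_depth = parent_task.get("context_data", {}).get("delegation_depth", 0)
    child_depth = parent_depth + 1

    if child_depth > MAX_DELEGATION_DEPTH:
        # Refuse instead of recursing forever; the caller can log or escalate.
        raise RuntimeError(
            f"Delegation depth {child_depth} exceeds limit {MAX_DELEGATION_DEPTH}"
        )

    new_task_payload.setdefault("context_data", {})["delegation_depth"] = child_depth
    return new_task_payload  # in the real system this would be persisted via the DB layer
```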
Our AI-driven prioritization system had a hidden flaw that only manifested when we started testing it with more complex workspaces. The problem? Analysis paralysis.
Disaster Logbook:
```
INFO: Calculating AI-driven priority for Task_A...
INFO: AI priority calculation took 4.2 seconds
INFO: Calculating AI-driven priority for Task_B...
INFO: AI priority calculation took 3.8 seconds
INFO: Calculating AI-driven priority for Task_C...
INFO: AI priority calculation took 5.1 seconds
... (15 minutes later)
WARNING: Still calculating priorities. No tasks executed yet.
```
The problem was that each AI call to calculate priority took 3-5 seconds. With workspaces that had 20+ pending tasks, our event loop transformed into an "event crawl". The system was technically correct, but practically unusable.
The Solution: Intelligent Priority Caching with "Semantic Hashing"
Instead of calling AI for every single task, we introduced an intelligent semantic caching system:
```python
def _get_cached_or_calculate_priority(task_data: dict, context: dict) -> int:
    """
    Intelligent priority caching based on semantic hashing.
    """
    # Create a semantic hash of the task and context
    semantic_hash = create_semantic_hash(task_data, context)

    # Check if we've already calculated a similar priority
    cached_priority = priority_cache.get(semantic_hash)
    if cached_priority and cache_is_fresh(cached_priority, max_age_minutes=30):
        return cached_priority.score

    # Only if we don't have a valid cache entry, call the AI
    ai_priority = _calculate_ai_driven_base_priority(task_data, context)
    priority_cache.set(semantic_hash, ai_priority, ttl=1800)  # 30 min TTL
    return ai_priority
```
The `create_semantic_hash()` function generates a hash based on the key concepts of the task (objective, content type, dependencies) rather than the exact string. This means similar tasks (e.g., "Write blog post about AI" vs "Create article on artificial intelligence") share the same cached priority.
Result: Average prioritization time dropped from 4 seconds to 0.1 seconds for 80% of tasks.
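How might such a hash be built? The sketch below shows the structural idea only: it normalizes terms through a hard-coded synonym map, whereas a real implementation would more plausibly use embeddings or an LLM-based concept extractor to recognize that two phrasings describe the same task. All names here are our assumptions:

```python
import hashlib

# Hypothetical concept map; a real system would likely use embeddings instead.
CONCEPT_SYNONYMS = {
    "article": "blog_post",
    "blog": "blog_post",
    "post": "blog_post",
    "ai": "artificial_intelligence",
}

def create_semantic_hash(task_data: dict, context: dict) -> str:
    """Hash the key concepts of a task instead of its exact wording (illustrative)."""
    raw_terms = (
        f"{task_data.get('name', '')} {task_data.get('description', '')}".lower().split()
    )
    concepts = sorted({CONCEPT_SYNONYMS.get(term, term) for term in raw_terms})

    # Include only the bits of context that actually change the priority decision
    key = "|".join(concepts) + (
        f"|goal={context.get('current_goal')}"
        f"|blocked={context.get('blocked_tasks_count')}"
    )
    return hashlib.sha256(key.encode("utf-8")).hexdigest()
```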
We were proud of our asynchronous worker pool: 10 workers that could process tasks in parallel, making the system extremely fast. At least, that's what we thought.
The problem emerged when we tested the system with a workspace requiring heavy web research. Multiple tasks started making simultaneous calls to different external APIs (Google search, social media, news databases).
Disaster Logbook:
```
INFO: Worker_1 executing research task (target: competitor analysis)
INFO: Worker_2 executing research task (target: market trends)
INFO: Worker_3 executing research task (target: industry reports)
... (all 10 workers active)
ERROR: Rate limit exceeded for Google Search API (429)
ERROR: Rate limit exceeded for Twitter API (429)
ERROR: Rate limit exceeded for News API (429)
WARNING: 7/10 workers stuck in retry loops
CRITICAL: Executor queue backup - 234 pending tasks
```
All workers had exhausted external API rate limits simultaneously, causing a domino effect. The system was technically scalable, but had created its worst enemy: resource contention.
The Solution: Intelligent Resource Arbitration
We introduced a Resource Arbitrator that manages shared resources (API calls, database connections, memory) like an intelligent semaphore:
```python
class ResourceArbitrator:
    def __init__(self):
        self.resource_quotas = {
            "google_search_api": TokenBucket(max_tokens=100, refill_rate=1),
            "twitter_api": TokenBucket(max_tokens=50, refill_rate=0.5),
            "database_connections": TokenBucket(max_tokens=20, refill_rate=10),
        }

    async def acquire_resource(self, resource_type: str, estimated_cost: int = 1):
        """
        Acquires a resource if available, otherwise queues.
        """
        bucket = self.resource_quotas.get(resource_type)
        if bucket and await bucket.consume(estimated_cost):
            return ResourceLock(resource_type, estimated_cost)
        else:
            # Queue the task for this specific resource
            await self.queue_for_resource(resource_type, estimated_cost)


# In the executor:
async def execute_task_with_arbitration(task_data):
    required_resources = analyze_required_resources(task_data)

    # Acquire all necessary resources before starting
    # (acquire_resources is the multi-resource, context-manager variant of acquire_resource)
    async with resource_arbitrator.acquire_resources(required_resources):
        return await execute_task(task_data)
```
Result: Rate limit errors dropped by 95%, and system throughput increased by 40% thanks to better resource management.
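The `TokenBucket` used above is a standard rate-limiting primitive. A minimal asyncio-friendly sketch (our own illustration, not the project's actual class) could look like this:

```python
import asyncio
import time

class TokenBucket:
    """Simple token bucket: `refill_rate` tokens are added per second, up to `max_tokens`."""

    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate
        self.tokens = float(max_tokens)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def consume(self, cost: int = 1) -> bool:
        """Try to take `cost` tokens; return False if the bucket is too empty."""
        async with self._lock:
            now = time.monotonic()
            # Refill proportionally to the time elapsed since the last call
            self.tokens = min(
                self.max_tokens,
                self.tokens + (now - self.last_refill) * self.refill_rate,
            )
            self.last_refill = now
            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False
```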
What we had built was powerful, but still monolithic. As the system grew, we realized orchestration needed more nuance.
This led us, in later phases of the project, to completely rethink the orchestration architecture. But that's a story we'll tell in Part II of this manual, when we explore how we went from an MVP to an enterprise-ready system.
For more technical readers, it's worth exploring how we implemented the Executor's central event loop. It's not a simple `while True`, but a layered system:
```python
import asyncio
import time

class IntelligentEventLoop:
    def __init__(self):
        self.polling_intervals = {
            "high_priority_workspaces": 5,   # seconds
            "normal_workspaces": 15,         # seconds
            "low_activity_workspaces": 60,   # seconds
            "maintenance_mode": 300,         # seconds
        }
        self.workspace_activity_tracker = ActivityTracker()
        self.last_poll_time = {tier: 0.0 for tier in self.polling_intervals}
        self.is_running = True

    async def adaptive_polling_cycle(self):
        """
        Polling cycle that adapts intervals based on activity.
        """
        while self.is_running:
            workspaces_by_priority = self.classify_workspaces_by_activity()

            for priority_tier, workspaces in workspaces_by_priority.items():
                interval = self.polling_intervals[priority_tier]

                # Process high-priority workspaces more frequently
                if time.time() - self.last_poll_time[priority_tier] >= interval:
                    await self.process_workspaces_batch(workspaces)
                    self.last_poll_time[priority_tier] = time.time()

            # Dynamic pause based on system load
            await asyncio.sleep(self.calculate_dynamic_sleep_time())
```
This adaptive polling approach means active workspaces are checked every 5 seconds, while dormant workspaces are checked only every 5 minutes, optimizing both responsiveness and efficiency.
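The `calculate_dynamic_sleep_time()` call is where the load-awareness comes in. One possible heuristic, shown here as a standalone function with a simplified signature (the thresholds and the pending-task count are our assumptions, not the real implementation):

```python
def calculate_dynamic_sleep_time(pending_tasks: int) -> float:
    """Illustrative heuristic: sleep briefly under load, back off when the system is quiet."""
    if pending_tasks > 100:
        return 1.0   # heavy backlog: poll again almost immediately
    if pending_tasks > 10:
        return 5.0   # moderate activity
    return 30.0      # quiet system: save API calls and CPU cycles
```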
After implementing the optimizations, our system achieved these metrics:
| Metric | Baseline (v1) | Optimized (v2) | Improvement |
|---|---|---|---|
| Task/sec throughput | 2.3 | 8.1 | +252% |
| Average prioritization time | 4.2s | 0.1s | -97% |
| Resource contention errors | 34/hour | 1.7/hour | -95% |
| Memory usage (idle) | 450MB | 280MB | -38% |
The adoption of these primitives, from handoffs to built-in tools like `websearch`, accelerated our development exponentially. Instead of building complex systems for memory or tool management from scratch, we were able to leverage ready-made, tested, and optimized components.
Our decision to adopt an SDK wasn't just a tactical choice to simplify code, but a strategic bet on a more open and interoperable future. At the heart of this vision is a fundamental concept: the Model Context Protocol (MCP).
What is MCP? The "USB-C" for Artificial Intelligence.
Imagine a world where every AI tool (an analysis tool, a vector database, another agent) speaks a different language. To make them collaborate, you have to build a custom adapter for every pair. It's an integration nightmare.
MCP aims to solve this problem. It's an open protocol that standardizes how applications provide context and tools to LLMs. It works like a USB-C port: a single standard that allows any AI model to connect to any data source or tool that "speaks" the same language.
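To give a flavor of what "speaking the same language" means, here is a simplified tool descriptor in the spirit of MCP, expressed as a Python dict. The field names follow our reading of the public MCP documentation and are illustrative, not a normative schema:

```python
# Illustrative only: a tool advertised by an MCP server, described with a name,
# a human-readable description, and a JSON-Schema input contract.
competitor_research_tool = {
    "name": "competitor_research",
    "description": "Search the web for recent news about a named competitor.",
    "inputSchema": {
        "type": "object",
        "properties": {
            "company": {"type": "string", "description": "Competitor name"},
            "max_results": {"type": "integer", "default": 5},
        },
        "required": ["company"],
    },
}
# Any MCP-aware client (or agent) can discover this descriptor and call the tool
# without a custom adapter for this particular server.
```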
Why MCP is the Future (and why we care):
Choosing an SDK that embraces (or moves toward) MCP principles is a strategic move that aligns perfectly with our pillars:
| MCP Strategic Benefit | Description | Corresponding Reference Pillar |
|---|---|---|
| End of Vendor Lock-in | If more models and tools support MCP, we can switch AI providers or integrate new third-party tools with minimal effort. | #15 (Robustness & Fallback) |
| A "Plug-and-Play" Tool Ecosystem | A true marketplace of specialized tools (financial, scientific, creative) will emerge that we can "plug into" our agents instantly. | #14 (Modular Tool/Service-Layer) |
| Interoperability Between Agents | Two different agent systems, built by different companies, could collaborate if both support MCP. This unlocks industry-wide automation potential. | #4 (Scalable & Self-learning) |
Our choice to use the OpenAI Agents SDK was therefore a bet that, even though the SDK itself is specific, the principles it's built on (tool abstraction, handoffs, context management) are the same ones driving the MCP standard. We're building our cathedral not on foundations of sand, but on rock that is steadily becoming standardized.
The "easy" path would have led us to a complex, entangled, and fragile system. The "simple" path, while requiring more initial work to configure the SDK, led us to a system much easier to understand, maintain, and extend.
This decision paid enormous dividends almost immediately. When we had to implement memory, tools, and quality gates, we didn't have to build the infrastructure from scratch. We could use the primitives the SDK already offered.
✓ Abstract External Dependencies: Never couple your business logic directly to an external API. Always use an abstraction layer (see the sketch after this list).
✓ Think in Terms of "Capabilities", not "API Calls": The SDK allowed us to stop thinking about "how to format the request for endpoint X" and start thinking about "how can I use this agent's planning capability?".
✓ Leverage Existing Primitives: Before building a complex system (e.g., memory management), check if the SDK you're using already offers a solution. Reinventing the wheel is a classic mistake that leads to technical debt.
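As a minimal illustration of the first point (all names here are ours, not from the codebase): the business logic talks to a small capability interface, and only the adapter knows which vendor SDK sits behind it.

```python
from typing import Protocol

class AgentRunner(Protocol):
    """What the business logic needs: 'run this agent on this input', nothing vendor-specific."""
    async def run(self, agent_name: str, task_input: str) -> str: ...

class OpenAISDKRunner:
    """Adapter: the only place that imports and knows about the vendor SDK."""
    def __init__(self, sdk_client):
        self._client = sdk_client

    async def run(self, agent_name: str, task_input: str) -> str:
        # Translate our capability-level call into whatever the SDK expects.
        return await self._client.execute(agent_name, task_input)  # hypothetical SDK call

async def execute_research_task(runner: AgentRunner) -> str:
    # Business logic depends only on the AgentRunner capability, never on the SDK itself.
    return await runner.run("researcher", "Summarize this week's industry news")
```

Swapping AI providers then means writing one new adapter, not rewriting the executor.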
Chapter Conclusion
With the SDK as the backbone of our architecture, we finally had all the pieces to build not just agents, but a real team. We had a common language and robust infrastructure.
We were ready for the next challenge: orchestration. How to make these specialized agents collaborate to achieve a common goal? This led us to create the Executor, our conductor.