We had specialized agents and a shared working environment. But we were missing the most important piece: a central brain. A component that could look at the big picture, decide which task was most important at any given moment, and assign it to the right agent.
Without an orchestrator, our system would have been like an orchestra without a conductor: a group of talented musicians all playing simultaneously, creating only noise.
We designed our orchestrator, which we called Executor, not as a simple queue manager, but as an intelligent and continuous event loop.
Reference code: backend/executor.py
Its basic operation is simple but powerful: the loop continuously scans for tasks in `pending` status, decides which one matters most at that moment, and assigns it to the most suitable agent.
Executor Orchestration Flow:
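In broad strokes, that cycle looks like the sketch below (the helpers `fetch_pending_tasks`, `get_task_priority`, `find_best_agent_for`, and `dispatch_to_agent` are illustrative names, not the actual functions in backend/executor.py):

import asyncio

async def executor_loop(poll_interval_seconds: int = 15):
    """Minimal sketch of the Executor's event loop (helper names are illustrative)."""
    while True:
        # 1. Fetch every task still in "pending" status
        pending_tasks = await fetch_pending_tasks()

        # 2. Rank them by priority (static at first, AI-driven later)
        prioritized = sorted(pending_tasks, key=get_task_priority, reverse=True)

        # 3. Hand the most important tasks to available agents
        for task in prioritized:
            agent = await find_best_agent_for(task)
            if agent:
                await dispatch_to_agent(agent, task)

        # 4. Pause briefly before the next cycle
        await asyncio.sleep(poll_interval_seconds)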
Initially, our priority system was trivial: a simple `if/else` based on a `priority` field ("high", "medium", "low") in the database. It worked for about a day.
We quickly realized that the true priority of a task isn't a static value; it depends on the dynamic context of the project. A low-priority task can suddenly become critical if it's blocking ten other tasks.
This was our first real application of Pillar #2 (AI-Driven, zero hard-coding) at the orchestration level. We replaced the `if/else` logic with a function we call `_calculate_ai_driven_base_priority`.
Reference code: backend/executor.py
def _calculate_ai_driven_base_priority(task_data: dict, context: dict) -> int:
    """
    Uses an AI model to calculate the strategic priority of a task.
    """
    prompt = f"""
    Analyze the following task and project context. Assign a priority score from 0 to 1000.

    TASK: {task_data.get('name')}
    DESCRIPTION: {task_data.get('description')}

    PROJECT CONTEXT:
    - Current Objective: {context.get('current_goal')}
    - Blocked Tasks Waiting: {context.get('blocked_tasks_count')}
    - Task Age (days): {context.get('task_age_days')}

    Consider:
    - Tasks that unblock other tasks are more important.
    - Older tasks should have higher priority.
    - Tasks directly connected to the current objective are critical.

    Respond only with JSON: {{"priority_score": <integer 0-1000>}}
    """
    # ... logic to call AI and parse response ...
    return ai_response.get("priority_score", 100)
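For completeness, the elided call-and-parse step could look roughly like this, assuming the OpenAI Python client with JSON-mode responses (the model name, client setup, and fallback value are illustrative, not necessarily what backend/executor.py does):

import json
from openai import OpenAI

client = OpenAI()

def _ask_model_for_priority(prompt: str) -> dict:
    """Sketch: send the prompt and parse a JSON object from the response."""
    response = client.chat.completions.create(
        model="gpt-4o-mini",  # illustrative model choice
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"},
    )
    try:
        return json.loads(response.choices[0].message.content)
    except (json.JSONDecodeError, TypeError):
        # Fall back to a neutral score if the model returns malformed output
        return {"priority_score": 100}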
This transformed our Executor from a simple queue manager into a true AI Project Manager, capable of making strategic decisions about where to allocate team resources.
With the introduction of agents capable of creating other tasks, we unleashed a monster we hadn't anticipated: the infinite loop of task creation.
Disaster Logbook (July 26th):
INFO: Agent A created Task B.
INFO: Agent B created Task C.
INFO: Agent C created Task D.
... (after 20 minutes)
ERROR: Workspace a352c927... has 5,000+ pending tasks. Halting operations.
An agent, in a clumsy attempt to "decompose the problem", kept creating sub-tasks of sub-tasks, blocking the entire system.
The solution was twofold:
We added a `delegation_depth` field to each task's `context_data`. If a task was created by another task, its depth increased by 1. We set a maximum limit (e.g., 5 levels) to prevent infinite recursion (see the sketch below).
This experience taught us a fundamental lesson about managing autonomous systems: autonomy without limits leads to chaos. It's necessary to implement safety "fuses" that protect the system from itself.
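A minimal sketch of that fuse, assuming `context_data` is a plain dict stored on each task row (the constant and helper names are illustrative, not our actual code):

MAX_DELEGATION_DEPTH = 5  # illustrative cap, roughly the limit we used

def can_create_subtask(parent_task: dict) -> bool:
    """Refuse to spawn a sub-task once the delegation chain gets too deep."""
    parent_depth = parent_task.get("context_data", {}).get("delegation_depth", 0)
    return parent_depth < MAX_DELEGATION_DEPTH

def build_subtask_context(parent_task: dict) -> dict:
    """Each child task records a depth one greater than its parent's."""
    parent_depth = parent_task.get("context_data", {}).get("delegation_depth", 0)
    return {"delegation_depth": parent_depth + 1}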
Our AI-driven prioritization system had a hidden flaw that only manifested when we started testing it with more complex workspaces. The problem? Analysis paralysis.
Disaster Logbook:
INFO: Calculating AI-driven priority for Task_A...
INFO: AI priority calculation took 4.2 seconds
INFO: Calculating AI-driven priority for Task_B...
INFO: AI priority calculation took 3.8 seconds
INFO: Calculating AI-driven priority for Task_C...
INFO: AI priority calculation took 5.1 seconds
... (15 minutes later)
WARNING: Still calculating priorities. No tasks executed yet.
The problem was that each AI call to calculate priority took 3-5 seconds. With workspaces that had 20+ pending tasks, our event loop transformed into an "event crawl". The system was technically correct, but practically unusable.
The Solution: Intelligent Priority Caching with "Semantic Hashing"
Instead of calling AI for every single task, we introduced an intelligent semantic caching system:
def _get_cached_or_calculate_priority(task_data: dict, context: dict) -> int:
    """
    Intelligent priority caching based on semantic hashing
    """
    # Create a semantic hash of the task and context
    semantic_hash = create_semantic_hash(task_data, context)

    # Check if we've already calculated a similar priority
    # (cache entries carry the score plus a timestamp used by cache_is_fresh)
    cached_priority = priority_cache.get(semantic_hash)
    if cached_priority and cache_is_fresh(cached_priority, max_age_minutes=30):
        return cached_priority.score

    # Only if we don't have a valid cache entry do we call the AI
    ai_priority = _calculate_ai_driven_base_priority(task_data, context)
    priority_cache.set(semantic_hash, ai_priority, ttl=1800)  # 30 min TTL
    return ai_priority
The `create_semantic_hash()` function generates a hash based on the key concepts of the task (objective, content type, dependencies) rather than the exact string. This means similar tasks (e.g., "Write blog post about AI" vs. "Create article on artificial intelligence") share the same cached priority.
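In spirit, the function boils down to hashing normalized key concepts instead of raw text. The sketch below shows only that structure; matching true synonyms (as in the blog-post example above) requires a semantic step such as embedding or LLM-based concept extraction, which is omitted here:

import hashlib
import re

def create_semantic_hash(task_data: dict, context: dict) -> str:
    """Sketch: hash the task's key concepts rather than its exact wording."""
    # Keep lowercase keywords only, dropping filler words and punctuation
    stopwords = {"a", "an", "the", "about", "on", "for", "of", "to", "and"}
    words = re.findall(r"[a-z]+", str(task_data.get("name", "")).lower())
    keywords = sorted(set(words) - stopwords)

    # Combine the concepts that actually drive priority
    key_concepts = [
        " ".join(keywords),
        str(context.get("current_goal", "")).lower(),
        str(context.get("blocked_tasks_count", 0)),
    ]
    return hashlib.sha256("|".join(key_concepts).encode()).hexdigest()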
Result: Average prioritization time dropped from 4 seconds to 0.1 seconds for 80% of tasks.
We were proud of our asynchronous worker pool: 10 workers that could process tasks in parallel, making the system extremely fast. At least, that's what we thought.
The problem emerged when we tested the system with a workspace requiring heavy web research. Multiple tasks started making simultaneous calls to different external APIs (Google search, social media, news databases).
Disaster Logbook:
INFO: Worker_1 executing research task (target: competitor analysis)
INFO: Worker_2 executing research task (target: market trends)
INFO: Worker_3 executing research task (target: industry reports)
... (all 10 workers active)
ERROR: Rate limit exceeded for Google Search API (429)
ERROR: Rate limit exceeded for Twitter API (429)
ERROR: Rate limit exceeded for News API (429)
WARNING: 7/10 workers stuck in retry loops
CRITICAL: Executor queue backup - 234 pending tasks
All workers had exhausted external API rate limits simultaneously, causing a domino effect. The system was technically scalable, but had created its worst enemy: resource contention.
The Solution: Intelligent Resource Arbitration
We introduced a Resource Arbitrator that manages shared resources (API calls, database connections, memory) like an intelligent semaphore:
class ResourceArbitrator:
    def __init__(self):
        self.resource_quotas = {
            "google_search_api": TokenBucket(max_tokens=100, refill_rate=1),
            "twitter_api": TokenBucket(max_tokens=50, refill_rate=0.5),
            "database_connections": TokenBucket(max_tokens=20, refill_rate=10)
        }

    async def acquire_resource(self, resource_type: str, estimated_cost: int = 1):
        """
        Acquires a resource if available, otherwise queues
        """
        bucket = self.resource_quotas.get(resource_type)
        if bucket and await bucket.consume(estimated_cost):
            return ResourceLock(resource_type, estimated_cost)
        else:
            # Queue the task for this specific resource
            await self.queue_for_resource(resource_type, estimated_cost)

# In the executor:
async def execute_task_with_arbitration(task_data):
    required_resources = analyze_required_resources(task_data)

    # Acquire all necessary resources before starting
    async with resource_arbitrator.acquire_resources(required_resources):
        return await execute_task(task_data)
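The arbitrator assumes a TokenBucket primitive with an async `consume()` method. Under that assumption, a minimal version could look like this sketch (not the production class):

import asyncio
import time

class TokenBucket:
    """Simple async token bucket: tokens refill at a fixed rate up to a cap."""

    def __init__(self, max_tokens: int, refill_rate: float):
        self.max_tokens = max_tokens
        self.refill_rate = refill_rate  # tokens added per second
        self.tokens = float(max_tokens)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def consume(self, cost: int = 1) -> bool:
        """Try to spend `cost` tokens; returns False if the bucket is empty."""
        async with self._lock:
            now = time.monotonic()
            self.tokens = min(
                self.max_tokens,
                self.tokens + (now - self.last_refill) * self.refill_rate,
            )
            self.last_refill = now
            if self.tokens >= cost:
                self.tokens -= cost
                return True
            return False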
Result: Rate limit errors dropped by 95%, and system throughput increased by 40% thanks to better resource management.
What we had built was powerful, but still monolithic. As the system grew, we realized orchestration needed more nuance.
This led us, in later phases of the project, to completely rethink the orchestration architecture. But this is a story we'll tell in Part II of this manual, when we explore how we went from an MVP to an enterprise-ready system.
For more technical readers, it's worth exploring how we implemented the Executor's central event loop. It's not a simple `while True`, but a layered system:
import asyncio
import time

class IntelligentEventLoop:
    def __init__(self):
        self.polling_intervals = {
            "high_priority_workspaces": 5,   # seconds
            "normal_workspaces": 15,         # seconds
            "low_activity_workspaces": 60,   # seconds
            "maintenance_mode": 300          # seconds
        }
        self.workspace_activity_tracker = ActivityTracker()
        self.last_poll_time = {tier: 0.0 for tier in self.polling_intervals}
        self.is_running = True

    async def adaptive_polling_cycle(self):
        """
        Polling cycle that adapts intervals based on activity
        """
        while self.is_running:
            workspaces_by_priority = self.classify_workspaces_by_activity()

            for priority_tier, workspaces in workspaces_by_priority.items():
                interval = self.polling_intervals[priority_tier]

                # Process high-priority workspaces more frequently
                if time.time() - self.last_poll_time[priority_tier] >= interval:
                    await self.process_workspaces_batch(workspaces)
                    self.last_poll_time[priority_tier] = time.time()

            # Dynamic pause based on system load
            await asyncio.sleep(self.calculate_dynamic_sleep_time())
This adaptive polling approach means active workspaces are checked every 5 seconds, while dormant workspaces are checked only every 5 minutes, optimizing both responsiveness and efficiency.
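One piece the snippet leaves undefined is `calculate_dynamic_sleep_time()`. A plausible sketch, assuming the ActivityTracker can report how many tasks are waiting (the method name and thresholds below are illustrative):

def calculate_dynamic_sleep_time(self) -> float:
    """Sketch: sleep less when the system is busy, more when it is idle."""
    # Assumes the tracker exposes an overall pending-task count
    pending = self.workspace_activity_tracker.total_pending_tasks()
    if pending > 50:
        return 1.0   # heavy backlog: tight loop
    if pending > 10:
        return 5.0   # moderate load
    return 15.0      # quiet system: relax polling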
After implementing the optimizations, our system achieved these metrics:

| Metric | Baseline (v1) | Optimized (v2) | Improvement |
|---|---|---|---|
| Task/sec throughput | 2.3 | 8.1 | +252% |
| Average prioritization time | 4.2s | 0.1s | -97% |
| Resource contention errors | 34/hour | 1.7/hour | -95% |
| Memory usage (idle) | 450MB | 280MB | -38% |
The SDK also gave us ready-made primitives, including built-in tools such as `websearch`. The adoption of these primitives accelerated our development exponentially. Instead of building complex systems for memory or tool management from scratch, we were able to leverage ready-made, tested, and optimized components.
Our decision to adopt an SDK wasn't just a tactical choice to simplify code, but a strategic bet on a more open and interoperable future. At the heart of this vision is a fundamental concept: the Model Context Protocol (MCP).
What is MCP? The "USB-C" for Artificial Intelligence.
Imagine a world where every AI tool (an analysis tool, a vector database, another agent) speaks a different language. To make them collaborate, you have to build a custom adapter for every pair. It's an integration nightmare.
MCP aims to solve this problem. It's an open protocol that standardizes how applications provide context and tools to LLMs. It works like a USB-C port: a single standard that allows any AI model to connect to any data source or tool that "speaks" the same language.
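To make the "single standard" idea concrete: an MCP-style tool is advertised with a name, a description, and a JSON Schema for its inputs, so any compliant client can discover and call it. The dictionary below is a purely illustrative sketch of that shape, not code from our system or from any specific MCP server:

# Illustrative shape of a standardized tool description (not our actual code)
web_search_tool = {
    "name": "web_search",
    "description": "Search the public web and return the top results.",
    "inputSchema": {
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query text"},
            "max_results": {"type": "integer", "default": 5},
        },
        "required": ["query"],
    },
}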
Why MCP is the Future (and why we care):
Choosing an SDK that embraces (or moves toward) MCP principles is a strategic move that aligns perfectly with our pillars:
| MCP Strategic Benefit | Description | Corresponding Reference Pillar |
|---|---|---|
| End of Vendor Lock-in | If more models and tools support MCP, we can switch AI providers or integrate new third-party tools with minimal effort. | #15 (Robustness & Fallback) |
| A "Plug-and-Play" Tool Ecosystem | A true marketplace of specialized tools (financial, scientific, creative) will emerge that we can "plug into" our agents instantly. | #14 (Modular Tool/Service-Layer) |
| Interoperability Between Agents | Two different agent systems, built by different companies, could collaborate if both support MCP. This unlocks industry-wide automation potential. | #4 (Scalable & Self-learning) |
Our choice to use the OpenAI Agents SDK was therefore a bet that, even though the SDK itself is vendor-specific, the principles it's based on (tool abstraction, handoffs, context management) are the same ones driving the MCP standard. We're building our cathedral not on foundations of sand, but on rocky ground that's becoming standardized.
The "easy" path would have led us to a complex, entangled, and fragile system. The "simple" path, while requiring more initial work to configure the SDK, led us to a system much easier to understand, maintain, and extend.
This decision paid enormous dividends almost immediately. When we had to implement memory, tools, and quality gates, we didn't have to build the infrastructure from scratch. We could use the primitives the SDK already offered.
✓ Abstract External Dependencies: Never couple your business logic directly to an external API. Always use an abstraction layer.
✓ Think in Terms of "Capabilities", not "API Calls": The SDK allowed us to stop thinking about "how to format the request for endpoint X" and start thinking about "how can I use this agent's planning capability?".
✓ Leverage Existing Primitives: Before building a complex system (e.g., memory management), check whether the SDK you're using already offers a solution. Reinventing the wheel is a classic mistake that leads to technical debt.
Chapter Conclusion
With the SDK as the backbone of our architecture, we finally had all the pieces to build not just agents, but a real team. We had a common language and robust infrastructure.
We were ready for the next challenge: orchestration. How to make these specialized agents collaborate to achieve a common goal? This led us to create the Executor, our conductor.
The Code Implementation (Simplified):
Reference code: backend/database.py
def try_claim_task(agent_id: str, task_id: str) -> bool:
    """
    Tries to claim a task atomically. Returns True if successful, False if another agent claimed it first.
    """
    try:
        # This UPDATE query only succeeds if the task is still pending
        result = supabase.table("tasks").update({
            "status": "in_progress",
            "assigned_agent_id": agent_id,
            "started_at": datetime.utcnow().isoformat()
        }).eq("id", task_id).eq("status", "pending").execute()

        # If no rows were affected, another agent already claimed the task
        return len(result.data) > 0
    except Exception as e:
        logger.error(f"Error claiming task {task_id}: {e}")
        return False
This simple conditional update ensured that only one agent could claim a task, eliminating race conditions and duplicate work.
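On the worker side, the claim-then-execute pattern stays short. A sketch, where `get_next_pending_task` and `execute_task` are illustrative names rather than the actual functions:

import asyncio

async def worker_loop(agent_id: str):
    """Sketch of how a worker uses atomic claiming before doing any work."""
    while True:
        task = await get_next_pending_task()
        if task is None:
            await asyncio.sleep(5)
            continue

        # Only proceed if we won the claim; otherwise another worker has it
        if try_claim_task(agent_id, task["id"]):
            await execute_task(task)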
As our agents became more capable, our database schema had to evolve to support increasingly complex interactions.
Phase 1: Basic Task Management
We started with simple tables: `tasks`, `agents`, `workspaces`. Basic CRUD operations.
Phase 2: Memory Integration
We added `memory_insights` and `context_embeddings` tables. Agents could now learn and remember.
Phase 3: Quality Gates
We introduced `quality_checks` and `human_feedback`. Every deliverable had to pass validation.
Phase 4: Advanced Orchestration
Finally: `goal_progress_logs`, `agent_handoffs`, `deliverable_assets`. A complete ecosystem.
Each phase required us to maintain backward compatibility while adding new capabilities. The DAL pattern proved invaluable here: changes to the database schema required updates only to our `database.py` file, not to every agent.
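As a concrete illustration of the DAL idea (a sketch, not the actual contents of database.py; the column names and `priority` ordering are assumptions): agents call intent-level functions, and only this layer knows about tables and queries.

# Assumes the same module-level supabase client used by try_claim_task above
def list_pending_tasks(workspace_id: str) -> list[dict]:
    """Return all pending tasks for a workspace, ordered by priority (sketch)."""
    result = (
        supabase.table("tasks")
        .select("*")
        .eq("workspace_id", workspace_id)
        .eq("status", "pending")
        .order("priority", desc=True)
        .execute()
    )
    return result.data or []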
The most important insight from this phase was changing our mental model. We stopped thinking of the database as mere "storage" and started treating it as a communication protocol between agents.
Every table became a "channel":

- The `tasks` table was the "work queue" – agents published work here and claimed assignments.
- The `memory_insights` table was the "knowledge sharing channel" – agents contributed learnings for others to benefit from.
- The `goal_progress_logs` table was the "coordination channel" – agents announced progress and celebrated achievements.

This paradigm shift from "storage-centric" to "communication-centric" was fundamental to scaling our system. Instead of requiring complex inter-agent communication protocols, we had a simple, reliable, and auditable message-passing system.
✓ Design for Concurrency from Day One: Multi-agent systems will have race conditions. Plan for them with atomic operations and proper locking.
✓ Use a Data Access Layer (DAL): Never let your agents talk directly to the database. Abstract all interactions through a dedicated service layer.
✓ Database as Communication Protocol: In a multi-agent system, your database isn't just storage – it's the nervous system enabling coordination.
✓ Plan for Schema Evolution: Your data needs will grow more complex. Design your abstractions to handle schema changes gracefully.
Chapter Conclusion
With a robust database interaction layer, our agents finally had "hands" to manipulate their environment. They could read tasks, update progress, create new work, and share knowledge. We had built the foundation for true collaboration.
But having capable individual agents wasn't enough. We needed someone to conduct the orchestra, to ensure the right agent got the right task at the right time. This brought us to our next challenge: building the Orchestrator, the brain that would coordinate our entire AI team.