> Production Agent Deployment

Budding
planted Jan 8, 2026 · tended Jan 8, 2026
#ai-agents #production #deployment #scaling #operations

Production Agent Deployment

🌿 Budding note β€” taking agents from prototype to production.

Production Readiness

Before deploying agents to production:

Requirements:

  • Error handling and recovery
  • Logging and monitoring
  • Rate limiting and quotas
  • Security hardening
  • Cost management
  • Performance optimization

Related: AI Agents Fundamentals and Agent Security Considerations

Architecture Patterns

1. API-Based Deployment

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

app = FastAPI()

class AgentRequest(BaseModel):
    # Request payload for POST /agent/run.
    task: str      # natural-language task for the agent to execute
    user_id: str   # caller identity, used for per-user rate limiting

class AgentResponse(BaseModel):
    # Response envelope returned by POST /agent/run.
    result: str            # agent's text output
    tokens_used: int       # LLM tokens consumed by this request
    execution_time: float  # wall-clock seconds for the whole request

@app.post("/agent/run", response_model=AgentResponse)
async def run_agent(request: AgentRequest):
    """Run an agent task synchronously and return its result.

    Raises:
        HTTPException 429: when the user is over their rate limit.
        HTTPException 500: for any unexpected agent failure.
    """
    import time  # stdlib; this snippet never imports it at module level

    start = time.time()

    # Rate limit check — performed outside the generic handler below so a
    # deliberate 429 is not rewrapped as a 500 (bug in the original: the
    # broad `except Exception` caught its own HTTPException).
    if not rate_limiter.check(request.user_id):
        raise HTTPException(429, "Rate limit exceeded")

    try:
        # Execute agent
        result = await agent.process(request.task)
    except HTTPException:
        raise  # propagate deliberate HTTP errors untouched
    except Exception as e:
        logger.error(f"Agent error: {e}", exc_info=True)
        raise HTTPException(500, str(e))

    return AgentResponse(
        result=result.text,
        tokens_used=result.usage.total_tokens,
        execution_time=time.time() - start
    )

2. Queue-Based Processing

from celery import Celery
from redis import Redis

# Standalone example: `app` here is a Celery app (the FastAPI snippet above
# is a separate deployment pattern, not shared state).
app = Celery('agents', broker='redis://localhost:6379/0')
redis_client = Redis()  # defaults to localhost:6379

@app.task(bind=True, max_retries=3)
def process_agent_task(self, task_id: str, task_data: dict):
    """Run an agent task in the background with retry and status tracking.

    Status and result are stored in Redis under ``task:{task_id}:*`` keys
    with a 1-hour TTL so pollers can follow progress. On failure the task
    retries up to ``max_retries`` times with exponential backoff (1s, 2s, 4s).
    """
    import json  # stdlib; the snippet never imports it at module level

    try:
        # Mark the task as in-flight so pollers see progress.
        redis_client.setex(f"task:{task_id}:status", 3600, "processing")

        # Run agent
        result = agent.process(task_data["query"])

        # Persist the result for later retrieval (1-hour TTL).
        redis_client.setex(
            f"task:{task_id}:result",
            3600,
            json.dumps(result)
        )

        return {"status": "completed", "result": result}

    except Exception as e:
        # Fix: record the failure so pollers aren't left staring at a
        # stale "processing" status while the retry backoff elapses.
        redis_client.setex(f"task:{task_id}:status", 3600, "retrying")
        # Canonical Celery form: retry() raises Retry; `raise` makes
        # the control flow explicit. Backoff: 2 ** retry_count seconds.
        raise self.retry(exc=e, countdown=2 ** self.request.retries)

3. Serverless Deployment

# AWS Lambda handler
import json

def lambda_handler(event, context):
    """Serverless agent execution behind API Gateway.

    Expects ``event['body']`` to be a JSON string containing a ``query``
    field; returns an API Gateway proxy response (statusCode + JSON body).
    """
    global agent_client
    try:
        task = json.loads(event['body'])

        # Cold start optimization: initialize once per container; warm
        # invocations reuse the existing client.
        if 'agent_client' not in globals():
            agent_client = initialize_agent()

        result = agent_client.process(task['query'])

        return {
            'statusCode': 200,
            'body': json.dumps({
                'result': result,
                # Fix: the Lambda context object exposes `aws_request_id`;
                # `context.request_id` raised AttributeError on every call.
                'request_id': context.aws_request_id
            })
        }

    except Exception as e:
        # Surface the error as a 500 proxy response instead of letting the
        # invocation crash.
        return {
            'statusCode': 500,
            'body': json.dumps({'error': str(e)})
        }

Cost Management

Token Budgets

class CostManager:
    """Track per-user API spend against a fixed budget.

    NOTE(review): state is in-memory and per-process; production use would
    persist usage (e.g. Redis) and reset it per billing period — the
    "Monthly budget" message in the usage example implies a monthly reset
    that this class does not implement.
    """
    def __init__(self, budget_per_user: float):
        self.budget = budget_per_user
        # Plain dict instead of the original defaultdict, which was never
        # imported anywhere in this file.
        self.usage: dict[str, float] = {}   # user_id -> cumulative spend
        self.alerts: list[str] = []         # users who crossed the 90% mark

    def check_budget(self, user_id: str, estimated_cost: float) -> bool:
        """Return True if the user can afford `estimated_cost` more spend."""
        current_usage = self.usage.get(user_id, 0.0)
        return (current_usage + estimated_cost) <= self.budget

    def record_usage(self, user_id: str, actual_cost: float):
        """Add `actual_cost` to the user's total; alert near the cap."""
        self.usage[user_id] = self.usage.get(user_id, 0.0) + actual_cost

        # Alert if approaching limit (>90% of budget). The original called
        # an undefined method here, raising AttributeError the first time a
        # user neared their budget; the hook is now defined below.
        if self.usage[user_id] > self.budget * 0.9:
            self.alert_approaching_limit(user_id)

    def alert_approaching_limit(self, user_id: str):
        """Record a near-limit alert once per user; override to page/notify."""
        if user_id not in self.alerts:
            self.alerts.append(user_id)

# Usage
cost_mgr = CostManager(budget_per_user=10.0)  # $10 per user

# Reject the task up front if the *estimated* cost would blow the budget.
if not cost_mgr.check_budget(user_id, estimated_cost=0.50):
    raise BudgetExceeded("Monthly budget exceeded")

result = agent.process(task)
# Record what the call actually cost (may differ from the estimate).
cost_mgr.record_usage(user_id, result.cost)

Caching Strategies

import hashlib
from functools import lru_cache

class ResponseCache:
    """Redis-backed cache for agent responses (JSON payloads, 1-hour TTL).

    NOTE(review): the redis client is called synchronously inside an async
    method — presumably acceptable for this example; confirm an async client
    is used in production.
    """
    def __init__(self, redis_client):
        self.redis = redis_client
        self.ttl = 3600  # 1 hour

    def get_cache_key(self, query: str, context: dict = None) -> str:
        """Derive a deterministic MD5 key from query + canonicalized context."""
        ctx_json = json.dumps(context or {}, sort_keys=True)
        raw = query + ":" + ctx_json
        return hashlib.md5(raw.encode()).hexdigest()

    async def get_or_compute(self, query: str, context: dict, compute_fn):
        """Return the cached response, computing and storing it on a miss."""
        key = f"agent:response:{self.get_cache_key(query, context)}"

        hit = self.redis.get(key)
        if hit:
            return json.loads(hit)

        fresh = await compute_fn(query, context)
        self.redis.setex(key, self.ttl, json.dumps(fresh))
        return fresh

Monitoring

Metrics Collection

from prometheus_client import Counter, Histogram, Gauge

# Define metrics
# Counter: total requests, partitioned by agent type and success/error status.
agent_requests = Counter(
    'agent_requests_total',
    'Total agent requests',
    ['agent_type', 'status']
)

# Histogram: end-to-end response time per agent type.
agent_latency = Histogram(
    'agent_latency_seconds',
    'Agent response time',
    ['agent_type']
)

# Histogram: token consumption per request (proxy for per-request cost).
agent_tokens = Histogram(
    'agent_tokens_used',
    'Tokens used per request',
    ['agent_type']
)

# Gauge: in-flight agent executions (inc on start, dec on completion).
active_agents = Gauge(
    'agent_active_count',
    'Number of currently running agents'
)

# Use in agent
async def monitored_agent_call(task: str):
    """Agent call with metrics"""
    active_agents.inc()
    start = time.time()

    try:
        result = await agent.process(task)

        # Record success metrics
        agent_requests.labels(
            agent_type='research',
            status='success'
        ).inc()

        agent_latency.labels(agent_type='research').observe(
            time.time() - start
        )

        agent_tokens.labels(agent_type='research').observe(
            result.tokens_used
        )

        return result

    except Exception as e:
        agent_requests.labels(
            agent_type='research',
            status='error'
        ).inc()
        raise

    finally:
        active_agents.dec()

Structured Logging

import structlog

logger = structlog.get_logger()

def log_agent_execution(
    agent_id: str,
    task: str,
    result: dict,
    metadata: dict
):
    """Emit one structured log event for a completed agent run.

    Fix: the original computed ``metadata.get("latency") * 1000``, which
    raises TypeError whenever "latency" is absent from metadata — exactly
    the kind of dict .get() defensiveness the other fields already use.
    """
    latency = metadata.get("latency")
    logger.info(
        "agent_execution",
        agent_id=agent_id,
        task_preview=task[:100],  # truncate to keep log lines bounded
        status=result.get("status"),
        tokens_used=result.get("tokens"),
        latency_ms=latency * 1000 if latency is not None else None,
        user_id=metadata.get("user_id"),
        request_id=metadata.get("request_id")
    )

Error Handling

Retry Logic

from tenacity import retry, retry_if_not_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    # Fix: tenacity retries on *any* exception by default, so the
    # re-raise in the body did not actually stop rate-limit retries —
    # the predicate below is what excludes RateLimitError.
    retry=retry_if_not_exception_type(RateLimitError),
)
async def resilient_agent_call(task: str):
    """Agent call with automatic retry (3 attempts, exponential backoff 2-10s)."""
    try:
        return await agent.process(task)
    except RateLimitError:
        # Don't retry rate limits — surface them to the caller immediately.
        raise
    except Exception as e:
        logger.warning(f"Agent call failed, will retry: {e}")
        raise

Circuit Breaker

class CircuitBreaker:
    """Stop calling a failing dependency until it has had time to recover.

    States: "closed" (normal operation), "open" (calls rejected until
    `timeout` seconds pass), "half_open" (one probe call decides whether
    to close again or re-open).
    """
    def __init__(self, failure_threshold: int = 5, timeout: int = 60):
        self.failure_count = 0
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.last_failure_time = None
        self.state = "closed"  # closed, open, half_open

    async def call(self, func, *args, **kwargs):
        """Invoke `func`, tracking failures and short-circuiting when open."""
        if self.state == "open":
            elapsed = time.time() - self.last_failure_time
            if elapsed <= self.timeout:
                raise CircuitOpen("Circuit breaker is open")
            # Cool-down elapsed: let one probe call through.
            self.state = "half_open"

        try:
            outcome = await func(*args, **kwargs)
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = "open"
            raise
        else:
            if self.state == "half_open":
                # Probe succeeded: close the circuit, forget old failures.
                self.state = "closed"
                self.failure_count = 0
            return outcome

Scaling Strategies

Horizontal Scaling

# Load balancer distributes across agent instances
class AgentPool:
    """Fixed-size pool of agent workers with round-robin task distribution."""
    def __init__(self, num_workers: int):
        self.workers = [AgentWorker(id=i) for i in range(num_workers)]
        self.current_worker = 0  # index of the next worker to receive a task

    async def process(self, task: str):
        """Dispatch `task` to the next worker in rotation."""
        idx = self.current_worker
        # Advance the pointer before awaiting so concurrent callers rotate.
        self.current_worker = (idx + 1) % len(self.workers)
        return await self.workers[idx].process(task)

Auto-Scaling

class AutoScaler:
    """Scale the agent worker count up/down based on queue pressure.

    Worker count is always kept within [min_workers, max_workers], even
    when scale_up/scale_down are called directly.
    """
    def __init__(self, min_workers: int = 1, max_workers: int = 10):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.current_workers = min_workers

    async def check_and_scale(self, metrics: dict):
        """Adjust the worker count from a metrics snapshot.

        Expects ``metrics["queue_length"]``: scales up when the backlog
        exceeds 100 and down when it drops below 10.
        """
        queue_length = metrics["queue_length"]
        # NOTE(review): the original also read metrics["avg_latency"] but
        # never used it — a latency-based scale-up rule is the natural
        # extension; confirm intent before adding one.

        # Scale up if the queue is growing.
        if queue_length > 100 and self.current_workers < self.max_workers:
            await self.scale_up()

        # Scale down if mostly idle.
        elif queue_length < 10 and self.current_workers > self.min_workers:
            await self.scale_down()

    async def scale_up(self):
        """Add one worker, clamped to max_workers."""
        self.current_workers = min(self.current_workers + 1, self.max_workers)
        # Start new worker process/container

    async def scale_down(self):
        """Remove one worker, clamped to min_workers."""
        self.current_workers = max(self.current_workers - 1, self.min_workers)
        # Stop idle worker

Health Checks

@app.get("/health")
async def health_check():
    """Service health endpoint"""
    health = {
        "status": "healthy",
        "checks": {}
    }

    # Check LLM API
    try:
        await client.messages.create(
            model="claude-sonnet-4-5-20250929",
            max_tokens=10,
            messages=[{"role": "user", "content": "test"}]
        )
        health["checks"]["llm_api"] = "ok"
    except:
        health["checks"]["llm_api"] = "failed"
        health["status"] = "unhealthy"

    # Check database
    try:
        db.ping()
        health["checks"]["database"] = "ok"
    except:
        health["checks"]["database"] = "failed"
        health["status"] = "unhealthy"

    # Check Redis
    try:
        redis_client.ping()
        health["checks"]["redis"] = "ok"
    except:
        health["checks"]["redis"] = "failed"
        health["status"] = "unhealthy"

    return health

Deployment Checklist

## Pre-Deployment

- [ ] Load testing completed
- [ ] Error handling tested
- [ ] Logging configured
- [ ] Monitoring dashboards set up
- [ ] Rate limits configured
- [ ] Security review passed
- [ ] Cost budgets set
- [ ] Backup and recovery tested
- [ ] Documentation updated
- [ ] Runbook created

## Post-Deployment

- [ ] Monitor error rates
- [ ] Track latency metrics
- [ ] Review costs daily
- [ ] Check logs for issues
- [ ] Test rollback procedure
- [ ] Collect user feedback

Related: Agent Evaluation & Testing

Connection Points

Prerequisites:

Related:

Testing:

  • Agent Evaluation & Testing β€” Pre-production testing

>> referenced by (10)

Agent Evaluation and Testing
..._rate_24h"] > 0.05: self.alert("Error rate above 5%") ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β€” Agent...
Agent Frameworks Comparison
...self.spent += self.cost_per_call return response `` Related: [[Production Agent Deployment]] Migration Patterns From LangChain to LangGraph ``python Before: La...
Agent Memory Systems
...t) await self.vector_store.upsert_async(vector, metadata) ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β€” Agent...
Agent Security Considerations
...ecurity testing completed - [ ] Incident response plan documented ``` Related: [[Production Agent Deployment]] Connection Points Prerequisites: - [[AI Agents Fundamentals]] β€” Agent...
AI Agents
...[Agent Security Considerations]] 🌿 β€” Prompt injection, tool safety, auditing - [[Production Agent Deployment]] 🌿 β€” Scaling, monitoring, and operations Production Considerations Ope...
AI Agents Fundamentals
...ts(code) return code `` Related: [[Building Agents with LangChain]], [[Production Agent Deployment]] Research Assistant ``python class ResearchAgent: async def research_...
Building Agents with LangChain
...β€” Tool patterns - [[Agent Memory Systems]] β€” LangChain memory Advanced: - [[Production Agent Deployment]] β€” Deploying LangChain agents - [[Agent Security Considerations]] β€” LangChain se...
Claude Agent Patterns
...ude agents - [[Agent Security Considerations]] β€” Claude safety Advanced: - [[Production Agent Deployment]] β€” Scaling Claude agents - [[Building Agents with LangChain]] β€” LangChain + Clau...
Multi-Agent Systems
...β€” CrewAI, LangGraph - [[Agent Evaluation & Testing]] β€” Testing collaborations - [[Production Agent Deployment]] β€” Scaling multiple agents
Tool Use and Function Calling
...safety - [[Building Agents with LangChain]] β€” LangChain tools Advanced: - [[Production Agent Deployment]] β€” Production tool management - [[Agent Evaluation & Testing]] β€” Testing tool re...