Lab L3.5: Monitoring and Observability
🎯 Assignment: Accept this lab on GitHub Classroom
You’ll get your own repository with starter code, instructions, and automatic grading.
|   |   |
|---|---|
| Duration | 75 minutes |
| Prerequisites | Previous module completed |
Objectives
- Add metrics collection
- Implement structured logging
- Create health endpoints
How to Complete This Lab
- Accept the Assignment — Click the GitHub Classroom link above
- Clone Your Repo — `git clone <your-repo-url>`
- Read the README — Your repo has detailed requirements and grading criteria
- Write Your Code — Implement the solution in `solution/agent.py`
- Test Locally — Use `swaig-test` to verify your agent works
- Push to Submit — `git push` triggers auto-grading
Key Concepts
The following exercises walk through the concepts you’ll need. Your GitHub Classroom repo README has the specific requirements for grading.
Scenario
Add comprehensive observability to a customer service agent to monitor:
- Call volume and patterns
- Function performance
- Error rates
- Business metrics (resolutions, transfers)
Part 1: Structured Logging (25 min)
Create observable_agent.py
#!/usr/bin/env python3
"""Customer service agent with full observability."""
import os
import json
import time
import logging
from datetime import datetime
from signalwire_agents import AgentBase, SwaigFunctionResult
class JSONFormatter(logging.Formatter):
    """Render each log record as one single-line JSON object.

    The output always contains timestamp, level, logger, message, module and
    function. Any attribute named in ``EXTRA_FIELDS`` that was attached to the
    record (via ``extra=...``) is copied through as well, and a formatted
    traceback is added under ``exception`` when the record carries one.
    """

    # Context keys copied from the record when present. ticket_id, priority
    # and department are passed by the "Ticket created" and "Transfer
    # initiated" events; without them here those values never reach the log.
    EXTRA_FIELDS = (
        "call_id", "customer_id", "function_name", "duration_ms",
        "error_type", "ticket_id", "priority", "department",
    )

    def format(self, record):
        """Return ``record`` serialized as a JSON string."""
        # Function-scope import: the file header only imports ``datetime``.
        from datetime import timezone

        # Stamp the moment the event was created (not the moment it is
        # formatted) and make the value timezone-aware so the UTC offset is
        # explicit in the ISO string.
        created_at = datetime.fromtimestamp(record.created, tz=timezone.utc)

        log_data = {
            "timestamp": created_at.isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        for key in self.EXTRA_FIELDS:
            if hasattr(record, key):
                log_data[key] = getattr(record, key)

        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # default=str is deliberate: one unserializable extra value must not
        # make the formatter raise and lose the whole log line.
        return json.dumps(log_data, default=str)
def setup_logging():
    """Configure and return the shared "agent" logger with JSON output.

    Safe to call more than once: the JSON stream handler is attached only if
    the logger does not already have one.

    Returns:
        The ``logging.Logger`` named "agent", set to INFO level.
    """
    logger = logging.getLogger("agent")
    logger.setLevel(logging.INFO)

    # getLogger("agent") always returns the same object, so attaching a new
    # handler on every call (one call per agent instance) would print each
    # record once per handler.
    has_json_handler = any(
        isinstance(existing.formatter, JSONFormatter)
        for existing in logger.handlers
    )
    if not has_json_handler:
        handler = logging.StreamHandler()
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)

    return logger
class ObservableAgent(AgentBase):
    """Customer service agent whose tool calls emit structured logs.

    Registers three tools (order lookup, ticket creation, specialist
    transfer). Each tool reads ``call_id`` from ``raw_data`` and attaches it
    to its log records.
    """

    def __init__(self):
        super().__init__(name="observable-agent")
        self.logger = setup_logging()
        self.logger.info("Agent initializing")
        self._configure_prompts()
        self.add_language("English", "en-US", "rime.spore")
        self._setup_functions()
        self.logger.info("Agent initialized successfully")

    def _configure_prompts(self):
        """Add the agent's role section to the prompt."""
        self.prompt_add_section(
            "Role",
            "Customer service agent. Help with orders and support."
        )

    @staticmethod
    def _call_id_from(raw_data):
        """Return ``raw_data["call_id"]``, or "unknown" when it is unavailable.

        ``raw_data`` defaults to ``None`` in every tool signature, so it must
        be checked before ``.get`` is called on it.
        """
        return raw_data.get("call_id", "unknown") if raw_data else "unknown"

    def _log_function_call(self, func_name: str, args: dict,
                           call_id: str, duration_ms: float,
                           success: bool, error: str = None):
        """Log the outcome of one tool execution.

        Args:
            func_name: Name of the tool that ran.
            args: Arguments the tool was called with. Accepted for callers'
                convenience; not written to the log by this method.
            call_id: Identifier of the call, attached to the record.
            duration_ms: Elapsed time in milliseconds (rounded to 2 places).
            success: True logs at INFO, False logs at ERROR.
            error: Error description; only used when ``success`` is False.
        """
        extra = {
            "call_id": call_id,
            "function_name": func_name,
            "duration_ms": round(duration_ms, 2)
        }
        if success:
            self.logger.info(
                f"Function {func_name} completed",
                extra=extra
            )
        else:
            extra["error_type"] = error
            self.logger.error(
                f"Function {func_name} failed: {error}",
                extra=extra
            )

    def _setup_functions(self):
        """Register the agent's tools."""

        @self.tool(
            description="Look up order status",
            parameters={
                "type": "object",
                "properties": {
                    "order_id": {"type": "string"}
                },
                "required": ["order_id"]
            }
        )
        def get_order_status(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            order_id = args.get("order_id", "")
            call_id = self._call_id_from(raw_data)
            start = time.perf_counter()
            try:
                # Simulated order lookup
                time.sleep(0.2)  # Simulated latency
                status = "shipped"
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status",
                    {"order_id": order_id},
                    call_id,
                    duration_ms,
                    success=True
                )
                return SwaigFunctionResult(f"Order {order_id}: {status}")
            except Exception as e:
                # Tool boundary: log the failure and answer the caller
                # instead of letting the exception escape.
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status",
                    {"order_id": order_id},
                    call_id,
                    duration_ms,
                    success=False,
                    error=str(e)
                )
                return SwaigFunctionResult(
                    "I'm having trouble looking up that order."
                )

        @self.tool(
            description="Create support ticket",
            parameters={
                "type": "object",
                "properties": {
                    "issue": {"type": "string"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high"]
                    }
                },
                "required": ["issue"]
            }
        )
        def create_ticket(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            issue = args.get("issue", "")
            priority = args.get("priority", "medium")
            call_id = self._call_id_from(raw_data)
            start = time.perf_counter()
            ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
            duration_ms = (time.perf_counter() - start) * 1000
            self._log_function_call(
                "create_ticket",
                {"priority": priority},
                call_id,
                duration_ms,
                success=True
            )
            # Log business event
            self.logger.info(
                "Ticket created",
                extra={
                    "call_id": call_id,
                    "ticket_id": ticket_id,
                    "priority": priority
                }
            )
            return SwaigFunctionResult(f"Created ticket {ticket_id}.")

        @self.tool(
            description="Transfer to specialist",
            # The handler reads args["department"], so the parameter has to
            # be declared here.
            parameters={
                "type": "object",
                "properties": {
                    "department": {
                        "type": "string",
                        "enum": ["sales", "support", "billing"]
                    }
                },
                "required": ["department"]
            }
        )
        def transfer_specialist(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            department = args.get("department", "")
            call_id = self._call_id_from(raw_data)
            if not department:
                # An empty department would build the target "/agents/".
                return SwaigFunctionResult(
                    "Which department should I transfer you to?"
                )
            self.logger.info(
                "Transfer initiated",
                extra={
                    "call_id": call_id,
                    "department": department
                }
            )
            return (
                SwaigFunctionResult(f"Transferring to {department}.")
                .connect(f"/agents/{department}", final=True)
            )
if __name__ == "__main__":
    # Script entry point: construct the agent, then hand control to it.
    agent = ObservableAgent()
    agent.run()
Part 2: Prometheus Metrics (30 min)
Add Metrics
Create metrics.py:
"""Prometheus metrics for voice agent."""
from prometheus_client import Counter, Histogram, Gauge, start_http_server
# --- Call-level metrics ---
CALLS_TOTAL = Counter(
    "voice_agent_calls_total",
    "Total calls received",
    labelnames=["agent", "status"],
)
ACTIVE_CALLS = Gauge(
    "voice_agent_active_calls",
    "Currently active calls",
    labelnames=["agent"],
)

# --- Per-function metrics ---
FUNCTION_CALLS = Counter(
    "voice_agent_function_calls_total",
    "Total function calls",
    labelnames=["agent", "function", "status"],
)
FUNCTION_LATENCY = Histogram(
    "voice_agent_function_latency_seconds",
    "Function execution latency",
    labelnames=["agent", "function"],
    # Bucket upper bounds in seconds.
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
)

# --- Business metrics ---
TICKETS_CREATED = Counter(
    "voice_agent_tickets_total",
    "Support tickets created",
    labelnames=["agent", "priority"],
)
TRANSFERS_TOTAL = Counter(
    "voice_agent_transfers_total",
    "Call transfers",
    labelnames=["agent", "department"],
)
RESOLUTIONS_TOTAL = Counter(
    "voice_agent_resolutions_total",
    "Issues resolved by agent",
    labelnames=["agent", "resolution_type"],
)

# --- Error metrics ---
ERRORS_TOTAL = Counter(
    "voice_agent_errors_total",
    "Total errors",
    labelnames=["agent", "function", "error_type"],
)
def start_metrics_server(port: int = 9090):
    """Start the Prometheus HTTP exporter on ``port`` and announce it.

    Args:
        port: TCP port handed to ``start_http_server``.
    """
    start_http_server(port)
    announcement = f"Metrics server started on port {port}"
    print(announcement)
Integrate Metrics
Update observable_agent.py:
from metrics import (
CALLS_TOTAL, ACTIVE_CALLS, FUNCTION_CALLS, FUNCTION_LATENCY,
TICKETS_CREATED, TRANSFERS_TOTAL, ERRORS_TOTAL, start_metrics_server
)
class MetricsObservableAgent(ObservableAgent):
    """ObservableAgent that additionally records Prometheus metrics."""

    # Value of the ``agent`` label on every metric recorded by this class.
    AGENT_LABEL = "observable-agent"

    def __init__(self):
        super().__init__()
        # Start metrics server
        start_metrics_server(9090)

    def _log_function_call(self, func_name: str, args: dict,
                           call_id: str, duration_ms: float,
                           success: bool, error: str = None):
        """Log the call via the parent, then update the function metrics.

        NOTE(review): ``error`` is used verbatim as the ``error_type`` label,
        so callers should pass a short identifier (the tool below passes the
        exception class name) rather than a free-form message.
        """
        # Parent logging
        super()._log_function_call(
            func_name, args, call_id, duration_ms, success, error
        )
        # Prometheus metrics
        status = "success" if success else "error"
        FUNCTION_CALLS.labels(
            agent=self.AGENT_LABEL,
            function=func_name,
            status=status
        ).inc()
        FUNCTION_LATENCY.labels(
            agent=self.AGENT_LABEL,
            function=func_name
        ).observe(duration_ms / 1000)  # Convert to seconds
        if not success:
            ERRORS_TOTAL.labels(
                agent=self.AGENT_LABEL,
                function=func_name,
                error_type=error or "unknown"
            ).inc()

    def _setup_functions(self):
        """Register the agent's tools, with business metrics added.

        This method does not call the parent implementation, so every tool
        the agent should expose is registered here, including
        ``get_order_status``.
        """

        def call_id_of(raw_data):
            # raw_data defaults to None in the tool signatures.
            return raw_data.get("call_id", "unknown") if raw_data else "unknown"

        @self.tool(
            description="Look up order status",
            parameters={
                "type": "object",
                "properties": {
                    "order_id": {"type": "string"}
                },
                "required": ["order_id"]
            }
        )
        def get_order_status(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            order_id = args.get("order_id", "")
            call_id = call_id_of(raw_data)
            start = time.perf_counter()
            try:
                # Simulated order lookup
                time.sleep(0.2)  # Simulated latency
                status = "shipped"
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status", {"order_id": order_id},
                    call_id, duration_ms, success=True
                )
                return SwaigFunctionResult(f"Order {order_id}: {status}")
            except Exception as e:
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status", {"order_id": order_id},
                    call_id, duration_ms, success=False,
                    # Class name, not the message: this becomes a label value.
                    error=type(e).__name__
                )
                return SwaigFunctionResult(
                    "I'm having trouble looking up that order."
                )

        @self.tool(
            description="Create support ticket",
            parameters={
                "type": "object",
                "properties": {
                    "issue": {"type": "string"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high"]
                    }
                },
                "required": ["issue"]
            }
        )
        def create_ticket(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            issue = args.get("issue", "")
            priority = args.get("priority", "medium")
            call_id = call_id_of(raw_data)
            start = time.perf_counter()
            ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
            duration_ms = (time.perf_counter() - start) * 1000
            self._log_function_call(
                "create_ticket", {"priority": priority},
                call_id, duration_ms, success=True
            )
            # Log business event
            self.logger.info(
                "Ticket created",
                extra={
                    "call_id": call_id,
                    "ticket_id": ticket_id,
                    "priority": priority
                }
            )
            # Add business metric
            TICKETS_CREATED.labels(
                agent=self.AGENT_LABEL,
                priority=priority
            ).inc()
            return SwaigFunctionResult(f"Created ticket {ticket_id}.")

        @self.tool(
            description="Transfer to specialist",
            parameters={
                "type": "object",
                "properties": {
                    "department": {
                        "type": "string",
                        "enum": ["sales", "support", "billing"]
                    }
                },
                "required": ["department"]
            }
        )
        def transfer_specialist(args: dict, raw_data: dict = None) -> SwaigFunctionResult:
            department = args.get("department", "")
            call_id = call_id_of(raw_data)
            if not department:
                # Avoid an empty label value and the target "/agents/".
                return SwaigFunctionResult(
                    "Which department should I transfer you to?"
                )
            self.logger.info(
                "Transfer initiated",
                extra={
                    "call_id": call_id,
                    "department": department
                }
            )
            # Add business metric
            TRANSFERS_TOTAL.labels(
                agent=self.AGENT_LABEL,
                department=department
            ).inc()
            return (
                SwaigFunctionResult(f"Transferring to {department}.")
                .connect(f"/agents/{department}", final=True)
            )
Part 3: Health Endpoint (15 min)
Add Health Check
from fastapi import FastAPI
from datetime import datetime
import psutil
# After creating AgentServer
# One process handle for the lifetime of the server. psutil measures
# cpu_percent() relative to the previous call on the same Process object, so
# a handle created per request would report 0.0 every time.
_HEALTH_PROCESS = psutil.Process()
_HEALTH_PROCESS.cpu_percent()  # prime the counter; the first reading is 0.0


@server.app.get("/health")
async def health():
    """Comprehensive health check.

    Returns a dict with an overall ``status`` ("healthy" or "degraded"), a
    UTC ``timestamp`` and the individual ``checks``. Memory and CPU figures
    are informational; only dict-valued checks count toward the status.
    """
    # Function-scope import: the snippet header only imports ``datetime``.
    from datetime import timezone

    checks = {
        "agent": {"status": "healthy"},
        "memory_mb": round(_HEALTH_PROCESS.memory_info().rss / 1024 / 1024, 2),
        # CPU use since the previous /health request (or since startup).
        "cpu_percent": _HEALTH_PROCESS.cpu_percent()
    }
    # Check if agent can generate SWML
    try:
        agent.get_swml()
        checks["swml_generation"] = {"status": "healthy"}
    except Exception as e:
        # Any failure here is reported as a degraded check, not a 500.
        checks["swml_generation"] = {"status": "unhealthy", "error": str(e)}
    all_healthy = all(
        c.get("status") == "healthy"
        for c in checks.values()
        if isinstance(c, dict)
    )
    return {
        "status": "healthy" if all_healthy else "degraded",
        # Timezone-aware so the ISO string carries its UTC offset.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "checks": checks
    }
@server.app.get("/metrics")
async def metrics_info():
    """Describe where the Prometheus metrics are served and which exist."""
    metric_names = [
        "voice_agent_calls_total",
        "voice_agent_function_calls_total",
        "voice_agent_function_latency_seconds",
        "voice_agent_tickets_total",
        "voice_agent_transfers_total",
        "voice_agent_errors_total",
    ]
    info = {"metrics_url": "http://localhost:9090/metrics"}
    info["available_metrics"] = metric_names
    return info
Part 4: Alert Configuration (20 min)
Create alerts.yml
# Prometheus alerting rules for the voice agent metrics.
groups:
  - name: voice_agent_alerts
    rules:
      # High error rate: errors as a fraction of all function calls.
      - alert: VoiceAgentHighErrorRate
        expr: |
          sum(rate(voice_agent_errors_total[5m]))
          / sum(rate(voice_agent_function_calls_total[5m])) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Voice agent error rate > 5%"
          # $value is the ratio from expr; humanizePercentage renders it as N%.
          description: "Error rate is {{ $value | humanizePercentage }}"

      # Slow function latency
      - alert: VoiceAgentSlowLatency
        expr: |
          histogram_quantile(0.95,
            rate(voice_agent_function_latency_seconds_bucket[5m])
          ) > 2
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Voice agent P95 latency > 2s"
          # $value is the P95 in seconds.
          description: "P95 latency: {{ $value | humanizeDuration }}"

      # High transfer rate
      - alert: VoiceAgentHighTransferRate
        expr: |
          sum(rate(voice_agent_transfers_total[1h]))
          / sum(rate(voice_agent_calls_total[1h])) > 0.4
        for: 30m
        labels:
          severity: warning
        annotations:
          summary: "Transfer rate > 40%"
          description: "High transfer rate may indicate issues"

      # No activity
      # NOTE(review): this matches only when the series exists with a zero
      # rate; if the metric has never been exported the expression returns
      # nothing - confirm whether that case should alert too.
      - alert: VoiceAgentNoActivity
        expr: |
          sum(rate(voice_agent_function_calls_total[15m])) == 0
        for: 15m
        labels:
          severity: warning
        annotations:
          summary: "No agent activity for 15 minutes"
Testing
# Run agent with metrics
python observable_agent.py
# Test function and check logs
swaig-test observable_agent.py --exec get_order_status \
  --order_id "ORD-12345"
# Check metrics endpoint
curl http://localhost:9090/metrics
# Check health
curl http://localhost:3000/health
# Filter logs
# -R with `fromjson?` skips lines that are not JSON (e.g. the
# "Metrics server started" banner) instead of aborting on them.
python observable_agent.py 2>&1 | jq -R 'fromjson? | objects | select(.level=="ERROR")'
# Count function calls in logs
# -c prints one record per line; without it jq pretty-prints each record
# over several lines and `wc -l` counts lines, not records.
python observable_agent.py 2>&1 | jq -cR 'fromjson? | objects | select(.function_name)' | wc -l
Grafana Dashboard (Optional)
Create dashboard.json
{
"dashboard": {
"title": "Voice Agent Dashboard",
"panels": [
{
"title": "Function Calls / Minute",
"type": "graph",
"gridPos": {"x": 0, "y": 0, "w": 12, "h": 8},
"targets": [{
"expr": "sum(rate(voice_agent_function_calls_total[1m])) by (function) * 60",
"legendFormat": "{{function}}"
}]
},
{
"title": "Error Rate",
"type": "gauge",
"gridPos": {"x": 12, "y": 0, "w": 6, "h": 8},
"targets": [{
"expr": "sum(rate(voice_agent_errors_total[5m])) / sum(rate(voice_agent_function_calls_total[5m])) * 100"
}],
"options": {
"thresholds": [
{"value": 0, "color": "green"},
{"value": 2, "color": "yellow"},
{"value": 5, "color": "red"}
]
}
},
{
"title": "P95 Latency",
"type": "graph",
"gridPos": {"x": 0, "y": 8, "w": 12, "h": 8},
"targets": [{
"expr": "histogram_quantile(0.95, rate(voice_agent_function_latency_seconds_bucket[5m]))",
"legendFormat": "{{function}}"
}]
},
{
"title": "Tickets by Priority",
"type": "piechart",
"gridPos": {"x": 12, "y": 8, "w": 6, "h": 8},
"targets": [{
"expr": "sum(voice_agent_tickets_total) by (priority)",
"legendFormat": "{{priority}}"
}]
}
]
}
}
Validation Checklist
- JSON structured logging implemented
- Logs include call_id, function_name, duration
- Prometheus metrics defined
- Metrics server running on port 9090
- Health endpoint returns system status
- Alert rules defined
- Grafana dashboard configured (optional)
Submission
Submit:
- observable_agent.py
- metrics.py
- alerts.yml
- dashboard.json (optional)
- Sample log output
Complete Agent Code
Click to reveal complete solution
#!/usr/bin/env python3
"""Customer service agent with full observability.
Lab 3.5 Deliverable: Demonstrates structured logging, Prometheus metrics,
health endpoints, and comprehensive monitoring patterns.
"""
import os
import json
import time
import logging
from datetime import datetime
from signalwire_agents import AgentBase, AgentServer, SwaigFunctionResult
# Try to import prometheus_client, but don't fail if not available
try:
from prometheus_client import Counter, Histogram, Gauge, start_http_server
PROMETHEUS_AVAILABLE = True
except ImportError:
PROMETHEUS_AVAILABLE = False
# ============================================================
# Structured Logging Setup
# ============================================================
class JSONFormatter(logging.Formatter):
    """JSON log formatter for structured logging.

    Emits one JSON object per record with the standard fields (timestamp,
    level, logger, message, module, function), any of the known context
    fields that were supplied through ``extra=``, and a formatted traceback
    under ``exception`` when the record has exception info.
    """

    def format(self, record):
        """Serialize ``record`` to a JSON string."""
        # Function-scope import: the file header only imports ``datetime``.
        from datetime import timezone

        # record.created is the epoch time at which the event was logged.
        # Converting it with an explicit UTC tzinfo gives an ISO string that
        # states its offset, and avoids the deprecated utcnow().
        event_time = datetime.fromtimestamp(record.created, tz=timezone.utc)

        log_data = {
            "timestamp": event_time.isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }

        # Add extra fields if present
        extra_fields = (
            "call_id", "customer_id", "function_name",
            "duration_ms", "error_type", "ticket_id",
            "priority", "department",
        )
        log_data.update(
            (key, getattr(record, key))
            for key in extra_fields
            if hasattr(record, key)
        )

        if record.exc_info:
            log_data["exception"] = self.formatException(record.exc_info)

        # default=str is deliberate: an unserializable context value is
        # stringified rather than making the formatter raise.
        return json.dumps(log_data, default=str)
def setup_logging():
    """Configure structured JSON logging on the "agent" logger.

    Idempotent: repeated calls return the same logger without adding another
    handler.

    Returns:
        The ``logging.Logger`` named "agent", at INFO level.
    """
    logger = logging.getLogger("agent")
    logger.setLevel(logging.INFO)

    # The logger is a process-wide singleton per name. Without this check,
    # each agent constructed adds one more StreamHandler and every record is
    # then written once per handler.
    for attached in logger.handlers:
        if isinstance(attached.formatter, JSONFormatter):
            return logger

    handler = logging.StreamHandler()
    handler.setFormatter(JSONFormatter())
    logger.addHandler(handler)
    return logger
# ============================================================
# Prometheus Metrics (if available)
# ============================================================
# Metric objects exist only when prometheus_client was importable; every use
# elsewhere in the file is guarded by the same PROMETHEUS_AVAILABLE flag.
if PROMETHEUS_AVAILABLE:
    # --- Call-level metrics ---
    CALLS_TOTAL = Counter(
        "voice_agent_calls_total",
        "Total calls received",
        labelnames=["agent", "status"],
    )
    ACTIVE_CALLS = Gauge(
        "voice_agent_active_calls",
        "Currently active calls",
        labelnames=["agent"],
    )

    # --- Per-function metrics ---
    FUNCTION_CALLS = Counter(
        "voice_agent_function_calls_total",
        "Total function calls",
        labelnames=["agent", "function", "status"],
    )
    FUNCTION_LATENCY = Histogram(
        "voice_agent_function_latency_seconds",
        "Function execution latency",
        labelnames=["agent", "function"],
        # Bucket upper bounds in seconds.
        buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0],
    )

    # --- Business metrics ---
    TICKETS_CREATED = Counter(
        "voice_agent_tickets_total",
        "Support tickets created",
        labelnames=["agent", "priority"],
    )
    TRANSFERS_TOTAL = Counter(
        "voice_agent_transfers_total",
        "Call transfers",
        labelnames=["agent", "department"],
    )

    # --- Error metrics ---
    ERRORS_TOTAL = Counter(
        "voice_agent_errors_total",
        "Total errors",
        labelnames=["agent", "function", "error_type"],
    )
# ============================================================
# Observable Agent
# ============================================================
class ObservableAgent(AgentBase):
    """Customer service agent with comprehensive observability.

    Every tool logs a structured record carrying the call id, and (when
    prometheus_client is available) updates the function, error and business
    metrics defined at module level.
    """

    def __init__(self):
        super().__init__(name="observable-agent")
        self.logger = setup_logging()
        self.logger.info("Agent initializing")
        self._configure_prompts()
        self.add_language("English", "en-US", "rime.spore")
        self._setup_functions()
        self.logger.info("Agent initialized successfully")

    def _configure_prompts(self):
        """Configure agent prompts."""
        self.prompt_add_section(
            "Role",
            "Customer service agent. Help with orders and support."
        )
        self.prompt_add_section(
            "Guidelines",
            bullets=[
                "Be helpful and efficient",
                "Create tickets for complex issues",
                "Transfer to specialists when needed"
            ]
        )

    def _log_function_call(
        self,
        func_name: str,
        call_id: str,
        duration_ms: float,
        success: bool,
        error: str = None
    ):
        """Log function execution with context and metrics.

        Args:
            func_name: Name of the tool that ran.
            call_id: Identifier of the call, attached to the log record.
            duration_ms: Elapsed time in milliseconds.
            success: True logs at INFO; False logs at ERROR and counts an
                error.
            error: Short error identifier. It is written to the
                ``error_type`` log field and used verbatim as the
                ``error_type`` metric label, so it should come from a small
                fixed set (e.g. an exception class name), not a free-form
                message.
        """
        # Function-scope import: ``sys`` is not imported at file level.
        import sys

        extra = {
            "call_id": call_id,
            "function_name": func_name,
            "duration_ms": round(duration_ms, 2)
        }
        if success:
            self.logger.info(f"Function {func_name} completed", extra=extra)
        else:
            extra["error_type"] = error
            self.logger.error(
                f"Function {func_name} failed: {error}",
                extra=extra,
                # Attach the traceback only when called from inside an
                # ``except`` block; the message and stack then live in the
                # log's "exception" field instead of in a label value.
                exc_info=sys.exc_info()[0] is not None
            )

        # Update Prometheus metrics if available
        if PROMETHEUS_AVAILABLE:
            status = "success" if success else "error"
            FUNCTION_CALLS.labels(
                agent="observable-agent",
                function=func_name,
                status=status
            ).inc()
            FUNCTION_LATENCY.labels(
                agent="observable-agent",
                function=func_name
            ).observe(duration_ms / 1000)  # histogram is in seconds
            if not success:
                ERRORS_TOTAL.labels(
                    agent="observable-agent",
                    function=func_name,
                    error_type=error or "unknown"
                ).inc()

    def _setup_functions(self):
        """Define observable functions."""

        @self.tool(
            description="Look up order status",
            parameters={
                "type": "object",
                "properties": {
                    "order_id": {"type": "string"}
                },
                "required": ["order_id"]
            },
            fillers=["Looking up your order..."]
        )
        def get_order_status(order_id: str, raw_data: dict) -> SwaigFunctionResult:
            call_id = raw_data.get("call_id", "unknown")
            start = time.perf_counter()
            try:
                # Simulated order lookup
                time.sleep(0.2)
                status = "shipped"
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status",
                    call_id,
                    duration_ms,
                    success=True
                )
                return SwaigFunctionResult(f"Order {order_id}: {status}")
            except Exception as e:
                # Tool boundary: record the failure and answer the caller.
                duration_ms = (time.perf_counter() - start) * 1000
                self._log_function_call(
                    "get_order_status",
                    call_id,
                    duration_ms,
                    success=False,
                    # Class name keeps the error_type label set bounded; the
                    # full message is logged with the traceback.
                    error=type(e).__name__
                )
                return SwaigFunctionResult(
                    "I'm having trouble looking up that order. "
                    "Can I help with something else?"
                )

        @self.tool(
            description="Create a support ticket",
            parameters={
                "type": "object",
                "properties": {
                    "issue": {"type": "string"},
                    "priority": {
                        "type": "string",
                        "enum": ["low", "medium", "high"]
                    }
                },
                "required": ["issue"]
            }
        )
        def create_ticket(
            issue: str,
            priority: str = "medium",
            raw_data: dict = None
        ) -> SwaigFunctionResult:
            call_id = raw_data.get("call_id", "unknown") if raw_data else "unknown"
            start = time.perf_counter()
            ticket_id = f"TKT-{datetime.now().strftime('%Y%m%d%H%M%S')}"
            duration_ms = (time.perf_counter() - start) * 1000
            self._log_function_call(
                "create_ticket",
                call_id,
                duration_ms,
                success=True
            )
            # Log business event
            self.logger.info(
                "Ticket created",
                extra={
                    "call_id": call_id,
                    "ticket_id": ticket_id,
                    "priority": priority
                }
            )
            # Update business metrics
            if PROMETHEUS_AVAILABLE:
                TICKETS_CREATED.labels(
                    agent="observable-agent",
                    priority=priority
                ).inc()
            return (
                SwaigFunctionResult(f"Created ticket {ticket_id}.")
                .update_global_data({
                    "ticket_id": ticket_id,
                    "ticket_priority": priority
                })
            )

        @self.tool(
            description="Transfer to specialist",
            parameters={
                "type": "object",
                "properties": {
                    "department": {
                        "type": "string",
                        "enum": ["sales", "support", "billing"]
                    }
                },
                "required": ["department"]
            }
        )
        def transfer_specialist(
            department: str,
            raw_data: dict
        ) -> SwaigFunctionResult:
            call_id = raw_data.get("call_id", "unknown")
            self.logger.info(
                "Transfer initiated",
                extra={
                    "call_id": call_id,
                    "department": department
                }
            )
            # Update business metrics
            if PROMETHEUS_AVAILABLE:
                TRANSFERS_TOTAL.labels(
                    agent="observable-agent",
                    department=department
                ).inc()
            return (
                SwaigFunctionResult(f"Transferring to {department}.", post_process=True)
                .swml_transfer(f"/agents/{department}", "Goodbye!", final=True)
            )

        @self.tool(description="Get system status")
        def system_status() -> SwaigFunctionResult:
            return SwaigFunctionResult("All systems operational.")
# ============================================================
# Server with Health Endpoints
# ============================================================
def create_server():
    """Create server with health and metrics endpoints.

    Reads HOST, PORT and METRICS_PORT from the environment (defaults
    "0.0.0.0", 3000 and 9090), registers one ObservableAgent, starts the
    Prometheus exporter when prometheus_client is available, and adds the
    /health, /ready, /health/detailed and /metrics/info routes.

    Returns:
        The configured AgentServer; the caller is responsible for running it.
    """
    # Function-scope import: the file header only imports ``datetime``.
    from datetime import timezone

    def utc_timestamp():
        # Timezone-aware "now", so the ISO string states its UTC offset
        # (and the deprecated datetime.utcnow() is avoided).
        return datetime.now(timezone.utc).isoformat()

    host = os.getenv("HOST", "0.0.0.0")
    port = int(os.getenv("PORT", "3000"))
    metrics_port = int(os.getenv("METRICS_PORT", "9090"))

    server = AgentServer(host=host, port=port)
    agent = ObservableAgent()
    server.register(agent)

    # Start Prometheus metrics server if available
    if PROMETHEUS_AVAILABLE:
        try:
            start_http_server(metrics_port)
            print(f"Metrics server started on port {metrics_port}")
        except Exception as e:
            # Best effort: the agent still serves calls without the exporter.
            print(f"Could not start metrics server: {e}")

    # Health endpoint
    @server.app.get("/health")
    async def health():
        return {
            "status": "healthy",
            "timestamp": utc_timestamp(),
            "version": os.getenv("APP_VERSION", "1.0.0")
        }

    # Readiness endpoint
    @server.app.get("/ready")
    async def ready():
        return {"ready": True}

    # Detailed health check
    @server.app.get("/health/detailed")
    async def health_detailed():
        checks = {
            "agent": {"status": "healthy"},
            "metrics": {"status": "healthy" if PROMETHEUS_AVAILABLE else "unavailable"}
        }
        try:
            # Test SWML generation
            agent.get_swml()
            checks["swml_generation"] = {"status": "healthy"}
        except Exception as e:
            checks["swml_generation"] = {"status": "unhealthy", "error": str(e)}
        # "unavailable" (prometheus_client missing) is skipped, so it does
        # not count as degraded.
        all_healthy = all(
            c.get("status") == "healthy"
            for c in checks.values()
            if isinstance(c, dict) and c.get("status") != "unavailable"
        )
        return {
            "status": "healthy" if all_healthy else "degraded",
            "timestamp": utc_timestamp(),
            "checks": checks
        }

    # Metrics info endpoint
    @server.app.get("/metrics/info")
    async def metrics_info():
        return {
            "metrics_available": PROMETHEUS_AVAILABLE,
            "metrics_port": metrics_port if PROMETHEUS_AVAILABLE else None,
            "available_metrics": [
                "voice_agent_calls_total",
                "voice_agent_function_calls_total",
                "voice_agent_function_latency_seconds",
                "voice_agent_tickets_total",
                "voice_agent_transfers_total",
                "voice_agent_errors_total"
            ] if PROMETHEUS_AVAILABLE else []
        }

    return server
if __name__ == "__main__":
    # Script entry point: assemble the server and its routes, then serve.
    server = create_server()
    server.run()