Events & Callbacks
Patter fires async callbacks at key moments in the call lifecycle. Use them to log calls, update CRMs, trigger workflows, or control conversation flow.
The callbacks in the table below are async functions, passed as keyword arguments to serve().
Available Callbacks
| Callback | Trigger |
|---|---|
| on_call_start | A call connects |
| on_call_end | A call ends |
| on_transcript | Each utterance is transcribed |
| on_message | User message received (pipeline mode) |
| on_metrics | After each conversation turn (real-time cost/latency) |
For fine-grained pipeline observability (every interim transcript, every LLM chunk, every TTS chunk, every tool start) subscribe to the EventBus below — it complements these callbacks rather than replacing them.
For mutating prompts and responses (RAG augmentation, output validation, PII redaction) use PipelineHooks — they sit inside the LLM step rather than firing alongside it.
on_call_start
Fires when a call connects. Use it to log call starts, initialize state, or fetch customer data.
```python
async def on_call_start(event):
    print(f"Call started: {event['call_id']}")
    print(f"Caller: {event['caller']}")
    print(f"Callee: {event['callee']}")
    print(f"Direction: {event['direction']}")
    print(f"Custom params: {event.get('custom_params', {})}")
```
Event Fields
| Field | Type | Description |
|---|---|---|
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number (E.164). |
| callee | str | The callee’s phone number (E.164). |
| direction | str | "inbound" or "outbound". |
| custom_params | dict | Custom parameters passed with the call (if any). |
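A common pattern is to keep per-call state in a dictionary keyed by call_id: populate it in on_call_start and release it in on_call_end. The sketch below is a minimal illustration under stated assumptions: lookup_customer is a hypothetical stand-in for your own CRM query, and the event dicts are hand-built to mirror the fields in the table above rather than produced by Patter.

```python
import asyncio

# Per-call state, keyed by the call_id from the event payload.
call_state: dict[str, dict] = {}


async def lookup_customer(phone_number: str) -> dict:
    # Hypothetical CRM lookup; replace with your own data source.
    return {"phone": phone_number, "tier": "standard"}


async def on_call_start(event):
    call_state[event["call_id"]] = {
        "direction": event["direction"],
        "customer": await lookup_customer(event["caller"]),
        "custom_params": event.get("custom_params", {}),
    }


async def on_call_end(event):
    # pop() releases the state so a long-running server does not leak memory.
    state = call_state.pop(event["call_id"], None)
    if state is not None:
        print(f"Call {event['call_id']} ({state['direction']}) finished")


async def demo() -> dict:
    start = {"call_id": "CA123", "caller": "+15555550100",
             "callee": "+15555550199", "direction": "inbound"}
    await on_call_start(start)
    snapshot = dict(call_state["CA123"])  # state as seen mid-call
    await on_call_end({**start, "transcript": []})
    return snapshot


snapshot = asyncio.run(demo())
```

Popping in on_call_end matters for a server that handles many calls: without it the dictionary grows for the lifetime of the process.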
on_call_end
Fires when a call ends. Use it to save transcripts, calculate duration, or trigger post-call workflows.
```python
async def on_call_end(event):
    print(f"Call ended: {event['call_id']}")
    for entry in event["transcript"]:
        print(f"  [{entry['role']}]: {entry['text']}")
```
Event Fields
| Field | Type | Description |
|---|---|---|
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number (E.164). |
| callee | str | The callee’s phone number (E.164). |
| ended_at | float | Unix timestamp when the call ended (e.g. 1710489601.234). |
| transcript | list[dict] | Full conversation transcript. Each entry has role ("user" or "assistant") and text. |
| metrics | CallMetrics \| None | Call metrics with cost and latency breakdowns. None if metrics collection failed. See Metrics & Cost Tracking. |
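on_call_end is a natural place to turn the payload into a storable record. The sketch below converts ended_at (a Unix float in seconds) to an ISO-8601 UTC string, counts transcript entries per role, and tolerates metrics being None. The summarize_call helper and the record layout are assumptions for illustration; it deliberately reads no CallMetrics attributes, since those are documented on the Metrics & Cost Tracking page.

```python
from collections import Counter
from datetime import datetime, timezone


def summarize_call(event: dict) -> dict:
    # Count transcript entries per role ("user" / "assistant").
    turns = Counter(entry["role"] for entry in event["transcript"])
    return {
        "call_id": event["call_id"],
        "caller": event["caller"],
        "callee": event["callee"],
        # ended_at is seconds since the epoch; store it as ISO-8601 UTC.
        "ended_at": datetime.fromtimestamp(event["ended_at"], tz=timezone.utc).isoformat(),
        "user_turns": turns["user"],
        "assistant_turns": turns["assistant"],
        # metrics is None when collection failed.
        "has_metrics": event.get("metrics") is not None,
    }


record = summarize_call({
    "call_id": "CA123",
    "caller": "+15555550100",
    "callee": "+15555550199",
    "ended_at": 1710489601.234,
    "transcript": [
        {"role": "assistant", "text": "Hello! How can I help?"},
        {"role": "user", "text": "I'd like to check my order status."},
    ],
    "metrics": None,
})
```

In a real handler you would call summarize_call(event) inside on_call_end and hand the result to your own storage layer.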
on_transcript
Fires each time an utterance is transcribed during the call. Use it for real-time logging, sentiment analysis, or live dashboards.
```python
async def on_transcript(event):
    print(f"[{event['role']}] {event['text']}")
    # Access conversation history so far
    for entry in event.get("history", []):
        pass  # each entry: {role, text, timestamp}
```
Event Fields
| Field | Type | Description |
|---|---|---|
| role | str | "user" or "assistant". |
| text | str | The transcribed text. |
| call_id | str | Unique identifier for this call. |
| history | list[dict] | Conversation history so far. Each entry has role, text, and timestamp. |
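For live dashboards or escalation alerts, on_transcript can inspect each utterance as it arrives. The sketch below flags user utterances that contain escalation keywords; the keyword list and the in-memory alerts list are hypothetical placeholders for your own rules and alerting sink.

```python
import asyncio
import re

# Hypothetical escalation keywords; tune these for your own use case.
ESCALATION = re.compile(r"\b(manager|supervisor|cancel|refund)\b", re.IGNORECASE)

alerts: list[dict] = []


async def on_transcript(event):
    # Only user speech is checked; assistant utterances are ignored.
    if event["role"] != "user":
        return
    match = ESCALATION.search(event["text"])
    if match:
        alerts.append({
            "call_id": event["call_id"],
            "keyword": match.group(1).lower(),
            "turns_so_far": len(event.get("history", [])),
        })


async def demo():
    await on_transcript({"role": "assistant", "text": "Do you want a refund?", "call_id": "CA1"})
    await on_transcript({"role": "user", "text": "Let me talk to your Manager.",
                         "call_id": "CA1", "history": []})


asyncio.run(demo())
```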
on_message
Fires when a user message is received in pipeline mode. Your callback processes the message and returns the agent’s response as a string, which is then synthesized to speech.
on_message is only used in pipeline mode (when you pass stt= / tts= instead of engine=). In engine mode (OpenAI Realtime, ElevenLabs ConvAI) the engine handles responses directly.
```python
async def on_message(event) -> str:
    user_text = event["text"]
    call_id = event["call_id"]
    caller = event["caller"]
    history = event.get("history", [])
    # Your custom logic here: call an LLM, query a database, etc.
    # my_llm_handler is a placeholder for your own coroutine.
    response = await my_llm_handler(user_text, history)
    return response
```
Event Fields
| Field | Type | Description |
|---|---|---|
| text | str | The user’s transcribed message. |
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number. |
| callee | str | The callee’s phone number. |
| history | list[dict] | Conversation history. Each entry has role, text, and timestamp. |
Return Value
Return a str with the agent’s response. This text is sent to the TTS provider and played back to the caller.
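One possible shape for the my_llm_handler placeholder used in the on_message example is sketched below, assuming a chat-style LLM that accepts a list of {"role", "content"} messages. It maps Patter history entries (role, text, timestamp) into that shape and appends the new user turn. call_llm is a hypothetical stand-in for a real LLM client, and the sketch assumes the current utterance is not already the last history entry.

```python
import asyncio

SYSTEM_PROMPT = "You are a helpful assistant."


def to_chat_messages(user_text: str, history: list[dict]) -> list[dict]:
    # Map Patter history entries {role, text, timestamp} to chat-style messages.
    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages += [{"role": h["role"], "content": h["text"]} for h in history]
    # Assumption: the current utterance is not yet part of history.
    messages.append({"role": "user", "content": user_text})
    return messages


async def call_llm(messages: list[dict]) -> str:
    # Hypothetical stand-in for a real LLM client; echoes the last user message.
    await asyncio.sleep(0)
    return f"You said: {messages[-1]['content']}"


async def my_llm_handler(user_text: str, history: list[dict]) -> str:
    reply = await call_llm(to_chat_messages(user_text, history))
    # The returned string goes straight to TTS, so trim stray whitespace.
    return reply.strip()


reply = asyncio.run(my_llm_handler(
    "What's my balance?",
    [{"role": "assistant", "text": "Hi! How can I help?", "timestamp": 1710489601.234}],
))
```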
EventBus
The EventBus exposes fine-grained pipeline events that don’t have first-class callbacks. Subscribe with on(event_type, handler) from inside on_call_start (or any place you have a Patter reference).
```python
from getpatter import EventBus, PatterEventType

# `phone.events` is the per-process EventBus.
phone.events.on(PatterEventType.TRANSCRIPT_PARTIAL, lambda ev: print("partial:", ev["text"]))
# log_chunk is a placeholder for your own logging function.
phone.events.on(PatterEventType.LLM_CHUNK, lambda ev: log_chunk(ev["call_id"], ev["text"]))
```
| PatterEventType | Fires |
|---|---|
| TRANSCRIPT_PARTIAL | Every interim STT result (before endpointing). |
| TRANSCRIPT_FINAL | Every final STT result (after endpointing). Same payload as on_transcript. |
| LLM_CHUNK | Every streamed LLM token / chunk. |
| TTS_CHUNK | Every TTS audio chunk written to the carrier. |
| TOOL_CALL_STARTED | Tool dispatched (paired with the existing tool_call_completed you can observe via on_call_end). |
Handlers are non-blocking: each one runs in a fire-and-forget task. An exception raised inside a handler is logged but does not interrupt the call.
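To make the "non-blocking, fail-safe" contract concrete, here is a toy bus with the same behaviour: each handler is scheduled with asyncio.create_task, and exceptions are logged instead of propagating to the emitter. It illustrates the semantics only and is not Patter's EventBus implementation; MiniBus and its methods are hypothetical.

```python
import asyncio
import inspect
import logging

log = logging.getLogger("minibus")


class MiniBus:
    """Toy fire-and-forget event bus (illustration only, not Patter's EventBus)."""

    def __init__(self) -> None:
        self._handlers: dict[str, list] = {}
        self._tasks: set[asyncio.Task] = set()

    def on(self, event_type: str, handler) -> None:
        self._handlers.setdefault(event_type, []).append(handler)

    def emit(self, event_type: str, payload: dict) -> None:
        # Schedule handlers without awaiting them, so the emitter never blocks.
        for handler in self._handlers.get(event_type, []):
            task = asyncio.create_task(self._run(event_type, handler, payload))
            self._tasks.add(task)  # hold a reference until the task finishes
            task.add_done_callback(self._tasks.discard)

    async def _run(self, event_type: str, handler, payload: dict) -> None:
        try:
            result = handler(payload)
            if inspect.isawaitable(result):  # accept sync and async handlers
                await result
        except Exception:
            # Log and swallow: a faulty handler must not affect the emitter.
            log.exception("handler for %s raised", event_type)

    async def drain(self) -> None:
        # Wait for in-flight handlers (useful in tests and at shutdown).
        await asyncio.gather(*self._tasks)


received: list[str] = []


async def demo() -> None:
    bus = MiniBus()
    bus.on("llm_chunk", lambda ev: received.append(ev["text"]))
    bus.on("llm_chunk", lambda ev: 1 / 0)  # faulty handler: logged, not raised
    bus.emit("llm_chunk", {"text": "Hel"})
    bus.emit("llm_chunk", {"text": "lo"})
    await bus.drain()


asyncio.run(demo())
```

Keeping a strong reference to each task matters: the event loop only holds weak references, so an unreferenced fire-and-forget task can be garbage-collected before it runs to completion.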
Pipeline Hooks
PipelineHooks lets you intercept data at each stage of the pipeline-mode STT → LLM → TTS chain. Pass an instance via phone.agent(hooks=...). Hooks may be sync or async; if a hook raises, the error is logged and the original value passes through unchanged (fail-open).
```python
from getpatter import PipelineHooks

hooks = PipelineHooks(
    before_send_to_stt=...,  # raw PCM in → drop chunk by returning None
    after_transcribe=...,    # transcript in → skip turn by returning None
    before_llm=...,          # messages list in → replace by returning new list
    after_llm=...,           # see "after_llm 3-tier API" below
    before_synthesize=...,   # sentence in → skip TTS for sentence by returning None
    after_synthesize=...,    # audio chunk in → discard by returning None
)
agent = phone.agent(stt=..., llm=..., tts=..., hooks=hooks, system_prompt="...")
```
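The ... placeholders in the PipelineHooks example stand for your own functions. A sketch of two of them follows, assuming after_transcribe receives the transcript as a plain string and before_llm receives a list of {"role", "content"} dicts (check the API Reference for the exact argument types). One drops filler-only turns by returning None; the other returns a new list with retrieved context inserted. retrieve_context is a hypothetical retrieval function.

```python
import re

# Hypothetical set of filler words treated as "no real content".
FILLERS = {"um", "uh", "er", "ah", "hmm", "mm"}


def after_transcribe(transcript: str):
    words = re.findall(r"[a-z']+", transcript.lower())
    # Returning None skips the turn: drop empty or filler-only utterances.
    if all(word in FILLERS for word in words):
        return None
    return transcript


def retrieve_context(query: str) -> str:
    # Hypothetical retrieval step (vector store, search API, ...).
    return "Store hours: 9am-5pm, Monday to Friday."


def before_llm(messages: list[dict]) -> list[dict]:
    # Build a new list rather than mutating the one passed in.
    last_user = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
    context = {"role": "system", "content": f"Context: {retrieve_context(last_user)}"}
    # Place the retrieved context just before the latest message.
    return messages[:-1] + [context] + messages[-1:]
```

These would be passed as PipelineHooks(after_transcribe=after_transcribe, before_llm=before_llm). Word-level matching is used instead of substring replacement so that words such as "umbrella" are not mistaken for fillers.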
after_llm — 3-tier API
after_llm accepts either a dict with on_chunk / on_sentence / on_response keys, or any object exposing those attributes (dataclass, custom class, Protocol implementation).
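```python
import re

# Recommended: 3-tier API
after_llm = {
    # sync, ~0 ms: strip the standalone filler "um"; word boundaries avoid
    # mangling words such as "number" or "summary"
    "on_chunk": lambda chunk: re.sub(r"\bum\b", "", chunk),
    "on_sentence": async_redact_pii,            # your own async function, 50-300 ms
    "on_response": async_validate_json_schema,  # your own async function, 500 ms-2 s, BLOCKS streaming TTS
}
hooks = PipelineHooks(after_llm=after_llm)
```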
| Tier | Sync/Async | Latency budget | When it runs | Return semantics |
|---|---|---|---|---|
| on_chunk(chunk: str) -> str | sync | ~0 ms | Per LLM token chunk, before sentence aggregation | Return the new string. Use for cheap text rewrites. |
| on_sentence(sentence: str, ctx: HookContext) -> str \| None | async | 50–300 ms | Per complete sentence, between chunker and TTS | Return a new sentence, None to keep the original, or "" to drop the sentence. |
| on_response(text: str, ctx: HookContext) -> str \| None | async | 500 ms–2 s | Once at the end of the LLM stream; blocks streaming TTS | Return new text, or None to keep the original. |
Pick the lowest tier that does the job — on_chunk for fast string ops, on_sentence for per-sentence I/O (PII redaction, translation), on_response only when you need the whole response (JSON-schema validation, full-context moderation).
Migration: legacy after_llm callable
The legacy single-callable form is still supported for backward compatibility but is deprecated:
```python
# Legacy (deprecated, removed in v0.7.0):
hooks = PipelineHooks(
    after_llm=lambda text, ctx: text.upper(),
)
```
The legacy callable is mapped internally to the on_response slot and emits a one-shot PatterDeprecationWarning on first use. Migrate to the 3-tier dict to silence the warning and unlock the lower-latency on_chunk / on_sentence tiers.
```python
# Migrated:
async def async_uppercase(text, ctx):
    return text.upper()

hooks = PipelineHooks(
    after_llm={"on_response": async_uppercase},
)
```
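If you have several legacy callables, a small adapter can move each one into the on_response slot without rewriting it. as_on_response is a hypothetical helper, not part of getpatter: it takes a legacy (text, ctx) callable, sync or async, and returns the dict form.

```python
import asyncio
import inspect


def as_on_response(legacy):
    # Wrap a legacy after_llm(text, ctx) callable as an async on_response hook.
    async def on_response(text, ctx):
        result = legacy(text, ctx)
        if inspect.isawaitable(result):  # legacy callable may itself be async
            result = await result
        return result

    return {"on_response": on_response}


after_llm = as_on_response(lambda text, ctx: text.upper())
```

The result is passed exactly like the migrated form: PipelineHooks(after_llm=after_llm). Since on_response still blocks streaming TTS, consider moving cheap rewrites such as uppercasing to on_chunk instead.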
HookContext
Hooks that take a ctx argument receive a frozen HookContext dataclass:
```python
from dataclasses import dataclass

@dataclass(frozen=True)
class HookContext:
    call_id: str
    caller: str
    callee: str
    history: tuple[dict, ...] = ()
```
PipelineHooks also exposes before_stt / after_stt and before_tts / after_tts for audio-stage interception. See the API Reference for the full signature.
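Because HookContext is frozen, a hook cannot mutate it; to derive a modified copy (for example when building fixtures for tests), use dataclasses.replace. The sketch re-declares the dataclass as shown above so that it runs standalone; in real code you would use the SDK's class rather than redefining it.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class HookContext:  # re-declared from the definition above so this runs standalone
    call_id: str
    caller: str
    callee: str
    history: tuple[dict, ...] = ()


ctx = HookContext(call_id="CA1", caller="+15555550100", callee="+15555550199")

mutation_blocked = False
try:
    ctx.call_id = "CA2"  # frozen dataclass: attribute assignment raises
except FrozenInstanceError:
    mutation_blocked = True

# replace() returns a new instance; ctx itself is untouched.
ctx_with_history = replace(
    ctx, history=({"role": "user", "text": "Hi", "timestamp": 1710489605.891},)
)
```

Note that freezing is shallow: the history tuple cannot be reassigned or resized, but the dicts inside it are ordinary mutable dicts, so hooks should treat them as read-only.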
Conversation History
All callbacks that include history receive it as a list of dictionaries:
```python
[
    {"role": "assistant", "text": "Hello! How can I help?", "timestamp": 1710489601.234},
    {"role": "user", "text": "I'd like to check my order status.", "timestamp": 1710489605.891},
    {"role": "assistant", "text": "Sure! What's your order ID?", "timestamp": 1710489606.712},
]
```
Timestamps are Unix floats (from Python’s time.time()), not ISO-8601 strings.
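Because timestamps are plain floats in seconds, gaps between turns are simple subtraction, and datetime converts them for display. The sketch below measures how long the assistant took to answer each user entry in the sample history; response_gaps is a hypothetical helper. These are transcript-level timestamps; for audio-level turn-taking latency, the speech-edge events described below are the better anchor.

```python
from datetime import datetime, timezone

history = [
    {"role": "assistant", "text": "Hello! How can I help?", "timestamp": 1710489601.234},
    {"role": "user", "text": "I'd like to check my order status.", "timestamp": 1710489605.891},
    {"role": "assistant", "text": "Sure! What's your order ID?", "timestamp": 1710489606.712},
]


def response_gaps(history: list[dict]) -> list[float]:
    # Seconds between each user entry and the assistant entry that follows it.
    gaps = []
    for prev, nxt in zip(history, history[1:]):
        if prev["role"] == "user" and nxt["role"] == "assistant":
            gaps.append(round(nxt["timestamp"] - prev["timestamp"], 3))
    return gaps


gaps = response_gaps(history)
# Convert the first timestamp to an aware UTC datetime for display.
started = datetime.fromtimestamp(history[0]["timestamp"], tz=timezone.utc)
```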
Complete Example
```python
import os
import asyncio
from dotenv import load_dotenv
from getpatter import Patter, Twilio, OpenAIRealtime

load_dotenv()

phone = Patter(
    carrier=Twilio(),  # TWILIO_* from env
    phone_number=os.environ["PHONE_NUMBER"],
    webhook_url=os.environ["WEBHOOK_URL"],
)

agent = phone.agent(
    engine=OpenAIRealtime(),  # OPENAI_API_KEY from env
    system_prompt="You are a helpful assistant.",
    first_message="Hi there! What can I do for you?",
)


async def on_call_start(event):
    print(f"[START] Call {event['call_id']} from {event['caller']} ({event['direction']})")


async def on_call_end(event):
    print(f"[END] Call {event['call_id']}")
    print(f"  Transcript ({len(event['transcript'])} messages):")
    for entry in event["transcript"]:
        print(f"    [{entry['role']}]: {entry['text']}")


async def on_transcript(event):
    print(f"  [{event['role']}]: {event['text']}")


async def main():
    await phone.serve(
        agent,
        port=8000,
        on_call_start=on_call_start,
        on_call_end=on_call_end,
        on_transcript=on_transcript,
    )


asyncio.run(main())
```
Speech-Edge Events (Turn-Taking)
The callbacks above describe the transcript-level lifecycle of a call. For turn-taking instrumentation — barge-in, end-of-utterance, time-to-first-token, TTS warmup vs. wire-time — Patter exposes seven additional async callbacks plus a read-only conversation_state snapshot directly on the Patter instance.
These events expose the canonical voice-agent metric set (user/agent state transitions, turn boundaries, TTFT, audio first-byte) and align with OpenAI Realtime (input_audio_buffer.speech_started/_stopped/_committed) so downstream metrics work without translation.
Every callback defaults to None. Existing code that does not register any speech-edge callback sees exactly the previous behaviour and zero overhead. The state machine is updated regardless of whether callbacks are registered, so conversation_state is always usable.
The seven events
| Event | Fires on | Signal |
|---|---|---|
| on_user_speech_started | VAD positive edge of inbound audio | Raw VAD start, not end-of-utterance. Use for cross-talk detection. |
| on_user_speech_ended | VAD trailing edge | Raw VAD stop, not committed EOU. Use for talk-ratio. |
| on_user_speech_eos | Committed end-of-utterance | Canonical “user finished” signal. Anchor eos_to_first_token_ms here. |
| on_agent_speech_started | First wire-time chunk of the agent turn | What the user actually hears (distinct from TTS warmup). Anchor barge-in latency here. |
| on_agent_speech_ended | Last wire chunk of the agent turn | Payload includes interrupted: bool. True = barge-in cancelled the turn. |
| on_llm_token | First LLM token of the turn | TTFT marker. Idempotent: fires once per turn. |
| on_audio_out | First TTS audio chunk produced | TTS warmup arrival (distinct from wire-time). Idempotent: fires once per turn. |
Payload signature matrix
```python
async def on_user_speech_started(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "vad_confidence": float | omitted,
    #     "audio_offset_ms": int | omitted,
    # }
    ...


async def on_user_speech_ended(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "speech_duration_ms": int,
    #     "vad_confidence": float | omitted,
    #     "audio_offset_ms": int | omitted,
    # }
    ...


async def on_user_speech_eos(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "trigger": "vad_silence" | "semantic_turn_detector" | "manual_commit",
    #     "trailing_silence_ms": int | omitted,
    #     "transcript_so_far": str | omitted,
    # }
    ...


async def on_agent_speech_started(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "turn_idx": int,
    #     "tts_provider": str | omitted,
    #     "engine": str | omitted,
    # }
    ...


async def on_agent_speech_ended(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "turn_idx": int,
    #     "speech_duration_ms": int,
    #     "interrupted": bool,
    # }
    ...


async def on_llm_token(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "turn_idx": int,
    #     "llm_provider": str,
    #     "model": str,
    # }
    ...


async def on_audio_out(event: dict) -> None:
    # event = {
    #     "timestamp_ms": int,
    #     "turn_idx": int,
    #     "tts_provider": str,
    # }
    ...
```
Compute end-to-end latency by anchoring eos_to_first_token_ms to on_user_speech_eos. It marks the moment the SDK has committed that the user is done speaking — VAD trailing edge plus trailing silence (and optionally a semantic turn-detector agreement). Anchoring to on_user_speech_ended instead would over-count by the silence window and double-fire on mid-utterance VAD blips. Hamming AI thresholds: <800 ms good, >1500 ms critical.
State machine
conversation_state returns a snapshot {"user": <user_state>, "agent": <agent_state>} you can read at any time:
| Side | States | Initial | Set by |
|---|---|---|---|
| user | listening · speaking · thinking · away | listening | on_user_speech_started → speaking, on_user_speech_ended / on_user_speech_eos → listening |
| agent | initializing · idle · listening · thinking · speaking | initializing | call accepted → idle, EOU committed → thinking, on_agent_speech_started → speaking, on_agent_speech_ended → idle |
A monotonic turn_idx counter (also exposed on the dispatcher) increments on every committed EOU. The agent_speech_*, llm_token, and audio_out payloads all carry the current turn_idx so a per-turn metric can correlate them.
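To check your reading of the transitions, or to unit-test your own metrics code against recorded event streams, the state-machine table can be encoded as a small reducer. This sketch mirrors the table and the normal and barged-in sequences only; it is not the SDK's internal implementation, and it never produces the user states thinking and away or the agent state listening, because the table assigns no event to them. The event names are shortened forms of the callback names, and the starting turn_idx of 0 is an assumption.

```python
def apply_event(state: dict, event: str) -> dict:
    """Return the snapshot after one event; mirrors the state-machine table."""
    user, agent, turn = state["user"], state["agent"], state["turn_idx"]
    if event == "call_accepted":
        agent = "idle"
    elif event == "user_speech_started":
        user = "speaking"
    elif event == "user_speech_ended":
        user = "listening"
    elif event == "user_speech_eos":
        # Committed EOU: user back to listening, agent starts thinking, new turn.
        user, agent, turn = "listening", "thinking", turn + 1
    elif event == "agent_speech_started":
        agent = "speaking"
    elif event == "agent_speech_ended":
        agent = "idle"
    # llm_token and audio_out are first-fire markers and change no state.
    return {"user": user, "agent": agent, "turn_idx": turn}


# Normal turn, starting from the documented initial states (turn_idx 0 assumed).
state = {"user": "listening", "agent": "initializing", "turn_idx": 0}
for ev in ["call_accepted", "user_speech_started", "user_speech_ended",
           "user_speech_eos", "llm_token", "audio_out",
           "agent_speech_started", "agent_speech_ended"]:
    state = apply_event(state, ev)

# Barge-in: the user talks over a speaking agent.
barged = {"user": "listening", "agent": "speaking", "turn_idx": 1}
for ev in ["user_speech_started", "agent_speech_ended", "user_speech_eos"]:
    barged = apply_event(barged, ev)
```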
Sequence for a normal turn
```text
user audio in      → on_user_speech_started    (user → speaking)
silence detected   → on_user_speech_ended      (user → listening)
silence + commit   → on_user_speech_eos        (turn_idx += 1, agent → thinking)
LLM streams        → on_llm_token (once)       (TTFT)
TTS produces audio → on_audio_out (once)       (TTS warmup)
audio hits wire    → on_agent_speech_started   (agent → speaking)
last chunk         → on_agent_speech_ended     (agent → idle, interrupted=False)
```
Sequence for a barged-in turn
```text
on_agent_speech_started                       (agent → speaking)
  ... user starts talking over the agent ...
on_user_speech_started                        (user → speaking)
on_agent_speech_ended { interrupted: True }   (agent → idle)
on_user_speech_eos                            (turn_idx += 1, new turn begins)
```
Full example — wire all seven callbacks
```python
import asyncio
from getpatter import Patter, Twilio, OpenAIRealtime

phone = Patter(carrier=Twilio(), phone_number="+15555550100")
agent = phone.agent(
    engine=OpenAIRealtime(),
    system_prompt="You are a helpful assistant.",
)


# --- raw VAD edges ---------------------------------------------------------
async def on_user_speech_started(ev):
    # Raw VAD positive edge: user might still be mid-utterance.
    print(f"[vad+] t={ev['timestamp_ms']} state={phone.conversation_state}")


async def on_user_speech_ended(ev):
    # Raw VAD trailing edge, NOT committed EOU. User may resume in 100ms.
    print(f"[vad-] dur={ev['speech_duration_ms']}ms")


# --- canonical 'user finished' signal --------------------------------------
async def on_user_speech_eos(ev):
    # Committed EOU. This is the timestamp to anchor TTFT against.
    print(f"[eos] trigger={ev['trigger']} silence={ev.get('trailing_silence_ms')}ms")
    on_user_speech_eos.last_eos_ms = ev["timestamp_ms"]

on_user_speech_eos.last_eos_ms = 0


# --- model + audio first-fire markers --------------------------------------
async def on_llm_token(ev):
    ttft = ev["timestamp_ms"] - on_user_speech_eos.last_eos_ms
    print(f"[ttft] {ttft}ms model={ev['model']} provider={ev['llm_provider']}")


async def on_audio_out(ev):
    # TTS warmup: bytes produced, not yet on the wire.
    print(f"[tts ] turn={ev['turn_idx']} provider={ev['tts_provider']}")


# --- what the user hears + barge-in detection ------------------------------
async def on_agent_speech_started(ev):
    print(f"[wire] turn={ev['turn_idx']} engine={ev.get('engine')}")


async def on_agent_speech_ended(ev):
    if ev["interrupted"]:
        print(f"[barge] turn={ev['turn_idx']} cut at {ev['speech_duration_ms']}ms")
    else:
        print(f"[done] turn={ev['turn_idx']} spoke {ev['speech_duration_ms']}ms")


# Wire them all up: these are simple attribute assignments on the Patter
# instance; no `serve()` argument needed.
phone.on_user_speech_started = on_user_speech_started
phone.on_user_speech_ended = on_user_speech_ended
phone.on_user_speech_eos = on_user_speech_eos
phone.on_llm_token = on_llm_token
phone.on_audio_out = on_audio_out
phone.on_agent_speech_started = on_agent_speech_started
phone.on_agent_speech_ended = on_agent_speech_ended

asyncio.run(phone.serve(agent, port=8000))
```
Barge-in detection
The cleanest way to detect a barge-in is to inspect on_agent_speech_ended.interrupted:
```python
barge_ins: list[dict] = []

async def on_agent_speech_ended(ev):
    if ev["interrupted"]:
        barge_ins.append({
            "turn_idx": ev["turn_idx"],
            "spoke_for_ms": ev["speech_duration_ms"],
            "at_ms": ev["timestamp_ms"],
        })

phone.on_agent_speech_ended = on_agent_speech_ended
```
For barge-in latency (how fast the agent stopped after the user started talking), pair each on_user_speech_started with the next on_agent_speech_ended whose payload has interrupted set to True:
```python
last_user_start_ms: int | None = None

async def on_user_speech_started(ev):
    global last_user_start_ms
    last_user_start_ms = ev["timestamp_ms"]

async def on_agent_speech_ended(ev):
    if ev["interrupted"] and last_user_start_ms is not None:
        latency_ms = ev["timestamp_ms"] - last_user_start_ms
        print(f"barge-in latency: {latency_ms}ms (target: <250ms)")

phone.on_user_speech_started = on_user_speech_started
phone.on_agent_speech_ended = on_agent_speech_ended
```
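Over many calls, a distribution is more useful than individual prints. Assuming the handler above appends each latency_ms to a list instead of printing it, the sketch below summarizes that list with the standard statistics module: the median, the 95th percentile, and how many barge-ins missed the 250 ms target. summarize_barge_ins is a hypothetical helper and the sample values are made up for illustration.

```python
import statistics


def summarize_barge_ins(latencies_ms: list[int], target_ms: int = 250) -> dict:
    if not latencies_ms:
        return {"count": 0}
    summary = {
        "count": len(latencies_ms),
        "median_ms": statistics.median(latencies_ms),
        # A barge-in misses the target unless it is strictly below target_ms.
        "over_target": sum(1 for x in latencies_ms if x >= target_ms),
    }
    if len(latencies_ms) >= 2:
        # quantiles() needs two or more points; "inclusive" keeps p95 inside the observed range.
        summary["p95_ms"] = statistics.quantiles(latencies_ms, n=20, method="inclusive")[18]
    return summary


summary = summarize_barge_ins([120, 180, 210, 260, 400])
```

The "inclusive" method treats the samples as the whole population, so the percentile never extrapolates past the largest observed latency, which is usually what you want for small per-call samples.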
Wiring
The realtime stream handler fires user_speech_started/_ended/_eos and agent_speech_started/_ended automatically on the OpenAI Realtime + Twilio/Telnyx path — no extra setup required.
on_llm_token and on_audio_out are exposed on the dispatcher (phone.speech_events) so custom adapters and pipeline-mode integrations can call them. If you are building a custom provider, call phone.speech_events.fire_llm_first_token(...) on your first streamed chunk and phone.speech_events.fire_audio_out(...) on your first synthesized audio buffer; both are idempotent within a turn.
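In a custom provider, the "first chunk" part is easiest to get right with a small wrapper around your stream. The sketch below wraps any async iterator and invokes a callback exactly once, on the first item, before yielding it. notify_first, on_first and fake_llm_stream are hypothetical names; in a real adapter on_first would call phone.speech_events.fire_llm_first_token(...) or fire_audio_out(...) with the arguments listed in the API Reference, which this sketch does not assume.

```python
import asyncio
from collections.abc import AsyncIterator, Callable
from typing import TypeVar

T = TypeVar("T")


async def notify_first(stream: AsyncIterator[T], on_first: Callable[[T], None]) -> AsyncIterator[T]:
    # Yield every item unchanged, calling on_first only for the first one.
    fired = False
    async for item in stream:
        if not fired:
            fired = True
            on_first(item)
        yield item


async def fake_llm_stream():
    # Hypothetical provider stream yielding text chunks.
    for chunk in ["Hel", "lo", "!"]:
        await asyncio.sleep(0)
        yield chunk


first_seen: list[str] = []


async def demo() -> str:
    parts = [c async for c in notify_first(fake_llm_stream(), first_seen.append)]
    return "".join(parts)


text = asyncio.run(demo())
```

The SDK's own fire_* methods are already idempotent within a turn, so the local flag is a convenience that avoids redundant calls rather than a correctness requirement.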
Public exports
| Export | Type | Use |
|---|---|---|
| SpeechEvents | class | The dispatcher. One instance per Patter (auto-created). |
| SpeechEventCallback | type alias | Callable[[dict], Awaitable[None] \| None]. |
| ConversationStateSnapshot | dict shape | {"user": <user_state>, "agent": <agent_state>}. |
| UserState | str literal | "listening" \| "speaking" \| "thinking" \| "away". |
| AgentState | str literal | "initializing" \| "idle" \| "listening" \| "thinking" \| "speaking". |
| EouTrigger | str literal | "vad_silence" \| "semantic_turn_detector" \| "manual_commit". |

```python
from getpatter import SpeechEvents, SpeechEventCallback
```
OpenTelemetry attach contract
Every speech-edge event also records a span event on the active call span when PATTER_OTEL_ENABLED=1 and the optional opentelemetry peer dependency is installed. When OTel is missing or disabled, the OTel branch is a no-op: it adds no overhead and cannot fail.
| Callback | Span event name | Selected attributes |
|---|---|---|
| on_user_speech_started | patter.event.user_speech_started | patter.audio.offset_ms, patter.vad.confidence |
| on_user_speech_ended | patter.event.user_speech_ended | patter.speech.duration_ms |
| on_user_speech_eos | patter.event.user_speech_eos | patter.eos.trigger, patter.eos.trailing_silence_ms |
| on_agent_speech_started | patter.event.agent_speech_started | patter.turn.idx, patter.tts.provider, patter.engine |
| on_agent_speech_ended | patter.event.agent_speech_ended | patter.turn.idx, patter.speech.duration_ms, patter.turn.interrupted |
| on_llm_token | patter.event.llm_first_token | gen_ai.request.model, gen_ai.provider.name (per OTel GenAI semconv), patter.turn.idx |
| on_audio_out | patter.event.tts_first_audio | patter.turn.idx, patter.tts.provider |
See Tracing for the OTel installation and exporter setup.
Callback safety
Observer exceptions are caught and logged, never propagated to the live call. A misbehaving callback cannot crash the call or break audio. Errors are logged at WARNING level under the getpatter.events logger with the offending span event name for easy correlation.
Design notes
- on_user_speech_ended vs. on_user_speech_eos: surfaced as separate events because they are two different signals. A silence-gap metric such as silence_gap_ms_max needs the EOU; a cross-talk metric such as cross_talk_pct needs the raw VAD edge.
- on_agent_speech_started vs. on_audio_out: on_audio_out is when TTS bytes arrive in the buffer (warmup metric). on_agent_speech_started is when those bytes hit the carrier wire, which is what the user actually hears. Subtract the two to measure carrier-side jitter.
- Idempotency: on_llm_token and on_audio_out fire at most once per turn. The guard is reset on on_user_speech_eos so the next turn re-arms cleanly.
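The idempotency rule is easy to replicate in your own adapters or tests. Below is a minimal sketch of a once-per-turn latch that re-arms on each committed EOU; OncePerTurn is a hypothetical helper, not the SDK's internal guard.

```python
class OncePerTurn:
    """Let a marker fire at most once per turn; re-arm on each committed EOU."""

    def __init__(self) -> None:
        self._fired = False

    def rearm(self) -> None:
        # Call from on_user_speech_eos, the reset point described above.
        self._fired = False

    def try_fire(self) -> bool:
        # True only for the first call since construction or the last rearm().
        if self._fired:
            return False
        self._fired = True
        return True


first_token = OncePerTurn()
results = [first_token.try_fire(), first_token.try_fire()]
first_token.rearm()  # a new EOU starts the next turn
results.append(first_token.try_fire())
```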