Events & Callbacks

Patter fires async callbacks at key moments in the call lifecycle. Use them to log calls, update CRMs, trigger workflows, or control conversation flow. All callbacks are async functions. They are passed as parameters to serve().

Available Callbacks

| Callback | Trigger |
| --- | --- |
| on_call_start | A call connects |
| on_call_end | A call ends |
| on_transcript | Each utterance is transcribed |
| on_message | User message received (pipeline mode) |
| on_metrics | After each conversation turn (real-time cost/latency) |

For fine-grained pipeline observability (every interim transcript, every LLM chunk, every TTS chunk, every tool start) subscribe to the EventBus below — it complements these callbacks rather than replacing them. For mutating prompts and responses (RAG augmentation, output validation, PII redaction) use PipelineHooks — they sit inside the LLM step rather than firing alongside it.

on_call_start

Fires when a call connects. Use it to log call starts, initialize state, or fetch customer data.
async def on_call_start(event):
    print(f"Call started: {event['call_id']}")
    print(f"Caller: {event['caller']}")
    print(f"Callee: {event['callee']}")
    print(f"Direction: {event['direction']}")
    print(f"Custom params: {event.get('custom_params', {})}")

Event Fields

| Field | Type | Description |
| --- | --- | --- |
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number (E.164). |
| callee | str | The callee’s phone number (E.164). |
| direction | str | "inbound" or "outbound". |
| custom_params | dict | Custom parameters passed with the call (if any). |

on_call_end

Fires when a call ends. Use it to save transcripts, calculate duration, or trigger post-call workflows.
async def on_call_end(event):
    print(f"Call ended: {event['call_id']}")
    for entry in event["transcript"]:
        print(f"  [{entry['role']}]: {entry['text']}")

Event Fields

| Field | Type | Description |
| --- | --- | --- |
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number (E.164). |
| callee | str | The callee’s phone number (E.164). |
| ended_at | float | Unix timestamp when the call ended (e.g. 1710489601.234). |
| transcript | list[dict] | Full conversation transcript. Each entry has role ("user" or "assistant") and text. |
| metrics | CallMetrics \| None | Call metrics with cost and latency breakdowns. None if metrics collection failed. See Metrics & Cost Tracking. |

on_transcript

Fires each time an utterance is transcribed during the call. Use it for real-time logging, sentiment analysis, or live dashboards.
async def on_transcript(event):
    print(f"[{event['role']}] {event['text']}")
    # Access conversation history so far
    for entry in event.get("history", []):
        pass  # {role, text, timestamp}

Event Fields

| Field | Type | Description |
| --- | --- | --- |
| role | str | "user" or "assistant". |
| text | str | The transcribed text. |
| call_id | str | Unique identifier for this call. |
| history | list[dict] | Conversation history so far. Each entry has role, text, and timestamp. |

on_message

Fires when a user message is received in pipeline mode. Your callback processes the message and returns the agent’s response as a string, which is then synthesized to speech.
on_message is only used in pipeline mode (when you pass stt= / tts= instead of engine=). In engine mode (OpenAI Realtime, ElevenLabs ConvAI) the engine handles responses directly.
async def on_message(event) -> str:
    user_text = event["text"]
    call_id = event["call_id"]
    caller = event["caller"]
    history = event.get("history", [])

    # Your custom logic here — call an LLM, query a database, etc.
    response = await my_llm_handler(user_text, history)
    return response

Event Fields

| Field | Type | Description |
| --- | --- | --- |
| text | str | The user’s transcribed message. |
| call_id | str | Unique identifier for this call. |
| caller | str | The caller’s phone number. |
| callee | str | The callee’s phone number. |
| history | list[dict] | Conversation history. Each entry has role, text, and timestamp. |

Return Value

Return a str with the agent’s response. This text is sent to the TTS provider and played back to the caller.
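
To make the return contract concrete, here is a small sketch of an on_message handler that needs no external services. The `FAQ` table and `FALLBACK` text are made up for illustration; a real handler would more likely call an LLM or a database, as in the example above.

```python
import asyncio

# Hypothetical canned replies, keyed by a keyword to look for in the message.
FAQ = {
    "hours": "We are open 9am to 5pm, Monday to Friday.",
    "refund": "Refunds are processed within five business days.",
}
FALLBACK = "Sorry, I didn't catch that. Could you rephrase?"

async def on_message(event) -> str:
    text = event["text"].lower()
    for keyword, reply in FAQ.items():
        if keyword in text:
            return reply
    # Always return a non-empty string so there is something to synthesize.
    return FALLBACK

# Local check with a hand-built event dict (no Patter instance involved).
print(asyncio.run(on_message({"text": "What are your HOURS?", "call_id": "c1"})))
```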

EventBus

The EventBus exposes fine-grained pipeline events that don’t have first-class callbacks. Subscribe with on(event_type, handler) from inside on_call_start (or any place you have a Patter reference).
from getpatter import EventBus, PatterEventType

# `phone.events` is the per-process EventBus.
phone.events.on(PatterEventType.TRANSCRIPT_PARTIAL, lambda ev: print("partial:", ev["text"]))
phone.events.on(PatterEventType.LLM_CHUNK, lambda ev: log_chunk(ev["call_id"], ev["text"]))

| PatterEventType | Fires |
| --- | --- |
| TRANSCRIPT_PARTIAL | Every interim STT result (before endpointing). |
| TRANSCRIPT_FINAL | Every final STT result (after endpointing). Same payload as on_transcript. |
| LLM_CHUNK | Every streamed LLM token / chunk. |
| TTS_CHUNK | Every TTS audio chunk written to the carrier. |
| TOOL_CALL_STARTED | Tool dispatched (paired with the existing tool_call_completed you can observe via on_call_end). |

Handlers are non-blocking: each runs in a fire-and-forget task. An exception raised inside a handler is logged and does not interrupt the call.
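
Because handlers run fire-and-forget, it helps to keep them cheap. The sketch below collects streamed LLM text per call; it assumes LLM_CHUNK payloads carry the call_id and text keys used in the snippet above. `ChunkAccumulator` is a hypothetical helper, and it is fed hand-built payloads here rather than a live bus.

```python
from collections import defaultdict

class ChunkAccumulator:
    """Collects streamed LLM text per call from LLM_CHUNK-style payloads."""

    def __init__(self):
        self._parts = defaultdict(list)

    def on_chunk(self, ev: dict) -> None:
        # Append only, no I/O, so the handler returns immediately.
        self._parts[ev["call_id"]].append(ev["text"])

    def text_for(self, call_id: str) -> str:
        return "".join(self._parts.get(call_id, []))

acc = ChunkAccumulator()
# With a live Patter instance the handler would be registered as:
#   phone.events.on(PatterEventType.LLM_CHUNK, acc.on_chunk)
for ev in [{"call_id": "c1", "text": "Hel"}, {"call_id": "c1", "text": "lo"}]:
    acc.on_chunk(ev)
print(acc.text_for("c1"))  # Hello
```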

Pipeline Hooks

PipelineHooks lets you intercept data at each stage of the pipeline-mode flow (STT → LLM → TTS). Pass an instance via phone.agent(hooks=...). Hooks may be sync or async; if a hook raises, the error is logged and the original value passes through unchanged (fail-open).
from getpatter import PipelineHooks

hooks = PipelineHooks(
    before_send_to_stt=...,   # raw PCM in → drop chunk by returning None
    after_transcribe=...,     # transcript in → skip turn by returning None
    before_llm=...,           # messages list in → replace by returning new list
    after_llm=...,            # see "after_llm 3-tier API" below
    before_synthesize=...,    # sentence in → skip TTS for sentence by returning None
    after_synthesize=...,     # audio chunk in → discard by returning None
)

agent = phone.agent(stt=..., llm=..., tts=..., hooks=hooks, system_prompt="...")

after_llm — 3-tier API

after_llm accepts either a dict with on_chunk / on_sentence / on_response keys, or any object exposing those attributes (dataclass, custom class, Protocol implementation).
# Recommended: 3-tier API
import re

after_llm = {
    # \b keeps the rewrite from mangling words that contain "um" (e.g. "number").
    "on_chunk":    lambda chunk: re.sub(r"\bum\b", "", chunk),  # sync, ~0 ms
    "on_sentence": async_redact_pii,                            # async, 50-300 ms
    "on_response": async_validate_json_schema,                  # async, 500 ms-2 s, BLOCKS streaming TTS
}

hooks = PipelineHooks(after_llm=after_llm)

| Tier | Sync/Async | Latency budget | When it runs | Return semantics |
| --- | --- | --- | --- | --- |
| on_chunk(chunk: str) -> str | sync | ~0 ms | Per LLM token chunk, before sentence aggregation | Return new string. Use for cheap text rewrites. |
| on_sentence(sentence: str, ctx: HookContext) -> str \| None | async | 50–300 ms | Per complete sentence, between chunker and TTS | Return new sentence, None to keep original, or "" to drop the sentence. |
| on_response(text: str, ctx: HookContext) -> str \| None | async | 500 ms–2 s | Once at end of LLM stream, blocks streaming TTS | Return new text, or None to keep original. |

Pick the lowest tier that does the job — on_chunk for fast string ops, on_sentence for per-sentence I/O (PII redaction, translation), on_response only when you need the whole response (JSON-schema validation, full-context moderation).
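
As an illustration of the on_sentence tier, the sketch below redacts e-mail addresses and phone-like digit runs and returns None when nothing changed, following the return semantics in the table. The two regexes are deliberately simplistic assumptions and not a real PII detector. `ctx` defaults to None only so the function can be exercised without a HookContext.

```python
import asyncio
import re
from typing import Optional

# Illustrative patterns only; production PII detection needs far more than this.
EMAIL = re.compile(r"[\w.+-]+@[\w-]+\.[\w.-]+")
PHONE = re.compile(r"\+?\d[\d\s().-]{7,}\d")

async def redact_pii(sentence: str, ctx=None) -> Optional[str]:
    redacted = EMAIL.sub("[email]", sentence)
    redacted = PHONE.sub("[phone]", redacted)
    # None means "keep the original sentence"; a changed string replaces it.
    return None if redacted == sentence else redacted

after_llm = {"on_sentence": redact_pii}

print(asyncio.run(redact_pii("Reach me at jane@example.com today.")))
# Reach me at [email] today.
```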

Migration: legacy after_llm callable

The legacy single-callable form is still supported for backward compatibility but is deprecated:
# Legacy (deprecated, scheduled for removal in v0.7.0):
hooks = PipelineHooks(
    after_llm=lambda text, ctx: text.upper(),
)
The legacy callable is mapped internally to the on_response slot and emits a one-shot PatterDeprecationWarning on first use. Migrate to the 3-tier dict to silence the warning and unlock the lower-latency on_chunk / on_sentence tiers.
# Migrated:
async def async_uppercase(text, ctx):
    return text.upper()

hooks = PipelineHooks(
    after_llm={"on_response": async_uppercase},
)

HookContext

Hooks that take a ctx argument receive a frozen HookContext dataclass:
@dataclass(frozen=True)
class HookContext:
    call_id: str
    caller: str
    callee: str
    history: tuple[dict, ...] = ()
PipelineHooks also exposes before_stt / after_stt and before_tts / after_tts for audio-stage interception. See the API Reference for the full signature.

Conversation History

All callbacks that include history receive it as a list of dictionaries:
[
    {"role": "assistant", "text": "Hello! How can I help?", "timestamp": 1710489601.234},
    {"role": "user", "text": "I'd like to check my order status.", "timestamp": 1710489605.891},
    {"role": "assistant", "text": "Sure! What's your order ID?", "timestamp": 1710489606.712},
]
Timestamps are Unix floats (from Python’s time.time()), not ISO-8601 strings.
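
If a downstream system expects ISO-8601, the Unix floats can be converted with the standard library. This is a small sketch; `to_iso8601` is a hypothetical helper name, and treating the value as UTC is the usual convention for Unix time.

```python
from datetime import datetime, timezone

def to_iso8601(ts: float) -> str:
    # Interpret the Unix float as UTC and render with millisecond precision.
    return datetime.fromtimestamp(ts, tz=timezone.utc).isoformat(timespec="milliseconds")

history = [
    {"role": "assistant", "text": "Hello! How can I help?", "timestamp": 1710489601.234},
]
for entry in history:
    print(to_iso8601(entry["timestamp"]), entry["role"], entry["text"])
```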

Complete Example

import os
import asyncio
from dotenv import load_dotenv
from getpatter import Patter, Twilio, OpenAIRealtime

load_dotenv()

phone = Patter(
    carrier=Twilio(),                               # TWILIO_* from env
    phone_number=os.environ["PHONE_NUMBER"],
    webhook_url=os.environ["WEBHOOK_URL"],
)

agent = phone.agent(
    engine=OpenAIRealtime(),                        # OPENAI_API_KEY from env
    system_prompt="You are a helpful assistant.",
    first_message="Hi there! What can I do for you?",
)

async def on_call_start(event):
    print(f"[START] Call {event['call_id']} from {event['caller']} ({event['direction']})")

async def on_call_end(event):
    print(f"[END] Call {event['call_id']}")
    print(f"  Transcript ({len(event['transcript'])} messages):")
    for entry in event["transcript"]:
        print(f"    [{entry['role']}]: {entry['text']}")

async def on_transcript(event):
    print(f"  [{event['role']}]: {event['text']}")

async def main():
    await phone.serve(
        agent,
        port=8000,
        on_call_start=on_call_start,
        on_call_end=on_call_end,
        on_transcript=on_transcript,
    )

asyncio.run(main())

Speech-Edge Events (Turn-Taking)

The callbacks above describe the transcript-level lifecycle of a call. For turn-taking instrumentation — barge-in, end-of-utterance, time-to-first-token, TTS warmup vs. wire-time — Patter exposes seven additional async callbacks plus a read-only conversation_state snapshot directly on the Patter instance. These events expose the canonical voice-agent metric set (user/agent state transitions, turn boundaries, TTFT, audio first-byte) and align with OpenAI Realtime (input_audio_buffer.speech_started/_stopped/_committed) so downstream metrics work without translation.
Every callback defaults to None. Existing code that does not register any speech-edge callback sees exactly the previous behaviour and zero overhead. The state machine is updated regardless of whether callbacks are registered, so conversation_state is always usable.

The seven events

| Event | Fires on | Signal |
| --- | --- | --- |
| on_user_speech_started | VAD positive edge of inbound audio | Raw VAD start, not end-of-utterance. Use for cross-talk detection. |
| on_user_speech_ended | VAD trailing edge | Raw VAD stop, not committed EOU. Use for talk-ratio. |
| on_user_speech_eos | Committed end-of-utterance | Canonical “user finished” signal. Anchor eos_to_first_token_ms here. |
| on_agent_speech_started | First wire-time chunk of the agent turn | What the user actually hears (distinct from TTS warmup). Anchor barge-in latency here. |
| on_agent_speech_ended | Last wire chunk of the agent turn | Payload includes interrupted: bool. True = barge-in cancelled the turn. |
| on_llm_token | First LLM token of the turn | TTFT marker. Idempotent: fires once per turn. |
| on_audio_out | First TTS audio chunk produced | TTS warmup arrival (distinct from wire-time). Idempotent: fires once per turn. |
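
As one example of the talk-ratio use mentioned in the table, the sketch below sums the speech_duration_ms field carried by the two *_speech_ended payloads. `TalkRatio` is a hypothetical helper, and the events are hand-built here rather than delivered by a live call.

```python
import asyncio

class TalkRatio:
    """Accumulates speech_duration_ms from the two *_speech_ended payloads."""

    def __init__(self):
        self.user_ms = 0
        self.agent_ms = 0

    async def on_user_speech_ended(self, ev: dict) -> None:
        self.user_ms += ev["speech_duration_ms"]

    async def on_agent_speech_ended(self, ev: dict) -> None:
        self.agent_ms += ev["speech_duration_ms"]

    def user_share(self) -> float:
        total = self.user_ms + self.agent_ms
        return self.user_ms / total if total else 0.0

tracker = TalkRatio()
# On a live instance these would be attached by attribute assignment, e.g.
#   phone.on_user_speech_ended = tracker.on_user_speech_ended

async def _demo():
    await tracker.on_user_speech_ended({"timestamp_ms": 4000, "speech_duration_ms": 3000})
    await tracker.on_agent_speech_ended(
        {"timestamp_ms": 9000, "turn_idx": 1, "speech_duration_ms": 1000, "interrupted": False}
    )

asyncio.run(_demo())
print(f"user share of talk time: {tracker.user_share():.0%}")  # 75%
```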

Payload signature matrix

async def on_user_speech_started(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "vad_confidence": float | omitted,
    #   "audio_offset_ms": int | omitted,
    # }
    ...

async def on_user_speech_ended(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "speech_duration_ms": int,
    #   "vad_confidence": float | omitted,
    #   "audio_offset_ms": int | omitted,
    # }
    ...

async def on_user_speech_eos(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "trigger": "vad_silence" | "semantic_turn_detector" | "manual_commit",
    #   "trailing_silence_ms": int | omitted,
    #   "transcript_so_far": str | omitted,
    # }
    ...

async def on_agent_speech_started(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "turn_idx": int,
    #   "tts_provider": str | omitted,
    #   "engine": str | omitted,
    # }
    ...

async def on_agent_speech_ended(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "turn_idx": int,
    #   "speech_duration_ms": int,
    #   "interrupted": bool,
    # }
    ...

async def on_llm_token(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "turn_idx": int,
    #   "llm_provider": str,
    #   "model": str,
    # }
    ...

async def on_audio_out(event: dict) -> None:
    # event = {
    #   "timestamp_ms": int,
    #   "turn_idx": int,
    #   "tts_provider": str,
    # }
    ...
Compute end-to-end latency by anchoring eos_to_first_token_ms to on_user_speech_eos. It marks the moment the SDK has committed that the user is done speaking — VAD trailing edge plus trailing silence (and optionally a semantic turn-detector agreement). Anchoring to on_user_speech_ended instead would over-count by the silence window and double-fire on mid-utterance VAD blips. Hamming AI thresholds: <800 ms good, >1500 ms critical.
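
The calculation above can be sketched as a pair of helpers over the documented payloads. The two thresholds are the ones quoted in the paragraph; the "degraded" label for the band between them is a hypothetical name, as are the function names.

```python
GOOD_MS = 800       # below this: good (threshold quoted above)
CRITICAL_MS = 1500  # above this: critical (threshold quoted above)

def eos_to_first_token_ms(eos_event: dict, llm_token_event: dict) -> int:
    # Anchor on the committed end-of-utterance, not the raw VAD trailing edge.
    return llm_token_event["timestamp_ms"] - eos_event["timestamp_ms"]

def classify(latency_ms: int) -> str:
    if latency_ms < GOOD_MS:
        return "good"
    if latency_ms > CRITICAL_MS:
        return "critical"
    return "degraded"  # hypothetical label for the middle band

eos = {"timestamp_ms": 10_000, "trigger": "vad_silence"}
token = {"timestamp_ms": 10_650, "turn_idx": 1, "llm_provider": "example", "model": "example"}
latency = eos_to_first_token_ms(eos, token)
print(latency, classify(latency))  # 650 good
```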

State machine

conversation_state returns a snapshot {"user": <user_state>, "agent": <agent_state>} you can read at any time:

| Side | States | Initial | Set by |
| --- | --- | --- | --- |
| user | listening · speaking · thinking · away | listening | on_user_speech_started → speaking; on_user_speech_ended / on_user_speech_eos → listening |
| agent | initializing · idle · listening · thinking · speaking | initializing | call accepted → idle; EOU committed → thinking; on_agent_speech_started → speaking; on_agent_speech_ended → idle |

A monotonic turn_idx counter (also exposed on the dispatcher) increments on every committed EOU. The agent_speech_*, llm_token, and audio_out payloads all carry the current turn_idx so a per-turn metric can correlate them.

Sequence for a normal turn

user audio in       → on_user_speech_started        (user → speaking)
silence detected    → on_user_speech_ended          (user → listening)
silence + commit    → on_user_speech_eos            (turn_idx += 1, agent → thinking)
LLM streams         → on_llm_token   (once)         (TTFT)
TTS produces audio  → on_audio_out   (once)         (TTS warmup)
audio hits wire     → on_agent_speech_started       (agent → speaking)
last chunk          → on_agent_speech_ended         (agent → idle, interrupted=False)

Sequence for a barged-in turn

on_agent_speech_started                              (agent → speaking)
... user starts talking over the agent ...
on_user_speech_started                               (user → speaking)
on_agent_speech_ended { interrupted: True }          (agent → idle)
on_user_speech_eos                                   (turn_idx += 1, new turn begins)
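
The two sequences can be replayed against a small model of the state table to check that they end where expected. This is a sketch of the documented transitions only, not the SDK's internal state machine; `TRANSITIONS`, `replay`, and the `call_accepted` event name are hypothetical.

```python
# Transitions copied from the state table; events not listed leave state unchanged.
TRANSITIONS = {
    "call_accepted":           {"agent": "idle"},  # hypothetical event name
    "on_user_speech_started":  {"user": "speaking"},
    "on_user_speech_ended":    {"user": "listening"},
    "on_user_speech_eos":      {"user": "listening", "agent": "thinking"},
    "on_agent_speech_started": {"agent": "speaking"},
    "on_agent_speech_ended":   {"agent": "idle"},
}

def replay(events):
    """Return (conversation_state, turn_idx) after applying the events in order."""
    state = {"user": "listening", "agent": "initializing"}
    turn_idx = 0
    for name in events:
        state = {**state, **TRANSITIONS.get(name, {})}
        if name == "on_user_speech_eos":
            turn_idx += 1  # the counter increments on every committed EOU
    return state, turn_idx

normal_turn = [
    "call_accepted", "on_user_speech_started", "on_user_speech_ended",
    "on_user_speech_eos", "on_llm_token", "on_audio_out",
    "on_agent_speech_started", "on_agent_speech_ended",
]
barged_in = normal_turn[:-1] + [
    "on_user_speech_started", "on_agent_speech_ended", "on_user_speech_eos",
]

print(replay(normal_turn))
print(replay(barged_in))
```

After the barged-in sequence the agent is back in thinking with turn_idx at 2, which matches the "new turn begins" note in the listing.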

Full example — wire all seven callbacks

import asyncio
from getpatter import Patter, Twilio, OpenAIRealtime

phone = Patter(carrier=Twilio(), phone_number="+15555550100")

agent = phone.agent(
    engine=OpenAIRealtime(),
    system_prompt="You are a helpful assistant.",
)

# --- raw VAD edges ---------------------------------------------------------
async def on_user_speech_started(ev):
    # Raw VAD positive edge — user might still be mid-utterance.
    print(f"[vad+] t={ev['timestamp_ms']} state={phone.conversation_state}")

async def on_user_speech_ended(ev):
    # Raw VAD trailing edge — NOT committed EOU. User may resume in 100ms.
    print(f"[vad-] dur={ev['speech_duration_ms']}ms")

# --- canonical 'user finished' signal --------------------------------------
async def on_user_speech_eos(ev):
    # Committed EOU. This is the timestamp to anchor TTFT against.
    print(f"[eos]  trigger={ev['trigger']} silence={ev.get('trailing_silence_ms')}ms")
    on_user_speech_eos.last_eos_ms = ev["timestamp_ms"]

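on_user_speech_eos.last_eos_ms = None  # no EOU seen yet

# --- model + audio first-fire markers --------------------------------------
async def on_llm_token(ev):
    last_eos_ms = on_user_speech_eos.last_eos_ms
    if last_eos_ms is None:
        return  # e.g. the agent speaks first, so there is no EOU to anchor to
    ttft = ev["timestamp_ms"] - last_eos_ms
    print(f"[ttft] {ttft}ms  model={ev['model']}  provider={ev['llm_provider']}")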

async def on_audio_out(ev):
    # TTS warmup — bytes produced, not yet on the wire.
    print(f"[tts ] turn={ev['turn_idx']} provider={ev['tts_provider']}")

# --- what the user hears + barge-in detection ------------------------------
async def on_agent_speech_started(ev):
    print(f"[wire] turn={ev['turn_idx']} engine={ev.get('engine')}")

async def on_agent_speech_ended(ev):
    if ev["interrupted"]:
        print(f"[barge] turn={ev['turn_idx']} cut at {ev['speech_duration_ms']}ms")
    else:
        print(f"[done] turn={ev['turn_idx']} spoke {ev['speech_duration_ms']}ms")

# Wire them all up — these are simple attribute assignments on the Patter
# instance; no `serve()` argument needed.
phone.on_user_speech_started = on_user_speech_started
phone.on_user_speech_ended   = on_user_speech_ended
phone.on_user_speech_eos     = on_user_speech_eos
phone.on_llm_token           = on_llm_token
phone.on_audio_out           = on_audio_out
phone.on_agent_speech_started = on_agent_speech_started
phone.on_agent_speech_ended  = on_agent_speech_ended

asyncio.run(phone.serve(agent, port=8000))

Barge-in detection

The cleanest way to detect a barge-in is to inspect the interrupted field of the on_agent_speech_ended payload:
barge_ins: list[dict] = []

async def on_agent_speech_ended(ev):
    if ev["interrupted"]:
        barge_ins.append({
            "turn_idx": ev["turn_idx"],
            "spoke_for_ms": ev["speech_duration_ms"],
            "at_ms": ev["timestamp_ms"],
        })

phone.on_agent_speech_ended = on_agent_speech_ended
For barge-in latency (how fast the agent stopped after the user started talking), pair on_user_speech_started with the next on_agent_speech_ended({"interrupted": True}):
last_user_start_ms: int | None = None

async def on_user_speech_started(ev):
    global last_user_start_ms
    last_user_start_ms = ev["timestamp_ms"]

async def on_agent_speech_ended(ev):
    if ev["interrupted"] and last_user_start_ms is not None:
        latency_ms = ev["timestamp_ms"] - last_user_start_ms
        print(f"barge-in latency: {latency_ms}ms (target: <250ms)")

phone.on_user_speech_started = on_user_speech_started
phone.on_agent_speech_ended = on_agent_speech_ended

Wiring

The realtime stream handler fires user_speech_started/_ended/_eos and agent_speech_started/_ended automatically on the OpenAI Realtime + Twilio/Telnyx path — no extra setup required. on_llm_token and on_audio_out are exposed on the dispatcher (phone.speech_events) so custom adapters and pipeline-mode integrations can call them. If you are building a custom provider, call phone.speech_events.fire_llm_first_token(...) on your first streamed chunk and phone.speech_events.fire_audio_out(...) on your first synthesized audio buffer; both are idempotent within a turn.

Public exports

| Export | Type | Use |
| --- | --- | --- |
| SpeechEvents | class | The dispatcher. One instance per Patter (auto-created). |
| SpeechEventCallback | type alias | Callable[[dict], Awaitable[None] \| None]. |
| ConversationStateSnapshot | dict shape | {"user": <user_state>, "agent": <agent_state>}. |
| UserState | str literal | "listening" \| "speaking" \| "thinking" \| "away". |
| AgentState | str literal | "initializing" \| "idle" \| "listening" \| "thinking" \| "speaking". |
| EouTrigger | str literal | "vad_silence" \| "semantic_turn_detector" \| "manual_commit". |

from getpatter import SpeechEvents, SpeechEventCallback

OpenTelemetry attach contract

Every speech-edge event also records a span event on the active call span when PATTER_OTEL_ENABLED=1 and the optional opentelemetry dependency is installed. When OTel is missing or disabled, the OTel branch is a zero-cost no-op: it adds no overhead and cannot fail.

| Callback | Span event name | Selected attributes |
| --- | --- | --- |
| on_user_speech_started | patter.event.user_speech_started | patter.audio.offset_ms, patter.vad.confidence |
| on_user_speech_ended | patter.event.user_speech_ended | patter.speech.duration_ms |
| on_user_speech_eos | patter.event.user_speech_eos | patter.eos.trigger, patter.eos.trailing_silence_ms |
| on_agent_speech_started | patter.event.agent_speech_started | patter.turn.idx, patter.tts.provider, patter.engine |
| on_agent_speech_ended | patter.event.agent_speech_ended | patter.turn.idx, patter.speech.duration_ms, patter.turn.interrupted |
| on_llm_token | patter.event.llm_first_token | gen_ai.request.model, gen_ai.provider.name (per OTel GenAI semconv), patter.turn.idx |
| on_audio_out | patter.event.tts_first_audio | patter.turn.idx, patter.tts.provider |

See Tracing for the OTel installation and exporter setup.

Callback safety

Observer exceptions are caught and logged, never propagated to the live call. A misbehaving callback cannot crash the call or break audio. Errors are logged at WARNING level under the getpatter.events logger with the offending span event name for easy correlation.
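
To see those warnings during development, the named logger can be given its own handler using only the standard logging module. This is a sketch; the format string and the choice to disable propagation are assumptions, not SDK defaults.

```python
import logging

# Route records from the logger named above to stderr with timestamps.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s"))

events_log = logging.getLogger("getpatter.events")
events_log.addHandler(handler)
events_log.setLevel(logging.WARNING)  # callback failures are logged at WARNING
events_log.propagate = False          # avoid duplicate lines via the root logger
```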

Design notes

  • on_user_speech_ended vs. on_user_speech_eos: surfaced as separate events because they are two different signals. silence_gap_ms_max wants the EOU; cross_talk_pct wants the raw VAD edge.
  • on_agent_speech_started vs. on_audio_out: on_audio_out is when TTS bytes arrive in the buffer (warmup metric). on_agent_speech_started is when those bytes hit the carrier wire — what the user actually hears. Subtract the two to measure carrier-side jitter.
  • Idempotency: on_llm_token and on_audio_out fire at most once per turn. The guard is reset on on_user_speech_eos so the next turn re-arms cleanly.
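
The subtraction described in the second note can be sketched by keying both events on turn_idx. `WireDelay` is a hypothetical helper; it assumes both callbacks fire for the turn and that their timestamp_ms values share a clock.

```python
import asyncio

class WireDelay:
    """Per-turn delay between first TTS audio (warmup) and first audio on the wire."""

    def __init__(self):
        self._audio_out_ms = {}  # turn_idx -> timestamp of first TTS chunk
        self.delays_ms = {}      # turn_idx -> warmup-to-wire delay in ms

    async def on_audio_out(self, ev: dict) -> None:
        self._audio_out_ms[ev["turn_idx"]] = ev["timestamp_ms"]

    async def on_agent_speech_started(self, ev: dict) -> None:
        produced = self._audio_out_ms.pop(ev["turn_idx"], None)
        if produced is not None:  # skip turns where on_audio_out never fired
            self.delays_ms[ev["turn_idx"]] = ev["timestamp_ms"] - produced

probe = WireDelay()

async def _demo():
    await probe.on_audio_out({"timestamp_ms": 1200, "turn_idx": 1, "tts_provider": "example"})
    await probe.on_agent_speech_started({"timestamp_ms": 1260, "turn_idx": 1})
    await probe.on_agent_speech_started({"timestamp_ms": 5000, "turn_idx": 2})  # no warmup seen

asyncio.run(_demo())
print(probe.delays_ms)  # {1: 60}
```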