Module agents.tokenize.filters
Classes
class BasicTextFilter (*,
language: str = 'auto',
strip_markdown: bool = True,
strip_llm_metadata: bool = True,
collapse_script_parens: bool = True,
normalise_punctuation: bool = True,
expand_symbols: bool = True,
expand_ranges: bool = True,
protect_structural: bool = True,
respect_quotes: bool = True,
respect_parens: bool = True,
ssml_flavor: str = 'none',
verbalize_currency: bool = True,
verbalize_numbers: bool = False,
currency_hint: str | None = None)-
Expand source code
class BasicTextFilter(TextFilter):
    """Default streaming text filter built from independently-toggleable rules.

    Every rule is on by default except number verbalisation
    (``verbalize_numbers``) and digit spelling (``ssml_flavor``), which are
    opt-in. Symbol expansion is suppressed for non-English languages; the
    TTS provider handles symbol readings for those.
    """

    def __init__(
        self,
        *,
        language: str = "auto",
        strip_markdown: bool = True,
        strip_llm_metadata: bool = True,
        collapse_script_parens: bool = True,
        normalise_punctuation: bool = True,
        expand_symbols: bool = True,
        expand_ranges: bool = True,
        protect_structural: bool = True,
        respect_quotes: bool = True,
        respect_parens: bool = True,
        ssml_flavor: str = "none",
        verbalize_currency: bool = True,
        verbalize_numbers: bool = False,
        currency_hint: str | None = None,
    ) -> None:
        """Initialise the filter.

        Args:
            language: ISO 639-1 code or ``"auto"``. Drives symbol expansion
                (English only) and numeric-range separator word
                ("to", "से", "から", etc.).
            strip_markdown: Strip Markdown syntax before TTS. Default ``True``.
            strip_llm_metadata: Strip common LLM state-leak shapes such as
                ``(SESSION STATE: intro_delivered=true)`` or
                ``STATE: language=en``. Safety net — the real fix is a
                clearer prompt. Default ``True``.
            collapse_script_parens: When the LLM writes both an English word
                and its non-Latin gloss (``Tally (टैली)``,
                ``one click (एक क्लिक)``), collapse to just the gloss so the
                TTS doesn't speak the phrase twice. Word-count aware:
                ``Extra manpower (मैनपावर)`` → ``Extra मैनपावर``.
                Default ``True``.
            normalise_punctuation: Collapse ``"..."`` → ``"…"`` etc.
                Default ``True``.
            expand_symbols: Rewrite ``&``, ``%``, ``#N`` → words (English
                only). Default ``True``.
            expand_ranges: Rewrite short digit ranges ``2-3`` → ``2 to 3``
                using the locale's separator word. Default ``True``.
            protect_structural: Preserve URLs / paths / versions / emails
                through the Markdown and symbol stages. Default ``True``.
            respect_quotes: (Reserved — currently a no-op; the value is only
                stored.)
            respect_parens: (Reserved — currently a no-op; the value is only
                stored.)
            ssml_flavor: How phone numbers and long digit runs are made
                spellable. ``"cartesia"`` wraps them in
                ``<spell>…</spell>``; ``"digits"`` rewrites them as
                space-separated digits; any other value (including the
                default ``"none"``) leaves them untouched. Compared
                case-insensitively; a falsy value is treated as ``"none"``.
            verbalize_currency: Run the text through ``expand_currency``.
                Default ``True``.
            verbalize_numbers: Run the text through ``expand_cardinals``.
                Default ``False``.
            currency_hint: Optional hint forwarded to ``expand_currency``;
                stored lower-cased. Default ``None``.
        """
        self._language = language
        self._strip_markdown = strip_markdown
        self._strip_llm_metadata = strip_llm_metadata
        self._collapse_script_parens = collapse_script_parens
        self._normalise_punctuation = normalise_punctuation
        self._expand_symbols = expand_symbols
        self._expand_ranges = expand_ranges
        self._protect_structural = protect_structural
        self._respect_quotes = respect_quotes
        self._respect_parens = respect_parens
        self._ssml_flavor = (ssml_flavor or "none").lower()
        self._verbalize_currency = verbalize_currency
        self._verbalize_numbers = verbalize_numbers
        self._currency_hint = currency_hint.lower() if currency_hint else None

        # Per-turn streaming state; cleared by ``reset()``.
        self._buffer: str = ""
        self._in_code_fence: bool = False
        self._placeholder_counter: int = 0
        self._placeholder_map: dict[str, str] = {}

    @classmethod
    def for_language(cls, language: str | None) -> "BasicTextFilter":
        """Build a filter pre-configured for ``language``.

        Used by ``Pipeline`` auto-wiring. ``None`` / empty is treated as
        ``"auto"``. Number verbalisation is enabled when the language is in
        ``_VERBALIZE_LANGS`` and the currency hint is looked up in
        ``_CURRENCY_BY_LANG`` (``None`` when the language has no entry).
        """
        lang = language or "auto"
        return cls(
            language=lang,
            verbalize_numbers=lang in _VERBALIZE_LANGS,
            currency_hint=_CURRENCY_BY_LANG.get(lang),
        )

    async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        """Transform an incoming chunk stream.

        Per-turn state is reset first. Text inside a triple-backtick fence
        that is still open is discarded; everything else is buffered until a
        whitespace boundary with balanced inline markers is found (or the
        buffer exceeds ``_MAX_BUFFER``), then run through ``_process`` and
        yielded. Whatever remains when the stream ends is processed and
        yielded as a final piece.

        Args:
            chunks: Async stream of raw text chunks; empty chunks are
                skipped.

        Yields:
            Non-empty processed text pieces. If any exception is raised
            while filtering, it is logged and the unprocessed buffer is
            yielded raw instead of propagating the error.
        """
        await self.reset()
        try:
            async for chunk in chunks:
                if not chunk:
                    continue
                logger.debug("[chunking] filter ← raw: %r", chunk)
                self._buffer += chunk

                emitted_before_fence: list[str] = []
                while True:
                    if self._in_code_fence:
                        close_idx = self._buffer.find("```")
                        if close_idx == -1:
                            # Discard the fenced content, but keep a trailing
                            # run of backticks (at most two, since no full
                            # fence was found): it may be the first part of a
                            # closing fence split across chunks. Dropping it
                            # would make the close undetectable and swallow
                            # the rest of the turn.
                            fence_body = self._buffer.rstrip("`")
                            self._buffer = self._buffer[len(fence_body):]
                            break
                        self._buffer = self._buffer[close_idx + 3 :]
                        self._in_code_fence = False
                        continue

                    open_idx = self._buffer.find("```")
                    if open_idx == -1:
                        break
                    close_idx = self._buffer.find("```", open_idx + 3)
                    if close_idx != -1:
                        # Complete fence is buffered; leave it for the
                        # Markdown stripper in ``_process``.
                        break

                    # Unclosed fence: emit what precedes it and drop what
                    # follows, again keeping a partial closing fence.
                    prefix = self._buffer[:open_idx]
                    after_open = self._buffer[open_idx + 3 :]
                    self._buffer = after_open[len(after_open.rstrip("`")):]
                    self._in_code_fence = True
                    if prefix:
                        processed = self._process(prefix)
                        if processed:
                            emitted_before_fence.append(processed)
                    break

                for piece in emitted_before_fence:
                    logger.debug("[chunking] filter → tokenizer: %r", piece)
                    yield piece

                if self._in_code_fence or not self._buffer:
                    continue

                safe_cut = self._find_emit_boundary(self._buffer)
                if safe_cut <= 0:
                    # No safe boundary yet; flush only when the buffer has
                    # grown past the cap so output cannot stall forever.
                    if len(self._buffer) > _MAX_BUFFER:
                        processed = self._process(self._buffer)
                        self._buffer = ""
                        if processed:
                            logger.debug(
                                "[chunking] filter → tokenizer: %r", processed
                            )
                            yield processed
                    continue

                processed = self._process(self._buffer[:safe_cut])
                self._buffer = self._buffer[safe_cut:]
                if processed:
                    logger.debug("[chunking] filter → tokenizer: %r", processed)
                    yield processed

            # End of stream: an unterminated fence is dropped entirely.
            if self._in_code_fence:
                self._buffer = ""
                self._in_code_fence = False
            if self._buffer:
                processed = self._process(self._buffer)
                self._buffer = ""
                if processed:
                    logger.debug(
                        "[chunking] filter → tokenizer (drain): %r", processed
                    )
                    yield processed
        except Exception:
            # Deliberate best-effort boundary: never lose buffered speech
            # text because a rule failed.
            logger.error("BasicTextFilter errored; yielding buffer raw", exc_info=True)
            # While inside a fence the buffer only ever holds a partial
            # closing fence (stray backticks), which must not be spoken.
            if self._buffer and not self._in_code_fence:
                yield self._buffer
            self._buffer = ""

    async def reset(self) -> None:
        """Reset per-turn state (buffer, fence flag, placeholder table)."""
        self._buffer = ""
        self._in_code_fence = False
        self._placeholder_counter = 0
        self._placeholder_map = {}

    def _find_emit_boundary(self, text: str) -> int:
        """Return the length of the prefix that can be safely emitted.

        Scans whitespace positions from the end of ``text`` backwards and
        returns the first cut (ending just after that whitespace character)
        whose prefix passes ``_has_balanced_markers``. Assumes fence handling
        has already happened.

        Returns:
            The prefix length, or ``0`` when ``text`` is empty or no
            whitespace boundary with balanced markers exists yet.
        """
        if not text:
            return 0
        for idx in range(len(text) - 1, -1, -1):
            if not text[idx].isspace():
                continue
            candidate = text[: idx + 1]
            if self._has_balanced_markers(candidate):
                return idx + 1
        return 0

    @staticmethod
    def _has_balanced_markers(text: str) -> bool:
        """Cheap balance check for common Markdown / grouping markers.

        Returns ``True`` only when ``**`` and ``__`` each occur an even
        number of times, the number of backticks is even, and the counts of
        ``[`` / ``]`` and of ``(`` / ``)`` match. This keeps an emitted
        prefix from leaving an opener dangling, so inline emphasis, inline
        code and Markdown links reach the processor closed.

        Parentheses matter for the metadata stripper: an LLM-leaked
        ``(SESSION STATE: …)`` must reach the processor as one intact block
        so the regex can delete it.
        """
        if text.count("**") % 2 != 0:
            return False
        if text.count("__") % 2 != 0:
            return False
        bt_count = sum(1 for c in text if c == "`")
        if bt_count % 2 != 0:
            return False
        if text.count("[") != text.count("]"):
            return False
        if text.count("(") != text.count(")"):
            return False
        return True

    def _process(self, text: str) -> str:
        """Run every enabled rule over ``text`` and return the result.

        Order: protect structural tokens → strip metadata → strip Markdown →
        collapse script parentheses → normalise punctuation → digit spelling
        (``"cartesia"`` or ``"digits"`` flavor) → numeric ranges → currency →
        cardinals → symbol expansion (English only) → restore protected
        tokens. Returns ``""`` for empty input.
        """
        if not text:
            return ""
        if self._protect_structural:
            text = self._protect(text)
        if self._strip_llm_metadata:
            text = self._strip_metadata(text)
        if self._strip_markdown:
            text = self._strip_md(text)
        if self._collapse_script_parens:
            text = self._collapse_script_parens_fn(text)
        if self._normalise_punctuation:
            text = self._normalise_punct(text)
        if self._ssml_flavor == "cartesia":
            text = self._inject_cartesia_ssml(text)
        elif self._ssml_flavor == "digits":
            text = self._inject_digits_spaced(text)
        if self._expand_ranges:
            text = self._expand_numeric_ranges(text)
        if self._verbalize_currency:
            text = expand_currency(
                text,
                language=self._language or "en",
                hint=self._currency_hint,
            )
        if self._verbalize_numbers:
            text = expand_cardinals(text, language=self._language or "en")
        if self._expand_symbols and self._language_is_english():
            text = self._expand(text)
        if self._protect_structural:
            text = self._restore(text)
        return text

    def _language_is_english(self) -> bool:
        """Return ``True`` for ``"en"``, ``"auto"`` or an unset language."""
        lang = (self._language or "").lower()
        return lang in ("en", "auto", "")

    def _protect(self, text: str) -> str:
        """Swap URLs, emails, paths and versions for placeholder characters.

        Each match is replaced by a single character derived from
        ``_FILTER_PLACEHOLDER_BASE`` plus a per-turn counter, and the
        original text is recorded in ``_placeholder_map`` for ``_restore``.
        """

        def _substitute(match: re.Match[str]) -> str:
            key = chr(_FILTER_PLACEHOLDER_BASE + self._placeholder_counter)
            self._placeholder_counter += 1
            self._placeholder_map[key] = match.group(0)
            return key

        for regex in (URL_REGEX, EMAIL_REGEX, PATH_REGEX, VERSION_REGEX):
            text = regex.sub(_substitute, text)
        return text

    def _restore(self, text: str) -> str:
        """Put back every protected token recorded in ``_placeholder_map``."""
        if not self._placeholder_map:
            return text
        for key, value in self._placeholder_map.items():
            if key in text:
                text = text.replace(key, value)
        return text

    @staticmethod
    def _strip_metadata(text: str) -> str:
        """Remove common LLM state-leak shapes.

        Deletes everything matched by ``METADATA_PARENS_REGEX`` and then by
        ``METADATA_PREFIX_REGEX``. Per the original notes these target
        parenthesised ALL-CAPS key/value dumps and bare
        ``KEYWORD: key=value`` lines, and are designed not to match normal
        parentheticals (patterns are defined elsewhere — verify there).
        """
        text = METADATA_PARENS_REGEX.sub("", text)
        text = METADATA_PREFIX_REGEX.sub("", text)
        return text

    @staticmethod
    def _collapse_script_parens_fn(text: str) -> str:
        """Collapse ``<Latin word(s)> (<non-Latin gloss>)`` → ``<gloss>``.

        For each ``SCRIPT_MIXED_PAREN_REGEX`` match, counts the
        whitespace-separated words of the gloss (group 1) and removes exactly
        that many ASCII-alphanumeric words from the text immediately before
        the parenthesis. If fewer such words are available, the text and the
        parenthetical are left untouched.
        """
        matches = list(SCRIPT_MIXED_PAREN_REGEX.finditer(text))
        if not matches:
            return text

        result: list[str] = []
        pos = 0
        for m in matches:
            paren_start = m.start()
            paren_end = m.end()
            content = m.group(1).strip()
            if not content:
                # Nothing to substitute; copy through unchanged.
                result.append(text[pos:paren_end])
                pos = paren_end
                continue

            n_words = len(content.split())
            before = text[pos:paren_start]
            idx = len(before)
            removed = 0
            # Walk backwards over ``before``, peeling one ASCII word per pass.
            while removed < n_words:
                while idx > 0 and before[idx - 1].isspace():
                    idx -= 1
                end = idx
                while (
                    idx > 0
                    and before[idx - 1].isascii()
                    and before[idx - 1].isalnum()
                ):
                    idx -= 1
                if end == idx:
                    # Hit a non-word character (or the start): give up.
                    break
                removed += 1

            if removed == n_words:
                kept = before[:idx]
                # Keep the gloss separated from whatever precedes it.
                if kept and not kept[-1:].isspace():
                    result.append(kept + " ")
                else:
                    result.append(kept)
                result.append(content)
            else:
                result.append(before)
                result.append(text[paren_start:paren_end])
            pos = paren_end

        result.append(text[pos:])
        return "".join(result)

    @staticmethod
    def _strip_md(text: str) -> str:
        """Strip Markdown syntax by applying the ``MD_*`` regexes in order.

        Fenced code, inline code, images, headings, list markers,
        blockquotes, horizontal rules and table separators are replaced by
        the empty string; table pipes become a space; links and
        bold/italic markers are replaced by their first capture group.
        """
        text = MD_FENCED_CODE_REGEX.sub("", text)
        text = MD_INLINE_CODE_REGEX.sub("", text)
        text = MD_IMAGE_REGEX.sub("", text)
        text = MD_LINK_REGEX.sub(r"\1", text)
        text = MD_HEADING_REGEX.sub("", text)
        text = MD_LIST_MARKER_REGEX.sub("", text)
        text = MD_BLOCKQUOTE_REGEX.sub("", text)
        text = MD_HR_REGEX.sub("", text)
        text = MD_TABLE_SEP_REGEX.sub("", text)
        text = MD_TABLE_PIPE_REGEX.sub(" ", text)
        text = MD_BOLD_STAR_REGEX.sub(r"\1", text)
        text = MD_BOLD_UNDER_REGEX.sub(r"\1", text)
        text = MD_ITALIC_STAR_REGEX.sub(r"\1", text)
        text = MD_ITALIC_UNDER_REGEX.sub(r"\1", text)
        return text

    @staticmethod
    def _normalise_punct(text: str) -> str:
        """Rewrite ellipsis matches as ``…`` and spaced dashes as `` — ``."""
        text = PUNCT_ELLIPSIS_REGEX.sub("…", text)
        text = PUNCT_SPACED_DASH_REGEX.sub(" — ", text)
        return text

    @staticmethod
    def _expand(text: str) -> str:
        """Apply every ``(regex, replacement)`` pair in ``SYMBOL_EXPANSIONS_EN``."""
        for regex, replacement in SYMBOL_EXPANSIONS_EN:
            text = regex.sub(replacement, text)
        return text

    @staticmethod
    def _inject_digits_spaced(text: str) -> str:
        """Replace phone numbers and long digit runs with spaced digits.

        Example: ``+91 98765 43210`` → ``9 1 9 8 7 6 5 4 3 2 1 0``.

        Fallback for TTS engines without Cartesia's ``<spell>`` tag: phone
        matches lose every non-digit character (``+``, parentheses, dashes,
        interior spaces) and the remaining digits are joined with single
        spaces; long digit runs are likewise split character by character.

        NOTE(review): the minimum run length is set by
        ``SPELL_LONG_DIGITS_REGEX`` (defined elsewhere); earlier notes in
        this class disagreed on whether it is 6 or 7 digits — confirm.
        """

        def _phone_to_digits(m: re.Match[str]) -> str:
            digits = re.sub(r"\D", "", m.group(0))
            return " ".join(digits)

        text = SPELL_PHONE_REGEX.sub(_phone_to_digits, text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: " ".join(m.group(0)),
            text,
        )
        return text

    @staticmethod
    def _inject_cartesia_ssml(text: str) -> str:
        """Wrap phone numbers and long digit runs in ``<spell>…</spell>``.

        Intended for Cartesia voices, which per the original notes read
        unwrapped digit strings as natural-language numbers; IDs, account
        numbers, OTPs and phone numbers usually want digit-by-digit reading.

        The order matters: the phone regex runs first (more specific), then
        the long-digit fallback for remaining standalone runs.

        NOTE(review): the minimum run length is set by
        ``SPELL_LONG_DIGITS_REGEX`` (defined elsewhere) — confirm the value.
        """
        text = SPELL_PHONE_REGEX.sub(lambda m: f"<spell>{m.group(0)}</spell>", text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: f"<spell>{m.group(0)}</spell>", text
        )
        return text

    def _expand_numeric_ranges(self, text: str) -> str:
        """Rewrite ``N-M`` as ``N <sep> M`` using the locale's separator word.

        The separator comes from ``RANGE_SEPARATOR_BY_LANG`` keyed by the
        lower-cased language; when the language has no entry the text is
        returned unchanged.

        Regex choice:

        * ``RANGE_REGEX_WIDE`` when ``ssml_flavor`` is ``"cartesia"`` (phones
          are already wrapped in ``<spell>…</spell>`` by then) or when the
          language is not English.
        * ``RANGE_REGEX`` otherwise. Per the original notes this narrower
          pattern keeps an English phone fragment such as ``555-1234`` from
          being mistaken for a range (patterns are defined elsewhere).
        """
        lang = (self._language or "").lower()
        separator = RANGE_SEPARATOR_BY_LANG.get(lang)
        if separator is None:
            return text
        regex = (
            RANGE_REGEX_WIDE
            if self._ssml_flavor == "cartesia" or not self._language_is_english()
            else RANGE_REGEX
        )
        return regex.sub(
            lambda m: f"{m.group(1)} {separator} {m.group(2)}",
            text,
        )
All rules are on by default except number verbalisation (verbalize_numbers) and digit spelling (ssml_flavor), which are opt-in. Symbol expansion is suppressed for non-English languages; the TTS provider handles symbol readings for those.
Initialise the filter.
Args
language- ISO 639-1 code or
"auto". Drives symbol expansion (English only) and numeric-range separator word ("to", "से", "から", etc.). strip_markdown- Strip Markdown syntax before TTS. Default
True. strip_llm_metadata- Strip common LLM state-leak shapes such as
(SESSION STATE: intro_delivered=true) or STATE: language=en. Safety net — the real fix is a clearer prompt. Default True. collapse_script_parens- When the LLM writes both an English word
and its non-Latin gloss (
Tally (टैली), one click (एक क्लिक)), collapse to just the gloss so the TTS doesn't speak the phrase twice. Word-count aware: Extra manpower (मैनपावर) → Extra मैनपावर. Default True. normalise_punctuation- Collapse
"..." → "…" etc. Default True. expand_symbols- Rewrite
&, %, #N → words (English only). Default True. expand_ranges- Rewrite short digit ranges
2-3 → 2 to 3 using the locale's separator word. Default True. protect_structural- Preserve URLs / paths / versions / emails
through the Markdown and symbol stages. Default
True. respect_quotes- (Reserved — currently a no-op; preserved state is tracked per turn so future logic can use it.)
respect_parens- (Reserved — same as above.)
Ancestors
- TextFilter
- abc.ABC
Static methods
def for_language(language: str | None) ‑> BasicTextFilter-
Build a filter pre-configured for
language — used by Pipeline auto-wiring.
Methods
async def filter(self, chunks: AsyncIterator[str]) ‑> AsyncIterator[str]-
Expand source code
async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Transform an incoming chunk stream.

    Per-turn state is reset first. Text inside a triple-backtick fence that
    is still open is discarded; everything else is buffered until a
    whitespace boundary with balanced inline markers is found (or the buffer
    exceeds ``_MAX_BUFFER``), then run through ``_process`` and yielded.
    Whatever remains when the stream ends is processed and yielded as a
    final piece.

    Args:
        chunks: Async stream of raw text chunks; empty chunks are skipped.

    Yields:
        Non-empty processed text pieces. If any exception is raised while
        filtering, it is logged and the unprocessed buffer is yielded raw
        instead of propagating the error.
    """
    await self.reset()
    try:
        async for chunk in chunks:
            if not chunk:
                continue
            logger.debug("[chunking] filter ← raw: %r", chunk)
            self._buffer += chunk

            emitted_before_fence: list[str] = []
            while True:
                if self._in_code_fence:
                    close_idx = self._buffer.find("```")
                    if close_idx == -1:
                        # Discard the fenced content, but keep a trailing run
                        # of backticks (at most two, since no full fence was
                        # found): it may be the first part of a closing fence
                        # split across chunks. Dropping it would make the
                        # close undetectable and swallow the rest of the turn.
                        fence_body = self._buffer.rstrip("`")
                        self._buffer = self._buffer[len(fence_body):]
                        break
                    self._buffer = self._buffer[close_idx + 3 :]
                    self._in_code_fence = False
                    continue

                open_idx = self._buffer.find("```")
                if open_idx == -1:
                    break
                close_idx = self._buffer.find("```", open_idx + 3)
                if close_idx != -1:
                    # Complete fence is buffered; leave it for the Markdown
                    # stripper in ``_process``.
                    break

                # Unclosed fence: emit what precedes it and drop what
                # follows, again keeping a partial closing fence.
                prefix = self._buffer[:open_idx]
                after_open = self._buffer[open_idx + 3 :]
                self._buffer = after_open[len(after_open.rstrip("`")):]
                self._in_code_fence = True
                if prefix:
                    processed = self._process(prefix)
                    if processed:
                        emitted_before_fence.append(processed)
                break

            for piece in emitted_before_fence:
                logger.debug("[chunking] filter → tokenizer: %r", piece)
                yield piece

            if self._in_code_fence or not self._buffer:
                continue

            safe_cut = self._find_emit_boundary(self._buffer)
            if safe_cut <= 0:
                # No safe boundary yet; flush only when the buffer has grown
                # past the cap so output cannot stall forever.
                if len(self._buffer) > _MAX_BUFFER:
                    processed = self._process(self._buffer)
                    self._buffer = ""
                    if processed:
                        logger.debug("[chunking] filter → tokenizer: %r", processed)
                        yield processed
                continue

            processed = self._process(self._buffer[:safe_cut])
            self._buffer = self._buffer[safe_cut:]
            if processed:
                logger.debug("[chunking] filter → tokenizer: %r", processed)
                yield processed

        # End of stream: an unterminated fence is dropped entirely.
        if self._in_code_fence:
            self._buffer = ""
            self._in_code_fence = False
        if self._buffer:
            processed = self._process(self._buffer)
            self._buffer = ""
            if processed:
                logger.debug("[chunking] filter → tokenizer (drain): %r", processed)
                yield processed
    except Exception:
        # Deliberate best-effort boundary: never lose buffered speech text
        # because a rule failed.
        logger.error("BasicTextFilter errored; yielding buffer raw", exc_info=True)
        # While inside a fence the buffer only ever holds a partial closing
        # fence (stray backticks), which must not be spoken.
        if self._buffer and not self._in_code_fence:
            yield self._buffer
        self._buffer = ""
async def reset(self) ‑> None-
Expand source code
async def reset(self) -> None:
    """Clear all per-turn state so the filter can start a fresh turn.

    Empties the text buffer, leaves code-fence mode, and restarts the
    placeholder numbering with an empty placeholder table.
    """
    # Streaming state.
    self._buffer, self._in_code_fence = "", False
    # Structural-token protection state.
    self._placeholder_counter = 0
    self._placeholder_map = dict()