Module agents.tokenize
Text chunking and filtering for the cascade pipeline.
The public surface exposes the abstract interfaces and the default
implementations. External chunker plugins can implement the SentenceChunker
interface without touching core code.
Naming note: these classes are intentionally distinct from other frameworks'
*Tokenizer types — VideoSDK chunks LLM output into sentence-sized
segments for streaming TTS, hence the Chunker family.
Sub-modules
- agents.tokenize.base
- agents.tokenize.basic
- agents.tokenize.filters
- agents.tokenize.hyphenate: English word hyphenation via Frank Liang's algorithm.
- agents.tokenize.indic
- agents.tokenize.patterns
- agents.tokenize.stream
- agents.tokenize.verbalize
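The `hyphenate` sub-module is described as implementing Frank Liang's algorithm. To illustrate the technique only (this is not the module's code or pattern table), the sketch below scores every gap between letters using the classic nine-pattern example for "hyphenation": each pattern adds digits to the gaps it covers, the highest value per gap wins, and odd values mark legal breaks. `PATTERNS`, `parse_pattern` and `liang_hyphenate` are hypothetical names. The 2-letter / 3-letter minimum fragment sizes are TeX's English defaults and an assumption here; the ≤4-character shortcut mirrors the `hyphenate_english` docstring.

```python
from __future__ import annotations

import re

# Illustrative pattern subset only; a production hyphenator loads thousands.
PATTERNS = ["hy3ph", "he2n", "hena4", "hen5at", "1na", "n2at", "1tio", "2io", "o2n"]


def parse_pattern(pattern: str) -> tuple[str, list[int]]:
    """Split e.g. 'hy3ph' into letters 'hyph' and gap scores [0, 0, 3, 0, 0]."""
    letters = re.sub(r"\d", "", pattern)
    scores = [0] * (len(letters) + 1)
    pos = 0
    for ch in pattern:
        if ch.isdigit():
            scores[pos] = int(ch)  # digit belongs to the gap before letter `pos`
        else:
            pos += 1
    return letters, scores


TABLE = dict(parse_pattern(p) for p in PATTERNS)


def liang_hyphenate(word: str, left_min: int = 2, right_min: int = 3) -> list[str]:
    # Mirror the documented shortcut: short or non-alphabetic tokens stay whole.
    if len(word) <= 4 or not word.isalpha():
        return [word]
    padded = "." + word.lower() + "."  # dots mark the word edges
    points = [0] * (len(padded) + 1)  # points[k] = score of the gap before padded[k]
    for start in range(len(padded)):
        for end in range(start + 1, len(padded) + 1):
            scores = TABLE.get(padded[start:end])
            if scores:
                for k, s in enumerate(scores):
                    points[start + k] = max(points[start + k], s)
    pieces, last = [], 0
    # Odd scores allow a break; keep at least left_min / right_min letters.
    for i in range(left_min, len(word) - right_min + 1):
        if points[i + 1] % 2 == 1:  # +1 skips the leading dot
            pieces.append(word[last:i])
            last = i
    pieces.append(word[last:])
    return pieces


print(liang_hyphenate("hyphenation"))  # ['hy', 'phen', 'ation']
print(liang_hyphenate("the"))          # ['the']
```

With this pattern set the gap before the first "n" gets 1 from `1na` but 2 from `he2n`; the even maximum forbids a break there, which is why the result is hy-phen-ation rather than hyphe-nation.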
Functions
def detect_script(sample: str, *, default: str = 'en') -> str

```python
def detect_script(sample: str, *, default: str = "en") -> str:
    """Return an ISO 639-1 hint based on the non-Latin scripts found.

    Counts the characters in the sample string that belong to each
    supported script. Japanese wins whenever its pattern matches, then
    Korean; otherwise the script with the most matching characters is
    returned. Falls back to ``default`` when only Latin characters (or
    whitespace / digits / punctuation) are present.

    Args:
        sample: Text to inspect. The first ~200 characters are sufficient.
        default: Language code returned when no non-Latin script is
            detected.

    Returns:
        An ISO 639-1 code. The hint is used for abbreviation selection and
        locale-specific disambiguation (e.g. Greek ``;`` handling); it does
        not gate which strong terminators are recognised.
    """
    if not sample:
        return default
    scores: dict[str, int] = {}
    for code, pattern in _SCRIPT_RANGES:
        matches = pattern.findall(sample)
        if matches:
            scores[code] = len(matches)
    if not scores:
        return default
    if "ja" in scores:
        return "ja"
    if "ko" in scores:
        return "ko"
    return max(scores.items(), key=lambda kv: kv[1])[0]
```

Return an ISO 639-1 hint based on the non-Latin scripts found.

Counts the characters in the sample string that belong to each supported script. Japanese wins whenever its pattern matches, then Korean; otherwise the script with the most matching characters is returned. Falls back to `default` when `sample` is empty or contains only Latin characters (or whitespace, digits, punctuation).

Args
sample: Text to inspect. The first ~200 characters are sufficient.
default: Language code returned when no non-Latin script is detected.

Returns
An ISO 639-1 code. The hint is used for abbreviation selection and locale-specific disambiguation (e.g. Greek `;` handling); it does not gate which strong terminators are recognised.

def hyphenate_english(word: str) -> list[str]
```python
def hyphenate_english(word: str) -> list[str]:
    """Split ``word`` at every legal English hyphenation point.

    Args:
        word: An English word. Short words (≤4 chars) and non-alphabetic
            tokens are returned as a single-element list.

    Returns:
        A list of substrings whose concatenation equals ``word``.
        Boundaries between consecutive pieces are valid hyphenation points.

    Examples:
        >>> hyphenate_english("hyphenation")
        ['hy', 'phen', 'ation']
        >>> hyphenate_english("the")
        ['the']
        >>> hyphenate_english("associate")
        ['as', 'so', 'ciate']
    """
    return _get_hyphenator().hyphenate(word)
```

Split `word` at every legal English hyphenation point.

Args
word: An English word. Short words (≤4 chars) and non-alphabetic tokens are returned as a single-element list.

Returns
A list of substrings whose concatenation equals `word`. Boundaries between consecutive pieces are valid hyphenation points.

Examples

```python
>>> hyphenate_english("hyphenation")
['hy', 'phen', 'ation']
>>> hyphenate_english("the")
['the']
>>> hyphenate_english("associate")
['as', 'so', 'ciate']
```

def normalize_lang_code(code: str | None) -> str | None
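```python
def normalize_lang_code(code: str | None) -> str | None:
    """Reduce an STT/TTS language tag to a bare ISO 639-1 code, or ``None``.
    """
    if not code:
        return None
    base = code.split("-", 1)[0].split("_", 1)[0].strip().lower()
    if not base or base == "auto":
        return None
    return base
```

Reduce an STT/TTS language tag to a bare ISO 639-1 code, or `None`. Everything from the first `-` or `_` onward is dropped and the rest is lower-cased, so `"hi-IN"` and `"pt_BR"` become `"hi"` and `"pt"`; an empty tag or `"auto"` yields `None`.

def pre_warm_tokenizer() -> None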
```python
def pre_warm_tokenizer() -> None:
    """Eagerly import ``indic-nlp-library`` so the first ``.tokenize()`` call is cheap.

    The underlying ``indicnlp.tokenize.sentence_tokenize`` module performs its
    expensive initialisation on first import (~6s on a cold Python process).
    Calling this at worker start — alongside ``TurnDetector.pre_download_model()``
    — moves that cost out of the first conversational turn.
    """
    _load_sentence_split()
```

Eagerly import `indic-nlp-library` so the first `.tokenize()` call is cheap.

The underlying `indicnlp.tokenize.sentence_tokenize` module performs its expensive initialisation on first import (~6s on a cold Python process). Calling this at worker start — alongside `TurnDetector.pre_download_model()` — moves that cost out of the first conversational turn.
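Together, `normalize_lang_code` and `detect_script` suggest a simple way to pick a language hint: trust an explicit STT/TTS tag when it carries a real code, otherwise guess from the text. The sketch below copies the `normalize_lang_code` logic shown above, but its detector is a cut-down stand-in: the four-entry `SCRIPT_RANGES` table and the names `detect_script_sketch` and `resolve_language` are hypothetical, not the module's `_SCRIPT_RANGES`. It keeps the tie-break visible in the `detect_script` source (Japanese, then Korean, then the most frequent script).

```python
from __future__ import annotations

import re

# Hypothetical, heavily reduced script table (the real _SCRIPT_RANGES is not shown).
SCRIPT_RANGES: list[tuple[str, re.Pattern[str]]] = [
    ("hi", re.compile(r"[\u0900-\u097F]")),  # Devanagari
    ("ja", re.compile(r"[\u3040-\u30FF]")),  # Hiragana + Katakana
    ("ko", re.compile(r"[\uAC00-\uD7AF]")),  # Hangul syllables
    ("el", re.compile(r"[\u0370-\u03FF]")),  # Greek and Coptic
]


def normalize_lang_code(code: str | None) -> str | None:
    # Same logic as the documented function: "hi-IN" / "pt_BR" -> bare code.
    if not code:
        return None
    base = code.split("-", 1)[0].split("_", 1)[0].strip().lower()
    if not base or base == "auto":
        return None
    return base


def detect_script_sketch(sample: str, *, default: str = "en") -> str:
    # Count matching characters per script; scripts with no match are skipped.
    scores = {code: len(p.findall(sample)) for code, p in SCRIPT_RANGES if p.search(sample)}
    if not scores:
        return default
    for preferred in ("ja", "ko"):  # kana or Hangul anywhere wins
        if preferred in scores:
            return preferred
    return max(scores.items(), key=lambda kv: kv[1])[0]


def resolve_language(stt_code: str | None, text: str) -> str:
    # Prefer an explicit tag; fall back to script detection on ~200 chars.
    return normalize_lang_code(stt_code) or detect_script_sketch(text[:200])
```

For example, `resolve_language("hi-IN", "Hello")` trusts the tag and returns `"hi"`, while `resolve_language("auto", "नमस्ते दुनिया")` ignores the placeholder tag and detects Devanagari.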
Classes
class BasicSentenceChunker (*,
language: str = 'auto',
min_sentence_len: int = 20,
strong_terminators: str = '.!?…‼⁇⁈⁉。!?।॥؟۔؞։።፧፨།༎༏༐༑။។៕\n',
weak_terminators: str = ',;:、,;:،؛՝՛፣፤፥་၊៖—–',
abbreviations: frozenset[str] | None = None)
```python
class BasicSentenceChunker(SentenceChunker):
    """Default multilingual, Unicode-aware sentence chunker.

    Works correctly without a ``language`` hint for every major world
    script. An explicit hint sharpens behaviour (abbreviation set, Greek
    ``;`` upgrade) but is not required.

    Args:
        language: ISO 639-1 code or ``"auto"`` (default). ``"auto"``
            triggers script detection on the first ~200 characters of
            each tokenize call.
        min_sentence_len: Minimum character length before a weak
            terminator (``, ; :`` etc.) is treated as a cut boundary.
            Strong terminators (``. ! ? 。 ! ? । 。 ۔ ։ ።`` …) always cut
            regardless of length.
        strong_terminators: Override the default strong-terminator
            character class.
        weak_terminators: Override the default weak-terminator
            character class.
        abbreviations: Override the abbreviation set. When ``None``
            (default), the set is chosen from ``ABBREVIATIONS_BY_LANG``
            based on the resolved language.
    """

    def __init__(
        self,
        *,
        language: str = "auto",
        min_sentence_len: int = 20,
        strong_terminators: str = STRONG_TERMINATORS,
        weak_terminators: str = WEAK_TERMINATORS,
        abbreviations: frozenset[str] | None = None,
    ) -> None:
        self._language = language
        self._min_sentence_len = max(1, int(min_sentence_len))
        self._strong = strong_terminators
        self._weak = weak_terminators
        self._override_abbreviations = abbreviations

    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        """Split ``text`` into sentence-sized strings.

        Args:
            text: Input text. May be a full response or a partial buffer.
            language: Optional ISO 639-1 override. When omitted, uses the
                instance's default.

        Returns:
            Sentence strings in input order, leading/trailing whitespace
            stripped, empty segments filtered out. When the input ends
            without a confirmed sentence boundary, the final element
            contains the unterminated remainder (the stream adapter uses
            this to retain buffer state).
        """
        if not text:
            return []
        lang = language or self._language
        if lang == "auto":
            lang = detect_script(text[:200])
        strong = self._strong
        weak = self._weak
        if lang == "el":
            if ";" not in strong:
                strong = strong + ";"
            weak = weak.replace(";", "")
        if self._override_abbreviations is not None:
            abbrevs = self._override_abbreviations
        else:
            abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)
        protected, restore_map = self._protect(text, abbrevs)
        segments = self._split(protected, strong=strong, weak=weak)
        return [self._restore(s, restore_map) for s in segments if s.strip()]

    def tokenize_raw(
        self,
        text: str,
        *,
        language: str | None = None,
    ) -> list[str]:
        """Return raw (un-stripped) segment substrings.

        Same boundary decisions as :py:meth:`tokenize`, but each segment is
        returned as it appears in the input text — leading / trailing
        whitespace intact. Used by the stream adapter so it can re-use the
        final segment as the continuation buffer without losing the space
        that separates it from the upcoming chunk.
        """
        if not text:
            return []
        lang = language or self._language
        if lang == "auto":
            lang = detect_script(text[:200])
        strong = self._strong
        weak = self._weak
        if lang == "el":
            if ";" not in strong:
                strong = strong + ";"
            weak = weak.replace(";", "")
        if self._override_abbreviations is not None:
            abbrevs = self._override_abbreviations
        else:
            abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)
        protected, restore_map = self._protect(text, abbrevs)
        segments = self._split(protected, strong=strong, weak=weak)
        return [self._restore_raw(s, restore_map) for s in segments]

    @staticmethod
    def _restore_raw(text: str, restore_map: dict[str, str]) -> str:
        """Same as :py:meth:`_restore` but preserves edge whitespace."""
        if not restore_map:
            return text
        for key, value in restore_map.items():
            text = text.replace(key, value)
        return text

    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        """Open a push-based stream adapter bound to this chunker."""
        from .stream import BufferedSentenceChunkStream

        return BufferedSentenceChunkStream(
            tokenize_fn=partial(self.tokenize_raw, language=language),
            strong_terminators=self._resolve_strong_for_stream(language),
            min_sentence_len=self._min_sentence_len,
        )

    def _protect(
        self,
        text: str,
        abbreviations: frozenset[str],
    ) -> tuple[str, dict[str, str]]:
        """Replace structural tokens and abbreviation dots with placeholders.

        Each protected region becomes a single PUA character that the split
        scan cannot interpret as a terminator.
        """
        restore_map: dict[str, str] = {}
        counter = [0]

        def _substitute(match: re.Match[str]) -> str:
            key = _placeholder_char(counter[0])
            counter[0] += 1
            restore_map[key] = match.group(0)
            return key

        for regex in _STRUCTURAL_PATTERNS:
            text = regex.sub(_substitute, text)
        if abbreviations:
            escaped = sorted(
                (re.escape(a) for a in abbreviations),
                key=len,
                reverse=True,
            )
            abbrev_regex = re.compile(
                r"\b(?:" + "|".join(escaped) + r")\.",
                re.IGNORECASE,
            )
            text = abbrev_regex.sub(_substitute, text)
        return text, restore_map

    @staticmethod
    def _restore(text: str, restore_map: dict[str, str]) -> str:
        """Replace every placeholder with its original text and strip edges."""
        if not restore_map:
            return text.strip()
        for key, value in restore_map.items():
            text = text.replace(key, value)
        return text.strip()

    def _split(
        self,
        text: str,
        *,
        strong: str,
        weak: str,
    ) -> list[str]:
        """Walk ``text`` and return a list of sentence segments.

        Strong terminators always cut (with lookahead-before-commit). Weak
        terminators cut only after the accumulated buffer reaches
        ``min_sentence_len`` chars so we don't emit fragments from every
        comma at the start of a response.
        """
        strong_set = set(strong)
        weak_set = set(weak)
        closing_quote_set = set(CLOSING_QUOTES)
        segments: list[str] = []
        start = 0
        i = 0
        n = len(text)
        while i < n:
            ch = text[i]
            if ch not in strong_set and ch not in weak_set:
                i += 1
                continue
            j = i + 1
            cur_set = strong_set if ch in strong_set else weak_set
            while j < n and text[j] in cur_set:
                j += 1
            if j < n and text[j] in closing_quote_set:
                j += 1
            look = j
            saw_newline = ch == "\n"
            while look < n and text[look] in " \t":
                look += 1
            if look < n and text[look] == "\n":
                saw_newline = True
                look += 1
                while look < n and text[look] in " \t":
                    look += 1
            if ch in strong_set:
                if look < n or saw_newline:
                    segments.append(text[start:j])
                    start = j
                    i = look
                    continue
                break
            cluster_end = j
            if cluster_end - start >= self._min_sentence_len and look < n:
                segments.append(text[start:cluster_end])
                start = cluster_end
                i = look
                continue
            i = j
        if start < n:
            segments.append(text[start:n])
        return segments

    def _resolve_strong_for_stream(self, language: str | None) -> str:
        """Return the strong-terminator set the stream adapter should use for fast-path checks."""
        lang = language or self._language
        if lang == "el" and ";" not in self._strong:
            return self._strong + ";"
        return self._strong
```

Default multilingual, Unicode-aware sentence chunker.
Works correctly without a `language` hint for every major world script. An explicit hint sharpens behaviour (the abbreviation set and, for Greek, the upgrade of `;`, the Greek question mark, to a strong terminator) but is not required.

Args
language: ISO 639-1 code or `"auto"` (default). `"auto"` triggers script detection on the first ~200 characters of each tokenize call.
min_sentence_len: Minimum character length before a weak terminator (`, ; :` etc.) is treated as a cut boundary. Strong terminators (`. ! ? 。 ! ? । 。 ۔ ։ ।` …) always cut regardless of length, once a following non-space character (or a newline) confirms the boundary.
strong_terminators: Override the default strong-terminator character class.
weak_terminators: Override the default weak-terminator character class.
abbreviations: Override the abbreviation set. When `None` (default), the set is chosen from `ABBREVIATIONS_BY_LANG` based on the resolved language, falling back to the English set for languages without an entry.
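The strong/weak rule in the Args above can be illustrated with a stripped-down boundary scan. This is a simplified sketch, not the class's `_split`: `STRONG`, `WEAK` and `split_sketch` are hypothetical, and it ignores closing quotes, newlines and abbreviation protection. It keeps the two behaviours the Args describe: a strong terminator cuts only once a following character confirms the boundary, and a weak terminator cuts only after `min_sentence_len` characters have accumulated.

```python
STRONG = set(".!?")
WEAK = set(",;:")


def split_sketch(text: str, min_sentence_len: int = 20) -> list[str]:
    segments: list[str] = []
    start = i = 0
    n = len(text)
    while i < n:
        ch = text[i]
        if ch not in STRONG and ch not in WEAK:
            i += 1
            continue
        group = STRONG if ch in STRONG else WEAK
        j = i + 1
        while j < n and text[j] in group:  # keep runs like "?!" or "..." together
            j += 1
        look = j
        while look < n and text[look] in " \t":  # peek past spaces
            look += 1
        if ch in STRONG:
            if look >= n:  # nothing follows yet: don't commit
                break
            segments.append(text[start:j].strip())
            start = i = look
            continue
        if j - start >= min_sentence_len and look < n:  # long enough for a weak cut
            segments.append(text[start:j].strip())
            start = i = look
            continue
        i = j
    if start < n:
        segments.append(text[start:].strip())  # unconfirmed remainder
    return segments
```

`split_sketch("Hi. How are you?")` gives `["Hi.", "How are you?"]`; the final "?" has nothing after it, so that last piece is still the unconfirmed remainder rather than a committed sentence.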
Ancestors
- SentenceChunker
- abc.ABC
Methods
def stream(self, *, language: str | None = None) -> SentenceChunkStream
Open a push-based stream adapter bound to this chunker.
def tokenize(self, text: str, *, language: str | None = None) -> list[str]
Split `text` into sentence-sized strings.

Args
text: Input text. May be a full response or a partial buffer.
language: Optional ISO 639-1 override. When omitted, uses the instance's default.

Returns
Sentence strings in input order, leading/trailing whitespace stripped, empty segments filtered out. When the input ends without a confirmed sentence boundary, the final element contains the unterminated remainder (the stream adapter uses this to retain buffer state).
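Abbreviation handling works by hiding dots from the boundary scan rather than special-casing them: the `_protect` / `_restore` pair in the class source swaps each protected match for a single Private Use Area character before splitting and puts the original text back afterwards. A minimal sketch of that round trip, assuming a hypothetical three-entry `ABBREVIATIONS` set, a hypothetical `PUA_BASE` placeholder scheme, and a naive regex split in `tokenize_sketch` standing in for `_split`:

```python
from __future__ import annotations

import re

ABBREVIATIONS = frozenset({"Dr", "Mr", "e.g"})  # hypothetical set
PUA_BASE = 0xE000  # start of the BMP Private Use Area


def protect(text: str, abbreviations: frozenset[str]) -> tuple[str, dict[str, str]]:
    restore_map: dict[str, str] = {}

    def substitute(match: re.Match[str]) -> str:
        key = chr(PUA_BASE + len(restore_map))  # one placeholder char per match
        restore_map[key] = match.group(0)
        return key

    # Longest entries first so longer abbreviations win over shorter prefixes.
    escaped = sorted((re.escape(a) for a in abbreviations), key=len, reverse=True)
    pattern = re.compile(r"\b(?:" + "|".join(escaped) + r")\.", re.IGNORECASE)
    return pattern.sub(substitute, text), restore_map


def restore(text: str, restore_map: dict[str, str]) -> str:
    for key, value in restore_map.items():
        text = text.replace(key, value)
    return text.strip()


def tokenize_sketch(text: str) -> list[str]:
    protected, restore_map = protect(text, ABBREVIATIONS)
    parts = re.split(r"(?<=[.!?])\s+", protected)  # abbreviation dots are hidden
    return [restore(p, restore_map) for p in parts if p.strip()]
```

`tokenize_sketch("Dr. Rao is here. Say hi, e.g. wave.")` keeps "Dr." and "e.g." intact and splits only after "here.".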
def tokenize_raw(self, text: str, *, language: str | None = None) -> list[str]
Return raw (un-stripped) segment substrings.

Same boundary decisions as `tokenize()`, but each segment is returned as it appears in the input text, with leading / trailing whitespace intact, and whitespace-only segments are not filtered out. Used by the stream adapter so it can re-use the final segment as the continuation buffer without losing the space that separates it from the upcoming chunk.
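The `tokenize` / `tokenize_raw` contract is what makes push-based streaming cheap: every segment except the last is confirmed, and the last raw segment (whitespace intact) becomes the new buffer. A minimal synchronous sketch of that loop follows. `SketchStream` and `raw_split` are hypothetical; the method names echo the `push_text` / `flush` wording of the `BufferedSentenceChunkStream` docstring, but this is not that class (it is not async and has no idle-flush timer), and `raw_split` is a naive regex stand-in for `tokenize_raw`.

```python
import re
from typing import Callable


def raw_split(text: str) -> list[str]:
    # Naive stand-in for tokenize_raw: cut after ".", "!" or "?" only when
    # whitespace follows; that whitespace stays on the next segment.
    parts: list[str] = []
    start = 0
    for m in re.finditer(r"[.!?]+(?=\s)", text):
        parts.append(text[start:m.end()])
        start = m.end()
    parts.append(text[start:])
    return [p for p in parts if p]


class SketchStream:
    def __init__(self, tokenize_fn: Callable[[str], list[str]]) -> None:
        self._tokenize_fn = tokenize_fn
        self._buffer = ""

    def push_text(self, chunk: str) -> list[str]:
        self._buffer += chunk
        segments = self._tokenize_fn(self._buffer)
        if len(segments) <= 1:
            return []  # no confirmed boundary yet
        # All but the last segment are confirmed; the raw last one is the new buffer.
        self._buffer = segments[-1]
        return [s.strip() for s in segments[:-1] if s.strip()]

    def flush(self) -> list[str]:
        remainder, self._buffer = self._buffer.strip(), ""
        return [remainder] if remainder else []


stream = SketchStream(raw_split)
print(stream.push_text("One. Two "))  # ['One.']
print(stream.push_text("three."))     # []
print(stream.flush())                 # ['Two three.']
```

The buffered remainder `" Two "` keeps its trailing space, so appending `"three."` gives `"Two three."`; a stripped remainder would have produced `"Twothree."`.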
class BasicTextFilter (*,
language: str = 'auto',
strip_markdown: bool = True,
strip_llm_metadata: bool = True,
collapse_script_parens: bool = True,
normalise_punctuation: bool = True,
expand_symbols: bool = True,
expand_ranges: bool = True,
protect_structural: bool = True,
respect_quotes: bool = True,
respect_parens: bool = True,
ssml_flavor: str = 'none',
verbalize_currency: bool = True,
verbalize_numbers: bool = False,
currency_hint: str | None = None)
```python
class BasicTextFilter(TextFilter):
    """Default text filter with six independently-toggleable rules.

    All rules are on by default. Symbol expansion is suppressed for
    non-English languages; the TTS provider handles symbol readings for
    those.
    """

    def __init__(
        self,
        *,
        language: str = "auto",
        strip_markdown: bool = True,
        strip_llm_metadata: bool = True,
        collapse_script_parens: bool = True,
        normalise_punctuation: bool = True,
        expand_symbols: bool = True,
        expand_ranges: bool = True,
        protect_structural: bool = True,
        respect_quotes: bool = True,
        respect_parens: bool = True,
        ssml_flavor: str = "none",
        verbalize_currency: bool = True,
        verbalize_numbers: bool = False,
        currency_hint: str | None = None,
    ) -> None:
        """Initialise the filter.

        Args:
            language: ISO 639-1 code or ``"auto"``. Drives symbol
                expansion (English only) and numeric-range separator word
                ("to", "से", "から", etc.).
            strip_markdown: Strip Markdown syntax before TTS. Default ``True``.
            strip_llm_metadata: Strip common LLM state-leak shapes such as
                ``(SESSION STATE: intro_delivered=true)`` or
                ``STATE: language=en``. Safety net — the real fix is a
                clearer prompt. Default ``True``.
            collapse_script_parens: When the LLM writes both an English word
                and its non-Latin gloss (``Tally (टैली)``,
                ``one click (एक क्लिक)``), collapse to just the gloss so the
                TTS doesn't speak the phrase twice. Word-count aware:
                ``Extra manpower (मैनपावर)`` → ``Extra मैनपावर``.
                Default ``True``.
            normalise_punctuation: Collapse ``"..."`` → ``"…"`` etc.
                Default ``True``.
            expand_symbols: Rewrite ``&``, ``%``, ``#N`` → words (English
                only). Default ``True``.
            expand_ranges: Rewrite short digit ranges ``2-3`` → ``2 to 3``
                using the locale's separator word. Default ``True``.
            protect_structural: Preserve URLs / paths / versions / emails
                through the Markdown and symbol stages. Default ``True``.
            respect_quotes: (Reserved — currently a no-op; preserved state
                is tracked per turn so future logic can use it.)
            respect_parens: (Reserved — same as above.)
        """
        self._language = language
        self._strip_markdown = strip_markdown
        self._strip_llm_metadata = strip_llm_metadata
        self._collapse_script_parens = collapse_script_parens
        self._normalise_punctuation = normalise_punctuation
        self._expand_symbols = expand_symbols
        self._expand_ranges = expand_ranges
        self._protect_structural = protect_structural
        self._respect_quotes = respect_quotes
        self._respect_parens = respect_parens
        self._ssml_flavor = (ssml_flavor or "none").lower()
        self._verbalize_currency = verbalize_currency
        self._verbalize_numbers = verbalize_numbers
        self._currency_hint = currency_hint.lower() if currency_hint else None
        self._buffer: str = ""
        self._in_code_fence: bool = False
        self._placeholder_counter: int = 0
        self._placeholder_map: dict[str, str] = {}

    @classmethod
    def for_language(cls, language: str | None) -> "BasicTextFilter":
        """Build a filter pre-configured for ``language`` — used by
        ``Pipeline`` auto-wiring.
        """
        lang = language or "auto"
        return cls(
            language=lang,
            verbalize_numbers=lang in _VERBALIZE_LANGS,
            currency_hint=_CURRENCY_BY_LANG.get(lang),
        )

    async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        """Transform an incoming chunk stream."""
        await self.reset()
        try:
            async for chunk in chunks:
                if not chunk:
                    continue
                logger.debug("[chunking] filter ← raw: %r", chunk)
                self._buffer += chunk
                emitted_before_fence: list[str] = []
                while True:
                    if self._in_code_fence:
                        close_idx = self._buffer.find("```")
                        if close_idx == -1:
                            self._buffer = ""
                            break
                        self._buffer = self._buffer[close_idx + 3 :]
                        self._in_code_fence = False
                        continue
                    open_idx = self._buffer.find("```")
                    if open_idx == -1:
                        break
                    close_idx = self._buffer.find("```", open_idx + 3)
                    if close_idx != -1:
                        break
                    prefix = self._buffer[:open_idx]
                    self._buffer = ""
                    self._in_code_fence = True
                    if prefix:
                        processed = self._process(prefix)
                        if processed:
                            emitted_before_fence.append(processed)
                    break
                for piece in emitted_before_fence:
                    logger.debug("[chunking] filter → tokenizer: %r", piece)
                    yield piece
                if self._in_code_fence or not self._buffer:
                    continue
                safe_cut = self._find_emit_boundary(self._buffer)
                if safe_cut <= 0:
                    if len(self._buffer) > _MAX_BUFFER:
                        processed = self._process(self._buffer)
                        self._buffer = ""
                        if processed:
                            logger.debug("[chunking] filter → tokenizer: %r", processed)
                            yield processed
                    continue
                processed = self._process(self._buffer[:safe_cut])
                self._buffer = self._buffer[safe_cut:]
                if processed:
                    logger.debug("[chunking] filter → tokenizer: %r", processed)
                    yield processed
            if self._in_code_fence:
                self._buffer = ""
                self._in_code_fence = False
            if self._buffer:
                processed = self._process(self._buffer)
                self._buffer = ""
                if processed:
                    logger.debug("[chunking] filter → tokenizer (drain): %r", processed)
                    yield processed
        except Exception:
            logger.error("BasicTextFilter errored; yielding buffer raw", exc_info=True)
            if self._buffer:
                yield self._buffer
                self._buffer = ""

    async def reset(self) -> None:
        """Reset per-turn state."""
        self._buffer = ""
        self._in_code_fence = False
        self._placeholder_counter = 0
        self._placeholder_map = {}

    def _find_emit_boundary(self, text: str) -> int:
        """Return the length of the prefix that can be safely emitted.

        Assumes fence handling has already happened: this only worries about
        balanced inline Markdown markers (``**``, ``__``, backticks,
        brackets). Returns 0 when no safe whitespace boundary exists yet.
        """
        if not text:
            return 0
        for idx in range(len(text) - 1, -1, -1):
            if not text[idx].isspace():
                continue
            candidate = text[: idx + 1]
            if self._has_balanced_markers(candidate):
                return idx + 1
        return 0

    @staticmethod
    def _has_balanced_markers(text: str) -> bool:
        """Cheap balance check for common Markdown / grouping markers.

        Ensures we never emit a prefix that leaves an opener dangling —
        needed so that:

        - inline ``**bold**`` / ``_italic_`` / ``` `code` ``` all close,
        - Markdown links ``[text](url)`` both brackets close,
        - and, critically, parentheses balance.

        The last matters for the metadata stripper: an LLM-leaked
        ``(SESSION STATE: …)`` must reach the processor as one intact
        block so the regex can delete it.
        """
        if text.count("**") % 2 != 0:
            return False
        if text.count("__") % 2 != 0:
            return False
        bt_count = sum(1 for c in text if c == "`")
        if bt_count % 2 != 0:
            return False
        if text.count("[") != text.count("]"):
            return False
        if text.count("(") != text.count(")"):
            return False
        return True

    def _process(self, text: str) -> str:
        if not text:
            return ""
        if self._protect_structural:
            text = self._protect(text)
        if self._strip_llm_metadata:
            text = self._strip_metadata(text)
        if self._strip_markdown:
            text = self._strip_md(text)
        if self._collapse_script_parens:
            text = self._collapse_script_parens_fn(text)
        if self._normalise_punctuation:
            text = self._normalise_punct(text)
        if self._ssml_flavor == "cartesia":
            text = self._inject_cartesia_ssml(text)
        elif self._ssml_flavor == "digits":
            text = self._inject_digits_spaced(text)
        if self._expand_ranges:
            text = self._expand_numeric_ranges(text)
        if self._verbalize_currency:
            text = expand_currency(
                text,
                language=self._language or "en",
                hint=self._currency_hint,
            )
        if self._verbalize_numbers:
            text = expand_cardinals(text, language=self._language or "en")
        if self._expand_symbols and self._language_is_english():
            text = self._expand(text)
        if self._protect_structural:
            text = self._restore(text)
        return text

    def _language_is_english(self) -> bool:
        lang = (self._language or "").lower()
        return lang in ("en", "auto", "")

    def _protect(self, text: str) -> str:
        def _substitute(match: re.Match[str]) -> str:
            key = chr(_FILTER_PLACEHOLDER_BASE + self._placeholder_counter)
            self._placeholder_counter += 1
            self._placeholder_map[key] = match.group(0)
            return key

        for regex in (URL_REGEX, EMAIL_REGEX, PATH_REGEX, VERSION_REGEX):
            text = regex.sub(_substitute, text)
        return text

    def _restore(self, text: str) -> str:
        if not self._placeholder_map:
            return text
        for key, value in self._placeholder_map.items():
            if key in text:
                text = text.replace(key, value)
        return text

    @staticmethod
    def _strip_metadata(text: str) -> str:
        """Remove common LLM state-leak shapes.

        Targets patterns the model produces when it misinterprets a
        ``SESSION STATE`` / ``INTERNAL`` block in the system prompt as
        something to announce: parenthesised ALL-CAPS key/value dumps,
        and bare ``KEYWORD: key=value`` lines. Designed to never match
        normal parentheticals — the key detector is ``= inside
        all-caps-prefixed``.
        """
        text = METADATA_PARENS_REGEX.sub("", text)
        text = METADATA_PREFIX_REGEX.sub("", text)
        return text

    @staticmethod
    def _collapse_script_parens_fn(text: str) -> str:
        """Collapse ``<Latin word(s)> (<non-Latin gloss>)`` → ``<gloss>``.

        Walks each ``(…)`` containing a non-ASCII character, counts its
        words, and removes exactly that many Latin-script words from the
        immediately-preceding text. Preserves whitespace and non-Latin
        context around the match.
        """
        matches = list(SCRIPT_MIXED_PAREN_REGEX.finditer(text))
        if not matches:
            return text
        result: list[str] = []
        pos = 0
        for m in matches:
            paren_start = m.start()
            paren_end = m.end()
            content = m.group(1).strip()
            if not content:
                result.append(text[pos:paren_end])
                pos = paren_end
                continue
            n_words = len(content.split())
            before = text[pos:paren_start]
            idx = len(before)
            removed = 0
            while removed < n_words:
                while idx > 0 and before[idx - 1].isspace():
                    idx -= 1
                end = idx
                while (
                    idx > 0
                    and before[idx - 1].isascii()
                    and before[idx - 1].isalnum()
                ):
                    idx -= 1
                if end == idx:
                    break
                removed += 1
            if removed == n_words:
                kept = before[:idx]
                if kept and not kept[-1:].isspace():
                    result.append(kept + " ")
                else:
                    result.append(kept)
                result.append(content)
            else:
                result.append(before)
                result.append(text[paren_start:paren_end])
            pos = paren_end
        result.append(text[pos:])
        return "".join(result)

    @staticmethod
    def _strip_md(text: str) -> str:
        text = MD_FENCED_CODE_REGEX.sub("", text)
        text = MD_INLINE_CODE_REGEX.sub("", text)
        text = MD_IMAGE_REGEX.sub("", text)
        text = MD_LINK_REGEX.sub(r"\1", text)
        text = MD_HEADING_REGEX.sub("", text)
        text = MD_LIST_MARKER_REGEX.sub("", text)
        text = MD_BLOCKQUOTE_REGEX.sub("", text)
        text = MD_HR_REGEX.sub("", text)
        text = MD_TABLE_SEP_REGEX.sub("", text)
        text = MD_TABLE_PIPE_REGEX.sub(" ", text)
        text = MD_BOLD_STAR_REGEX.sub(r"\1", text)
        text = MD_BOLD_UNDER_REGEX.sub(r"\1", text)
        text = MD_ITALIC_STAR_REGEX.sub(r"\1", text)
        text = MD_ITALIC_UNDER_REGEX.sub(r"\1", text)
        return text

    @staticmethod
    def _normalise_punct(text: str) -> str:
        text = PUNCT_ELLIPSIS_REGEX.sub("…", text)
        text = PUNCT_SPACED_DASH_REGEX.sub(" — ", text)
        return text

    @staticmethod
    def _expand(text: str) -> str:
        for regex, replacement in SYMBOL_EXPANSIONS_EN:
            text = regex.sub(replacement, text)
        return text

    @staticmethod
    def _inject_digits_spaced(text: str) -> str:
        """Replace phones and 6+ digit runs with space-separated digits.

        Example: ``+91 98765 43210`` → ``9 1 9 8 7 6 5 4 3 2 1 0``.

        This is the universal fallback when the TTS doesn't support
        Cartesia's ``<spell>`` tag (Sarvam AI, ElevenLabs, Google TTS,
        Azure, AWS Polly, etc.). Every modern TTS reads space-separated
        digits character-by-character naturally — in the active language
        (Hindi TTS says "नौ आठ सात…", English TTS says "nine eight seven…").

        The ``+``, ``(``, ``)``, ``-``, and interior spaces of phone
        patterns are stripped so the TTS doesn't read "plus paren…" etc.
        """

        def _phone_to_digits(m: re.Match[str]) -> str:
            digits = re.sub(r"\D", "", m.group(0))
            return " ".join(digits)

        text = SPELL_PHONE_REGEX.sub(_phone_to_digits, text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: " ".join(m.group(0)),
            text,
        )
        return text

    @staticmethod
    def _inject_cartesia_ssml(text: str) -> str:
        """Wrap phone numbers and long digit runs in ``<spell>…</spell>``.

        Cartesia Sonic-3 reads unwrapped digit strings as natural-language
        numbers ("one thousand two hundred thirty four"). For IDs, account
        numbers, OTPs, and phone numbers you almost always want
        digit-by-digit.

        The order matters: phone regex runs first (more specific), then the
        long-digit fallback for any standalone 7+ digit run.
        """
        text = SPELL_PHONE_REGEX.sub(lambda m: f"<spell>{m.group(0)}</spell>", text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: f"<spell>{m.group(0)}</spell>", text
        )
        return text

    def _expand_numeric_ranges(self, text: str) -> str:
        """Rewrite ``N-M`` as ``N <sep> M`` using the locale's separator word.

        Regex width is adaptive:

        * With ``ssml_flavor="cartesia"`` (phones already wrapped in
          ``<spell>…</spell>``) — uses the WIDE regex (up to 4 digits per
          side), so ``500-1000 entries`` becomes ``500 से 1000 entries``.
        * Otherwise — uses the narrow 3-digit regex so an English phone
          like ``555-1234`` is not mistaken for a range.
        * For non-Latin agents (Hindi, Tamil, etc.), we also use the wide
          regex because phone numbers are rare in those prose contexts,
          while range patterns (दाम पांच सौ से हज़ार, etc.) are common.
        """
        lang = (self._language or "").lower()
        separator = RANGE_SEPARATOR_BY_LANG.get(lang)
        if separator is None:
            return text
        regex = (
            RANGE_REGEX_WIDE
            if self._ssml_flavor == "cartesia" or not self._language_is_english()
            else RANGE_REGEX
        )
        return regex.sub(
            lambda m: f"{m.group(1)} {separator} {m.group(2)}",
            text,
        )
```

Default text filter with six independently-toggleable rules.
All rules are on by default. Symbol expansion is suppressed for non-English languages; the TTS provider handles symbol readings for those.
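The language gating described above can be sketched as a small function. Everything here is illustrative: `RANGE_SEPARATORS`, `RANGE_RE` and `SYMBOLS_EN` are hypothetical stand-ins for the module's `RANGE_SEPARATOR_BY_LANG`, `RANGE_REGEX` and `SYMBOL_EXPANSIONS_EN`, whose contents are not shown (only the separator words "to", "से" and "から" come from this page). As in `_language_is_english`, `"auto"` and the empty string count as English, and ranges are rewritten before symbols, the same order `_process` uses.

```python
from __future__ import annotations

import re

# Hypothetical stand-ins for the module's tables.
RANGE_SEPARATORS = {"en": "to", "auto": "to", "hi": "से", "ja": "から"}
RANGE_RE = re.compile(r"\b(\d{1,3})-(\d{1,3})\b")  # narrow: 1 to 3 digits per side
SYMBOLS_EN = [
    (re.compile(r"\s*&\s*"), " and "),
    (re.compile(r"(\d)%"), r"\1 percent"),
]


def is_english(language: str | None) -> bool:
    return (language or "").lower() in ("en", "auto", "")


def expand_for_tts(text: str, language: str = "auto") -> str:
    separator = RANGE_SEPARATORS.get(language.lower())
    if separator is not None:  # no known separator word: leave ranges alone
        text = RANGE_RE.sub(lambda m: f"{m.group(1)} {separator} {m.group(2)}", text)
    if is_english(language):  # other languages: TTS reads symbols itself
        for regex, replacement in SYMBOLS_EN:
            text = regex.sub(replacement, text)
    return text
```

With the narrow regex, `expand_for_tts("Call 555-1234", "en")` leaves the phone number untouched, because `1234` is too long for the right-hand side and no shorter match ends on a word boundary.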
Initialise the filter.

Args
language: ISO 639-1 code or `"auto"`. Drives symbol expansion (English only) and the numeric-range separator word ("to", "से", "から", etc.).
strip_markdown: Strip Markdown syntax before TTS. Default `True`.
strip_llm_metadata: Strip common LLM state-leak shapes such as `(SESSION STATE: intro_delivered=true)` or `STATE: language=en`. This is a safety net; the real fix is a clearer prompt. Default `True`.
collapse_script_parens: When the LLM writes both an English word and its non-Latin gloss (`Tally (टैली)`, `one click (एक क्लिक)`), collapse to just the gloss so the TTS doesn't speak the phrase twice. Word-count aware: `Extra manpower (मैनपावर)` → `Extra मैनपावर`. Default `True`.
normalise_punctuation: Collapse `"..."` → `"…"` etc. Default `True`.
expand_symbols: Rewrite `&`, `%`, `#N` → words (English only; `"auto"` counts as English). Default `True`.
expand_ranges: Rewrite short digit ranges `2-3` → `2 to 3` using the locale's separator word. Default `True`.
protect_structural: Preserve URLs / paths / versions / emails through the Markdown and symbol stages. Default `True`.
respect_quotes: Reserved; currently a no-op. Preserved state is tracked per turn so future logic can use it.
respect_parens: Reserved; same as above.
ssml_flavor: `"none"` (default), `"cartesia"` to wrap phone numbers and long digit runs in `<spell>…</spell>`, or `"digits"` to rewrite them as space-separated digits. The value is lower-cased.
verbalize_currency: Expand currency amounts via `expand_currency`, passing `currency_hint` when set. Default `True`.
verbalize_numbers: Expand cardinal numbers via `expand_cardinals`. Default `False`.
currency_hint: Optional currency hint forwarded (lower-cased) to `expand_currency`. Default `None`.
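The word-count-aware collapse described under `collapse_script_parens` can be sketched as follows. `MIXED_PAREN` (a parenthesised group containing at least one non-ASCII character) is a hypothetical stand-in for `SCRIPT_MIXED_PAREN_REGEX`, and `collapse_glosses` is an illustrative name; the backward walk that drops as many ASCII words as the gloss contains is a simplified version of `_collapse_script_parens_fn` in the class source.

```python
import re

# Hypothetical: "(" + content with at least one non-ASCII char + ")".
MIXED_PAREN = re.compile(r"\(([^()]*[^\x00-\x7F][^()]*)\)")


def collapse_glosses(text: str) -> str:
    out: list[str] = []
    pos = 0
    for m in MIXED_PAREN.finditer(text):
        gloss = m.group(1).strip()
        before = text[pos:m.start()]
        idx, removed, wanted = len(before), 0, len(gloss.split())
        while removed < wanted:
            while idx > 0 and before[idx - 1].isspace():  # skip spaces
                idx -= 1
            end = idx
            while idx > 0 and before[idx - 1].isascii() and before[idx - 1].isalnum():
                idx -= 1  # walk back over one ASCII word
            if idx == end:
                break  # ran out of Latin words to drop
            removed += 1
        if wanted and removed == wanted:
            kept = before[:idx]
            out.append(kept if not kept or kept[-1].isspace() else kept + " ")
            out.append(gloss)
        else:
            out.append(before + m.group(0))  # leave unusual shapes untouched
        pos = m.end()
    out.append(text[pos:])
    return "".join(out)
```

A two-word gloss removes two Latin words, so `"Use one click (एक क्लिक) now"` becomes `"Use एक क्लिक now"`, while a parenthetical with no preceding Latin words is left as written.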
Ancestors
- TextFilter
- abc.ABC
Static methods
def for_language(language: str | None) -> BasicTextFilter
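Build a filter pre-configured for `language`, used by `Pipeline` auto-wiring. A `None` language becomes `"auto"`, number verbalisation is enabled for languages listed in `_VERBALIZE_LANGS`, and the currency hint is looked up in `_CURRENCY_BY_LANG`.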
Methods
async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]
Transform an incoming chunk stream. Per-turn state is reset first. Text inside an unterminated Markdown code fence is discarded until the closing fence arrives, and buffered text is emitted only up to the last whitespace at which inline Markdown markers, brackets and parentheses are balanced (or in one piece once the buffer exceeds `_MAX_BUFFER`). Whatever remains is processed and emitted when the input ends.
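The streaming half of `filter` rests on `_find_emit_boundary`: only the longest prefix that ends on whitespace and leaves no Markdown marker or bracket dangling is released, so a leaked `(SESSION STATE: …)` block reaches the metadata stripper intact. A synchronous sketch of that buffering follows. `balanced`, `emit_boundary` and `buffered_pieces` are hypothetical names, and the sketch omits fence handling, the `_MAX_BUFFER` overflow path and the per-piece `_process` step.

```python
from collections.abc import Iterable, Iterator


def balanced(text: str) -> bool:
    # Same parity / pairing checks as _has_balanced_markers.
    return (
        text.count("**") % 2 == 0
        and text.count("__") % 2 == 0
        and text.count("`") % 2 == 0
        and text.count("[") == text.count("]")
        and text.count("(") == text.count(")")
    )


def emit_boundary(text: str) -> int:
    # Scan backwards for the last whitespace whose prefix is balanced.
    for idx in range(len(text) - 1, -1, -1):
        if text[idx].isspace() and balanced(text[: idx + 1]):
            return idx + 1
    return 0


def buffered_pieces(chunks: Iterable[str]) -> Iterator[str]:
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        cut = emit_boundary(buffer)
        if cut > 0:
            yield buffer[:cut]
            buffer = buffer[cut:]
    if buffer:
        yield buffer  # drain whatever is left at the end of the turn
```

Feeding `["Say **hel", "lo** now (STATE: ", "x=1) ok"]` yields `"Say "`, `"**hello** now "`, `"(STATE: x=1) "` and `"ok"`: the opening parenthesis is held back until its partner arrives, so the whole block is emitted as one piece.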
async def reset(self) -> None
async def reset(self) -> None:
    """Reset per-turn state."""
    self._buffer = ""
    self._in_code_fence = False
    self._placeholder_counter = 0
    self._placeholder_map = {}
Reset per-turn state.
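The fence handling in filter() is easiest to follow in isolation. The sketch below is a simplified stand-alone async generator, not BasicTextFilter itself: strip_code_fences and run_filter are hypothetical names, it performs none of the _process() clean-up, and, unlike the method, it holds back a trailing run of backticks so that a fence marker split across two chunks is still recognised. Text before an opening fence is passed on, text inside a fence is dropped, and an unterminated fence at the end of the turn is discarded, as in the drain step of filter().

```python
import asyncio
from typing import AsyncIterator

FENCE = "`" * 3  # Markdown code-fence marker


async def strip_code_fences(chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Yield the text outside fenced code blocks (hypothetical helper)."""
    buffer = ""
    in_fence = False
    async for chunk in chunks:
        buffer += chunk
        out: list[str] = []
        # Consume every complete fence marker currently in the buffer.
        while (idx := buffer.find(FENCE)) != -1:
            if not in_fence:
                out.append(buffer[:idx])  # Text before an opening fence is kept.
            buffer = buffer[idx + len(FENCE):]
            in_fence = not in_fence
        # Hold back trailing backticks: they may start a fence split across chunks.
        held = len(buffer) - len(buffer.rstrip("`"))
        tail, buffer = buffer[: len(buffer) - held], buffer[len(buffer) - held:]
        if not in_fence:
            out.append(tail)  # Text inside an open fence is discarded.
        text = "".join(out)
        if text:
            yield text
    # End of turn: an unterminated fence is dropped; leftover backticks are kept.
    if buffer and not in_fence:
        yield buffer


def run_filter(parts: list[str]) -> str:
    """Feed parts through strip_code_fences and join the output."""

    async def feed() -> AsyncIterator[str]:
        for part in parts:
            yield part

    async def collect() -> str:
        return "".join([piece async for piece in strip_code_fences(feed())])

    return asyncio.run(collect())
```

Holding back backticks delays inline-code text by at most one chunk; in exchange, a fence that arrives as two backticks followed by a third in the next delta is not leaked to the TTS as ordinary text.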
class BufferedSentenceChunkStream (*,
tokenize_fn: Callable[[str], list[str]],
strong_terminators: str,
min_sentence_len: int = 20,
idle_flush_ms: int = 150,
idle_min_chars: int = 12)
class BufferedSentenceChunkStream(SentenceChunkStream):
    """Stream adapter that wraps a one-shot tokenize function.

    Strategy:

    * Accumulate incoming text in a buffer.
    * After each ``push_text``, call the tokenize function on the buffer.
      When it returns more than one segment, every segment except the last
      is confirmed complete (they're followed by at least one more character
      in the buffer, which the tokenizer interprets as lookahead). Emit them.
      The last segment is the in-flight remainder; keep it as the new buffer.
    * An idle-flush background task watches for silence. If nothing has been
      pushed for ``idle_flush_ms`` and the buffer holds enough text, cut on a
      word or CJK-character boundary and emit. This bounds time-to-audio for
      terminator-less responses.
    * ``flush()`` and ``end_input()`` drain the buffer as a single trailing
      segment and close the iterator cleanly.
    """

    def __init__(
        self,
        *,
        tokenize_fn: Callable[[str], list[str]],
        strong_terminators: str,
        min_sentence_len: int = 20,
        idle_flush_ms: int = 150,
        idle_min_chars: int = 12,
    ) -> None:
        """Initialise the stream.

        Args:
            tokenize_fn: Callable that maps buffer string → list of segments.
                Typically ``partial(tokenizer.tokenize, language=...)``.
            strong_terminators: Character class used for fast lookahead checks.
            min_sentence_len: Passed through for consistency (tokenize_fn
                already knows this; kept here for the idle-flush heuristic).
            idle_flush_ms: Milliseconds of inactivity before a word-boundary
                cut fires. Clamped to at least 100 ms.
            idle_min_chars: Minimum buffer length before idle-flush is allowed
                to fire. Prevents emitting 1-3 char fragments on slow LLMs.
        """
        self._tokenize_fn = tokenize_fn
        self._strong_set = set(strong_terminators)
        self._min_sentence_len = max(1, int(min_sentence_len))
        self._idle_flush_s = max(0.1, idle_flush_ms / 1000.0)
        self._idle_min_chars = max(1, int(idle_min_chars))
        self._buffer: str = ""
        self._buffer_lock = asyncio.Lock()
        self._queue: asyncio.Queue[object] = asyncio.Queue()
        self._closed: bool = False
        self._last_push_monotonic: float = time.monotonic()
        self._idle_task: asyncio.Task[None] | None = None

    async def push_text(self, text: str) -> None:
        """Feed more text into the stream."""
        if self._closed:
            logger.debug("push_text on closed BufferedSentenceChunkStream; ignored")
            return
        if not text:
            return
        async with self._buffer_lock:
            self._buffer += text
            self._last_push_monotonic = time.monotonic()
            self._ensure_idle_task()
            await self._try_emit_locked()

    async def flush(self) -> None:
        """Emit any buffered text as a single trailing segment."""
        async with self._buffer_lock:
            await self._flush_locked()

    async def end_input(self) -> None:
        """Signal end of input; drain the buffer and close the iterator."""
        async with self._buffer_lock:
            await self._flush_locked()
            if not self._closed:
                self._closed = True
                await self._queue.put(_EOS)
        if self._idle_task is not None and not self._idle_task.done():
            self._idle_task.cancel()

    def __aiter__(self) -> AsyncIterator[str]:
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is _EOS:
            raise StopAsyncIteration
        return item

    async def _try_emit_locked(self) -> None:
        """Emit every confirmed-complete sentence from the current buffer.

        Called with ``_buffer_lock`` held.
        """
        if not self._buffer:
            return
        try:
            segments = self._tokenize_fn(self._buffer)
        except Exception:
            logger.error("Tokenizer raised; flushing buffer as-is", exc_info=True)
            remainder = self._buffer.strip()
            self._buffer = ""
            if remainder:
                await self._queue.put(remainder)
            return
        if len(segments) <= 1:
            return
        for segment in segments[:-1]:
            stripped = segment.strip()
            if stripped:
                logger.debug("[chunking] tokenizer emit: %r", stripped)
                await self._queue.put(stripped)
        self._buffer = segments[-1]

    async def _flush_locked(self) -> None:
        """Force-emit any buffered text. Called with ``_buffer_lock`` held."""
        if not self._buffer:
            return
        try:
            segments = self._tokenize_fn(self._buffer)
        except Exception:
            logger.error("Tokenizer raised during flush", exc_info=True)
            segments = [self._buffer]
        logger.debug("[chunking] flush: buffer=%r -> segments=%r", self._buffer, segments)
        self._buffer = ""
        for segment in segments:
            stripped = segment.strip()
            if stripped:
                logger.debug("[chunking] flush emit: %r", stripped)
                await self._queue.put(stripped)

    def _ensure_idle_task(self) -> None:
        """Spawn the idle-flush watchdog on first push."""
        if self._idle_task is None or self._idle_task.done():
            self._idle_task = asyncio.create_task(self._idle_monitor())

    async def _idle_monitor(self) -> None:
        """Fire word-boundary cuts when no push arrives for ``idle_flush_ms``."""
        try:
            while not self._closed:
                await asyncio.sleep(self._idle_flush_s)
                if self._closed:
                    return
                if time.monotonic() - self._last_push_monotonic < self._idle_flush_s:
                    continue
                await self._idle_flush()
        except asyncio.CancelledError:
            raise

    async def _idle_flush(self) -> None:
        """Emit a word-boundary cut from the current buffer, if it's long enough."""
        async with self._buffer_lock:
            if len(self._buffer) < self._idle_min_chars:
                return
            stripped = self._buffer.rstrip()
            if stripped and stripped[-1] in self._strong_set:
                self._buffer = ""
                logger.debug(
                    "Idle-flush emitting complete sentence %d chars", len(stripped)
                )
                await self._queue.put(stripped)
                return

            cut_index = self._find_fallback_cut(self._buffer)
            if cut_index <= 0:
                return
            remainder = self._buffer[cut_index:].strip()
            if len(remainder) < self._idle_min_chars:
                return
            segment = self._buffer[:cut_index].strip()
            self._buffer = self._buffer[cut_index:].lstrip()
            if segment:
                logger.debug("Idle-flush emitting %d chars", len(segment))
                await self._queue.put(segment)

    def _find_fallback_cut(self, text: str) -> int:
        """Pick the best place to cut ``text`` in the absence of a terminator.

        Prefers the last space. If there are no spaces (CJK / Thai / Lao /
        Myanmar / Khmer prose), cuts at the last character-range boundary.
        Returns 0 when no safe cut exists.
        """
        last_space = text.rfind(" ")
        if last_space > 0:
            return last_space
        last_cjk = -1
        for m in NO_SPACE_SCRIPTS_REGEX.finditer(text):
            last_cjk = m.end()
        if last_cjk > 0 and last_cjk < len(text):
            return last_cjk
        return 0
Stream adapter that wraps a one-shot tokenize function.
Strategy:
- Accumulate incoming text in a buffer.
- After each push_text, call the tokenize function on the buffer. When it returns more than one segment, every segment except the last is confirmed complete (each is followed by at least one more character in the buffer, which the tokenizer interprets as lookahead). Emit them. The last segment is the in-flight remainder; keep it as the new buffer.
- An idle-flush background task watches for silence. If nothing has been pushed for idle_flush_ms and the buffer holds at least idle_min_chars characters, emit the whole buffer when it already ends in a strong terminator; otherwise cut on a word or CJK-character boundary and emit the part before the cut. This bounds time-to-audio for terminator-less responses.
- flush() drains the buffer as trailing segment(s) but leaves the stream open; end_input() drains the buffer the same way and then closes the iterator cleanly.
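The emit-all-but-last rule in the strategy list can be reproduced synchronously. This is a sketch under simplifying assumptions: ToyBufferedChunker and toy_tokenize are hypothetical, the tokenizer is a bare regex that splits after ., ! or ? followed by whitespace, and the lock, queue and idle-flush task of BufferedSentenceChunkStream are left out.

```python
import re
from typing import Callable

# Split only where whitespace follows a terminator, so a trailing "." is not
# treated as a confirmed sentence end until more text arrives.
_SENTENCE_END = re.compile(r"(?<=[.!?])\s+")


def toy_tokenize(text: str) -> list[str]:
    """Hypothetical one-shot tokenizer used to drive the sketch."""
    return [part for part in _SENTENCE_END.split(text) if part]


class ToyBufferedChunker:
    """Synchronous stand-in for the buffering strategy (no idle flush)."""

    def __init__(self, tokenize_fn: Callable[[str], list[str]]) -> None:
        self._tokenize_fn = tokenize_fn
        self._buffer = ""

    def push_text(self, text: str) -> list[str]:
        """Append text and return every segment confirmed complete."""
        self._buffer += text
        segments = self._tokenize_fn(self._buffer)
        if len(segments) <= 1:
            return []  # Nothing confirmed yet; keep buffering.
        self._buffer = segments[-1]  # In-flight remainder becomes the buffer.
        return [s.strip() for s in segments[:-1] if s.strip()]

    def flush(self) -> list[str]:
        """Drain whatever is left in the buffer."""
        segments = [s.strip() for s in self._tokenize_fn(self._buffer) if s.strip()]
        self._buffer = ""
        return segments
```

Because the toy tokenizer only splits when whitespace follows a terminator, "Hello there." is held back until the next delta (". How are") proves the sentence is over; that lookahead is what the strategy relies on.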
Initialise the stream.
Args
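tokenize_fn- Callable that maps buffer string → list of segments. Typically partial(tokenizer.tokenize, language=...).
strong_terminators- Characters treated as strong sentence terminators. When idle-flush fires and the buffer (ignoring trailing whitespace) ends in one of them, the whole buffer is emitted instead of being cut at a word boundary.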
min_sentence_len- Passed through for consistency (tokenize_fn already knows this; kept here for the idle-flush heuristic).
idle_flush_ms- Milliseconds of inactivity before a word-boundary cut fires. Clamped to at least 100 ms.
idle_min_chars- Minimum buffer length before idle-flush is allowed to fire. Prevents emitting 1-3 char fragments on slow LLMs.
Ancestors
- SentenceChunkStream
- abc.ABC
Methods
async def end_input(self) -> None
Signal end of input; drain the buffer and close the iterator.
async def flush(self) -> None
Emit any buffered text as a single trailing segment.
async def push_text(self, text: str) -> None
Feed more text into the stream.
class EnglishHyphenator (patterns: str, exceptions: str = '')
class EnglishHyphenator:
    """Trie-based hyphenator using Liang's algorithm.

    Instantiate via :func:`_get_hyphenator` (cached), or call
    :func:`hyphenate_english` for one-shot use.
    """

    def __init__(self, patterns: str, exceptions: str = "") -> None:
        self.tree: dict[Any, Any] = {}
        for pattern in patterns.split():
            self._insert_pattern(pattern)
        self.exceptions: dict[str, list[int]] = {}
        for ex in exceptions.split():
            points = [0] + [int(h == "-") for h in re.split(r"[a-z]", ex)]
            self.exceptions[ex.replace("-", "")] = points

    def _insert_pattern(self, pattern: str) -> None:
        chars = re.sub(r"[0-9]", "", pattern)
        points = [int(d or 0) for d in re.split(r"[.a-z]", pattern)]
        t = self.tree
        for c in chars:
            if c not in t:
                t[c] = {}
            t = t[c]
        t[None] = points

    def hyphenate(self, word: str) -> list[str]:
        """Return the word split at every legal hyphenation point.

        Short words (≤ 4 chars) pass through unchanged. Words in the
        exception list use the stored split; everything else runs the
        trie walk.
        """
        if len(word) <= 4:
            return [word]
        if word.lower() in self.exceptions:
            points = self.exceptions[word.lower()]
        else:
            work = "." + word.lower() + "."
            points = [0] * (len(work) + 1)
            for i in range(len(work)):
                t = self.tree
                for c in work[i:]:
                    if c in t:
                        t = t[c]
                        if None in t:
                            p = t[None]
                            for j, p_j in enumerate(p):
                                points[i + j] = max(points[i + j], p_j)
                    else:
                        break
            # No hyphens in the first or last two chars.
            points[1] = points[2] = points[-2] = points[-3] = 0
        pieces = [""]
        for c, p in zip(word, points[2:]):
            pieces[-1] += c
            if p % 2:
                pieces.append("")
        return pieces
Trie-based hyphenator using Liang's algorithm.
Instantiate via _get_hyphenator() (cached), or call hyphenate_english() for one-shot use.
Methods
def hyphenate(self, word: str) -> list[str]
Return the word split at every legal hyphenation point.
Short words (≤ 4 chars) pass through unchanged. Words in the exception list use the stored split; everything else runs the trie walk.
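Liang's method can be checked by hand with a toy pattern set. The sketch below uses hypothetical names (parse_patterns, hyphenate_toy) and two made-up patterns, not the library's English pattern data. It replaces the trie with a dict lookup over every substring, which yields the same points but scales worse; the remaining steps follow hyphenate(): surround the word with dots, keep the maximum digit at each inter-letter gap, clear the gaps at the word's edges, and split wherever the value is odd.

```python
import re


def parse_patterns(patterns: str) -> dict[str, list[int]]:
    """Map each pattern's letters to the digits between (and around) them."""
    table: dict[str, list[int]] = {}
    for pattern in patterns.split():
        letters = re.sub(r"[0-9]", "", pattern)
        table[letters] = [int(d or 0) for d in re.split(r"[.a-z]", pattern)]
    return table


def hyphenate_toy(word: str, table: dict[str, list[int]]) -> list[str]:
    if len(word) <= 4:
        return [word]
    work = "." + word.lower() + "."
    points = [0] * (len(work) + 1)
    # Try every substring of the dotted word; keep the max digit per gap.
    for i in range(len(work)):
        for j in range(i + 1, len(work) + 1):
            values = table.get(work[i:j])
            if values:
                for k, v in enumerate(values):
                    points[i + k] = max(points[i + k], v)
    # Keep at least two letters on each side of any break.
    points[1] = points[2] = points[-2] = points[-3] = 0
    pieces = [""]
    for ch, p in zip(word, points[2:]):
        pieces[-1] += ch
        if p % 2:  # Odd value: legal hyphenation point after ch.
            pieces.append("")
    return pieces
```

With only "a1n", "banana" splits as ba-na-na. Adding "ba2n" raises the first a/n gap to the even value 2, which vetoes that break and leaves bana-na: higher even digits are how Liang patterns express exceptions to more general odd ones.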
class IndicScriptTransliterator (*, source: str, target: str)
class IndicScriptTransliterator:
    """Thin wrapper around ``UnicodeIndicTransliterator``.

    Useful as a ``TextFilter``-adjacent utility when your LLM emits Hindi
    but the TTS speaks Telugu (or similar cross-Indic scenarios).

    Example::

        from videosdk.agents.tokenize import IndicScriptTransliterator

        trans = IndicScriptTransliterator(source="hi", target="te")
        out = trans.convert("नमस्ते दुनिया")
        # out == "నమస్తే దునియా" (approximate phonetic conversion)
    """

    def __init__(self, *, source: str, target: str) -> None:
        self._source = source
        self._target = target

    def convert(self, text: str) -> str:
        try:
            from indicnlp.transliterate.unicode_transliterate import (
                UnicodeIndicTransliterator,
            )
        except ImportError as exc:
            raise ImportError(
                "indic-nlp-library is missing — it ships as a dependency of "
                "videosdk-agents; reinstall with: uv pip install -U videosdk-agents"
            ) from exc
        return UnicodeIndicTransliterator.transliterate(text, self._source, self._target)
Thin wrapper around UnicodeIndicTransliterator.
Useful as a TextFilter-adjacent utility when your LLM emits Hindi but the TTS speaks Telugu (or similar cross-Indic scenarios).
Example:
    from videosdk.agents.tokenize import IndicScriptTransliterator

    trans = IndicScriptTransliterator(source="hi", target="te")
    out = trans.convert("नमस्ते दुनिया")
    # out == "నమస్తే దునియా" (approximate phonetic conversion)
Methods
def convert(self, text: str) -> str
Transliterate text from the source script to the target script with indic-nlp-library's UnicodeIndicTransliterator. Raises ImportError if indic-nlp-library cannot be imported.
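Cross-Indic conversion like the Hindi to Telugu example is possible largely because the major Indic Unicode blocks follow a common layout derived from ISCII, so the same letter usually sits at the same offset within each 128-code-point block. The sketch below shows only that offset idea and should not be read as how indic-nlp-library works internally: offset_transliterate and BLOCK_START are hypothetical, only Devanagari and Telugu are covered, and characters with no counterpart in the target block are mapped blindly, which a real transliterator has to special-case.

```python
# Hypothetical block table; each Indic block spans 128 code points.
BLOCK_START = {"hi": 0x0900, "te": 0x0C00}  # Devanagari, Telugu
BLOCK_SIZE = 0x80


def offset_transliterate(text: str, source: str, target: str) -> str:
    """Move each source-block character to the same offset in the target block."""
    src, dst = BLOCK_START[source], BLOCK_START[target]
    out: list[str] = []
    for ch in text:
        offset = ord(ch) - src
        if 0 <= offset < BLOCK_SIZE:
            out.append(chr(dst + offset))
        else:
            out.append(ch)  # Spaces, digits and Latin text pass through unchanged.
    return "".join(out)
```

For the docstring's input, offset_transliterate("नमस्ते दुनिया", "hi", "te") returns "నమస్తే దునియా", the same string the IndicScriptTransliterator example shows.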
class IndicSentenceChunker (*, language: str = 'hi', min_sentence_len: int = 1, idle_flush_ms: int = 400)
class IndicSentenceChunker(SentenceChunker):
    """Sentence chunker for Indic scripts using indic-nlp-library.

    Falls back gracefully for unsupported ``language`` values by returning
    the input text as a single segment — the ``BufferedSentenceChunkStream``
    then relies on idle-flush for phrasing.
    """

    def __init__(
        self,
        *,
        language: str = "hi",
        min_sentence_len: int = 1,
        idle_flush_ms: int = 400,
    ) -> None:
        """Initialise the chunker.

        Args:
            language: Default ISO 639-1 code. Override per-turn via
                ``tokenize(..., language=...)`` or ``stream(language=...)``.
            min_sentence_len: Passed through to the ``BufferedSentenceChunkStream``
                idle-flush heuristic. Default 1 (Indic sentences can be very short).
            idle_flush_ms: Idle timeout before a word-boundary cut fires.
        """
        if language not in INDIC_LANGS:
            logger.warning(
                "IndicSentenceChunker language %r is not in the supported set "
                "%s; sentence splitting may degrade. Consider using "
                "BasicSentenceChunker for non-Indic languages.",
                language,
                sorted(INDIC_LANGS),
            )
        self._default_language = language
        self._min_sentence_len = int(min_sentence_len)
        self._idle_flush_ms = int(idle_flush_ms)
        self._split_fn: "_SentenceSplit | None" = None

    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        if not text or not text.strip():
            return []
        lang = self._resolve_language(language)
        split_fn = self._get_split_fn()
        try:
            sentences = split_fn(text, lang=lang)
        except Exception:  # pragma: no cover - defensive
            logger.warning(
                "indic-nlp-library sentence_split raised for lang=%r; returning "
                "input as single segment",
                lang,
                exc_info=True,
            )
            return [text.strip()]
        return [s.strip() for s in sentences if s and s.strip()]

    def tokenize_raw(self, text: str, *, language: str | None = None) -> list[str]:
        """Return segments with original whitespace preserved.

        Used by the stream adapter: the ``BufferedSentenceChunkStream``
        re-uses the last segment as the continuation buffer, so losing edge
        whitespace would concatenate unrelated words (e.g. ``व्यापार`` +
        ``से`` → ``व्यापारसे``) when the next chunk arrives.

        indic-nlp-library's ``sentence_split`` returns stripped strings, so
        we map each stripped segment back to its position in the original
        text and slice raw ranges, preserving the whitespace that originally
        sat between sentences.
        """
        if not text:
            return []
        lang = self._resolve_language(language)
        split_fn = self._get_split_fn()
        try:
            stripped_sentences = split_fn(text, lang=lang)
        except Exception:  # pragma: no cover - defensive
            return [text]
        raw_segments: list[str] = []
        cursor = 0
        for s in stripped_sentences:
            core = s.strip() if s else ""
            if not core:
                continue
            idx = text.find(core, cursor)
            if idx < 0:
                # Shouldn't happen — defensive fallback keeps the stripped text.
                raw_segments.append(core)
                continue
            end = idx + len(core)
            raw_segments.append(text[cursor:end])
            cursor = end
        if cursor < len(text):
            if raw_segments:
                raw_segments[-1] = raw_segments[-1] + text[cursor:]
            else:
                raw_segments.append(text[cursor:])
        return raw_segments

    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        lang = self._resolve_language(language)
        tokenize_fn: Callable[[str], list[str]] = partial(
            self.tokenize_raw, language=lang
        )
        return BufferedSentenceChunkStream(
            tokenize_fn=tokenize_fn,
            strong_terminators="।॥.!?",  # Devanagari danda + Latin terminators
            min_sentence_len=self._min_sentence_len,
            idle_flush_ms=self._idle_flush_ms,
        )

    def _resolve_language(self, language: str | None) -> str:
        """Pick the language code to pass to the upstream library."""
        lang = (language or self._default_language or "hi").lower()
        if lang == "auto":
            return self._default_language
        return lang

    def _get_split_fn(self) -> "_SentenceSplit":
        """Lazy-load the upstream function on first use."""
        if self._split_fn is None:
            self._split_fn = _load_sentence_split()
        return self._split_fn
Sentence chunker for Indic scripts using indic-nlp-library.
Falls back gracefully for unsupported language values by returning the input text as a single segment; the BufferedSentenceChunkStream then relies on idle-flush for phrasing.
Initialise the chunker.
Args
language- Default ISO 639-1 code. Override per-turn via tokenize(..., language=...) or stream(language=...).
min_sentence_len- Passed through to the BufferedSentenceChunkStream idle-flush heuristic. Default 1 (Indic sentences can be very short).
idle_flush_ms- Idle timeout, in milliseconds, before a word-boundary cut fires. Default 400.
Ancestors
- SentenceChunker
- abc.ABC
Methods
def tokenize_raw(self, text: str, *, language: str | None = None) -> list[str]
Return segments with original whitespace preserved.
Used by the stream adapter: the BufferedSentenceChunkStream reuses the last segment as the continuation buffer, so losing edge whitespace would concatenate unrelated words (e.g. व्यापार + से → व्यापारसे) when the next chunk arrives.
indic-nlp-library's sentence_split returns stripped strings, so we map each stripped segment back to its position in the original text and slice raw ranges, preserving the whitespace that originally sat between sentences.
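The position-mapping loop in tokenize_raw() can be checked without indic-nlp-library. In this sketch map_to_raw and danda_split are hypothetical names, and danda_split is a toy stand-in for sentence_split (any splitter that returns stripped sentences works). Each stripped sentence is located with str.find from a moving cursor, the raw slice up to its end is kept, and leftover trailing text is appended to the last segment, so joining the segments reproduces the input.

```python
import re
from typing import Callable


def map_to_raw(text: str, splitter: Callable[[str], list[str]]) -> list[str]:
    """Re-attach the whitespace a stripping splitter threw away."""
    raw: list[str] = []
    cursor = 0
    for sentence in splitter(text):
        core = sentence.strip()
        if not core:
            continue
        idx = text.find(core, cursor)
        if idx < 0:
            raw.append(core)  # Defensive fallback, as in tokenize_raw().
            continue
        end = idx + len(core)
        raw.append(text[cursor:end])  # Includes whitespace before the sentence.
        cursor = end
    if cursor < len(text):
        if raw:
            raw[-1] += text[cursor:]
        else:
            raw.append(text[cursor:])
    return raw


def danda_split(text: str) -> list[str]:
    """Toy splitter: break after each Devanagari danda and strip the pieces."""
    return [p.strip() for p in re.split("(?<=\u0964)", text) if p.strip()]
```

The leading space that stays on " C " is what keeps the continuation buffer from gluing the next streamed word onto the previous one.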
Inherited members
class SentenceChunkStream
class SentenceChunkStream(ABC):
    """Push-based stream adapter for sentence chunkers.

    A ``SentenceChunkStream`` is single-use: open it with
    ``SentenceChunker.stream()``, push text as deltas arrive, call
    ``end_input()`` when the upstream source closes, and iterate over it to
    receive sentence-sized strings.
    """

    @abstractmethod
    async def push_text(self, text: str) -> None:
        """Feed more text into the stream.

        Args:
            text: A chunk of text. May be a single character or a multi-word
                fragment. Need not align with word or sentence boundaries.
        """

    @abstractmethod
    async def flush(self) -> None:
        """Force-emit any buffered text as a single trailing sentence."""

    @abstractmethod
    async def end_input(self) -> None:
        """Signal that no more text will arrive. Drains the buffer and closes the stream."""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[str]: ...
Push-based stream adapter for sentence chunkers.
A SentenceChunkStream is single-use: open it with SentenceChunker.stream(), push text as deltas arrive, call end_input() when the upstream source closes, and iterate over it to receive sentence-sized strings.
Ancestors
- abc.ABC
Subclasses
Methods
async def end_input(self) -> None
Signal that no more text will arrive. Drains the buffer and closes the stream.
async def flush(self) -> None
Force-emit any buffered text as a single trailing sentence.
async def push_text(self, text: str) -> None
Feed more text into the stream.
Args
text- A chunk of text. May be a single character or a multi-word fragment. Need not align with word or sentence boundaries.
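The single-use lifecycle (push text, call end_input(), iterate until exhaustion) can be illustrated with a deliberately naive stream. QueueChunkStream and demo are hypothetical: the class does not subclass the library's SentenceChunkStream, so the block runs on its own, and it treats every non-blank push as a finished segment instead of chunking. It does reuse the sentinel-on-a-queue pattern that BufferedSentenceChunkStream uses to end iteration.

```python
import asyncio

_EOS = object()  # Sentinel that marks the end of input.


class QueueChunkStream:
    """Naive single-use stream: every non-blank push becomes one segment."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue[object] = asyncio.Queue()
        self._closed = False

    async def push_text(self, text: str) -> None:
        if not self._closed and text.strip():
            await self._queue.put(text.strip())

    async def flush(self) -> None:
        """Nothing is buffered in this naive version, so there is nothing to do."""

    async def end_input(self) -> None:
        if not self._closed:
            self._closed = True
            await self._queue.put(_EOS)

    def __aiter__(self) -> "QueueChunkStream":
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is _EOS:
            raise StopAsyncIteration
        assert isinstance(item, str)
        return item


async def demo() -> list[str]:
    stream = QueueChunkStream()

    async def producer() -> None:
        for delta in ["Hello.", "  ", "World."]:
            await stream.push_text(delta)
        await stream.end_input()  # Without this, the consumer would wait forever.

    task = asyncio.create_task(producer())
    received = [segment async for segment in stream]
    await task
    return received
```

Running the producer as a separate task mirrors the usual arrangement: the LLM reader pushes deltas while the TTS side consumes segments concurrently.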
class SentenceChunker
class SentenceChunker(ABC):
    """Abstract chunker that splits text into sentence-sized segments for TTS."""

    @abstractmethod
    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        """Split the given text into sentences in one shot.

        Args:
            text: Full text to split.
            language: Optional ISO 639-1 language hint. When omitted, the
                chunker uses its internal heuristic (usually script detection).

        Returns:
            A list of sentence-sized strings with leading/trailing whitespace
            stripped.
        """

    @abstractmethod
    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        """Open a push-based stream for incremental chunking.

        Args:
            language: Optional ISO 639-1 language hint forwarded to ``tokenize``.

        Returns:
            A fresh ``SentenceChunkStream`` instance.
        """
Abstract chunker that splits text into sentence-sized segments for TTS.
Ancestors
- abc.ABC
Subclasses
Methods
def stream(self, *, language: str | None = None) -> SentenceChunkStream
Open a push-based stream for incremental chunking.
Args
language- Optional ISO 639-1 language hint forwarded to tokenize.
Returns
A fresh SentenceChunkStream instance.
def tokenize(self, text: str, *, language: str | None = None) -> list[str]
Split the given text into sentences in one shot.
Args
text- Full text to split.
language- Optional ISO 639-1 language hint. When omitted, the chunker uses its internal heuristic (usually script detection).
Returns
A list of sentence-sized strings with leading/trailing whitespace stripped.
class TextFilter
class TextFilter(ABC):
    """Pre-chunking text transformation.

    Filters sit *before* the chunker. They may be stateful across a turn
    (e.g. tracking whether the stream is currently inside a Markdown code
    fence) and are reset between turns via ``reset()``.
    """

    @abstractmethod
    def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        """Transform an input stream of text chunks.

        Args:
            chunks: Async iterator of raw LLM deltas.

        Yields:
            Filtered text chunks ready to be consumed by a ``SentenceChunker``.
        """

    @abstractmethod
    async def reset(self) -> None:
        """Reset internal state between turns."""
Pre-chunking text transformation.
Filters sit before the chunker. They may be stateful across a turn (e.g. tracking whether the stream is currently inside a Markdown code fence) and are reset between turns via reset().
Ancestors
- abc.ABC
Subclasses
Methods
def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]
Transform an input stream of text chunks.
Args
chunks- Async iterator of raw LLM deltas.
Yields
Filtered text chunks ready to be consumed by a SentenceChunker.
async def reset(self) -> None
Reset internal state between turns.
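A concrete filter following this contract is an async generator that may carry state across the chunks of one turn and clears it in reset(). DropBracketedFilter and run_turn are hypothetical examples, not something the library ships, and the class does not subclass TextFilter so that it runs stand-alone. It removes [...] spans such as citation markers, even when the brackets arrive in different chunks. Like BasicTextFilter.filter(), it calls reset() at the start of each turn.

```python
import asyncio
from typing import AsyncIterator


class DropBracketedFilter:
    """Drop [...] spans, e.g. citation markers, from one streamed turn."""

    def __init__(self) -> None:
        self._inside = False  # True while between "[" and "]".

    async def reset(self) -> None:
        self._inside = False

    async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        await self.reset()  # Each call handles a fresh turn.
        async for chunk in chunks:
            kept: list[str] = []
            for ch in chunk:
                if ch == "[":
                    self._inside = True
                elif ch == "]" and self._inside:
                    self._inside = False
                elif not self._inside:
                    kept.append(ch)
            if kept:
                yield "".join(kept)


def run_turn(text_filter: DropBracketedFilter, parts: list[str]) -> str:
    """Push parts through one turn of the filter and join the output."""

    async def feed() -> AsyncIterator[str]:
        for part in parts:
            yield part

    async def collect() -> str:
        return "".join([piece async for piece in text_filter.filter(feed())])

    return asyncio.run(collect())
```

Resetting at the start of filter() means an unterminated "[" in one turn cannot swallow the whole of the next turn.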