Module agents.tokenize

Text chunking and filtering for the cascade pipeline.

The public surface exposes the abstract interfaces and the default implementations. External chunkers can plug into the SentenceChunker interface without touching core code.

Naming note: these classes are intentionally distinct from other frameworks' *Tokenizer types — VideoSDK chunks LLM output into sentence-sized segments for streaming TTS, hence the Chunker family.

Sub-modules

agents.tokenize.base
agents.tokenize.basic
agents.tokenize.filters
agents.tokenize.hyphenate

English word hyphenation via Frank Liang's algorithm.

agents.tokenize.indic
agents.tokenize.patterns
agents.tokenize.stream
agents.tokenize.verbalize

Functions

def detect_script(sample: str, *, default: str = 'en') ‑> str
def detect_script(sample: str, *, default: str = "en") -> str:
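    """Return an ISO 639-1 hint for the dominant non-Latin script in ``sample``.

    Counts the characters in the sample that belong to each supported script.
    If any characters match the ``"ja"`` pattern (checked first) or the
    ``"ko"`` pattern, that code is returned; otherwise the script with the
    most matches wins. Falls back to ``default`` when only Latin characters
    (or whitespace / digits / punctuation) are present.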

    Args:
        sample: Text to inspect. The first ~200 characters are sufficient.
        default: Language code returned when no non-Latin script is detected.

    Returns:
        An ISO 639-1 code. The hint is used for abbreviation selection and
        locale-specific disambiguation (e.g. Greek ``;`` handling); it does
        not gate which strong terminators are recognised.
    """
    if not sample:
        return default
    scores: dict[str, int] = {}
    for code, pattern in _SCRIPT_RANGES:
        matches = pattern.findall(sample)
        if matches:
            scores[code] = len(matches)
    if not scores:
        return default

    if "ja" in scores:
        return "ja"
    if "ko" in scores:
        return "ko"
    return max(scores.items(), key=lambda kv: kv[1])[0]

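Return an ISO 639-1 hint for the dominant non-Latin script in sample.

Counts the characters in the sample that belong to each supported script. If any characters match the "ja" pattern (checked first) or the "ko" pattern, that code is returned; otherwise the script with the most matches wins. Falls back to default when only Latin characters (or whitespace / digits / punctuation) are present.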

Args

sample
Text to inspect. The first ~200 characters are sufficient.
default
Language code returned when no non-Latin script is detected.

Returns

An ISO 639-1 code. The hint is used for abbreviation selection and locale-specific disambiguation (e.g. Greek ; handling); it does not gate which strong terminators are recognised.
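The selection rule in the source above can be tried in isolation. The following is a minimal sketch, not the module's implementation: the real _SCRIPT_RANGES table is private and not shown here, so the table below, its Unicode ranges, and the mapping of Han to "zh", Devanagari to "hi" and Greek to "el" are assumptions made purely for illustration.

```python
import re

# Hypothetical stand-in for the private _SCRIPT_RANGES table (assumed ranges).
_SCRIPT_RANGES_SKETCH = [
    ("ja", re.compile(r"[\u3040-\u30ff]")),  # Hiragana and Katakana
    ("ko", re.compile(r"[\uac00-\ud7af]")),  # Hangul syllables
    ("zh", re.compile(r"[\u4e00-\u9fff]")),  # CJK unified ideographs
    ("hi", re.compile(r"[\u0900-\u097f]")),  # Devanagari
    ("el", re.compile(r"[\u0370-\u03ff]")),  # Greek
]


def detect_script_sketch(sample: str, *, default: str = "en") -> str:
    """Mirror the control flow of detect_script with the assumed table."""
    if not sample:
        return default
    scores: dict[str, int] = {}
    for code, pattern in _SCRIPT_RANGES_SKETCH:
        hits = pattern.findall(sample)
        if hits:
            scores[code] = len(hits)
    if not scores:
        return default
    # Any kana or Hangul wins outright, even when Han characters also match.
    for preferred in ("ja", "ko"):
        if preferred in scores:
            return preferred
    # Otherwise the script with the most matching characters wins.
    return max(scores.items(), key=lambda kv: kv[1])[0]


print(detect_script_sketch("Hello, world!"))  # en
print(detect_script_sketch("東京は大きい"))      # ja
print(detect_script_sketch("中文文本"))          # zh
```

In this sketch the second sample scores three Han and three kana characters; the explicit "ja" check is what keeps it from depending on the tie-break of max().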

def hyphenate_english(word: str) ‑> list[str]
def hyphenate_english(word: str) -> list[str]:
    """Split ``word`` at every legal English hyphenation point.

    Args:
        word: An English word. Short words (≤4 chars) and non-alphabetic
            tokens are returned as a single-element list.

    Returns:
        A list of substrings whose concatenation equals ``word``. Boundaries
        between consecutive pieces are valid hyphenation points.

    Examples:
        >>> hyphenate_english("hyphenation")
        ['hy', 'phen', 'ation']
        >>> hyphenate_english("the")
        ['the']
        >>> hyphenate_english("associate")
        ['as', 'so', 'ciate']
    """
    return _get_hyphenator().hyphenate(word)

Split word at every legal English hyphenation point.

Args

word
An English word. Short words (≤4 chars) and non-alphabetic tokens are returned as a single-element list.

Returns

A list of substrings whose concatenation equals word. Boundaries between consecutive pieces are valid hyphenation points.

Examples

>>> hyphenate_english("hyphenation")
['hy', 'phen', 'ation']
>>> hyphenate_english("the")
['the']
>>> hyphenate_english("associate")
['as', 'so', 'ciate']

def normalize_lang_code(code: str | None) ‑> str | None
def normalize_lang_code(code: str | None) -> str | None:
    """Reduce an STT/TTS language tag to a bare ISO 639-1 code, or ``None``.
    """
    if not code:
        return None
    base = code.split("-", 1)[0].split("_", 1)[0].strip().lower()
    if not base or base == "auto":
        return None
    return base

Reduce an STT/TTS language tag to a bare ISO 639-1 code, or None.
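A few sample inputs make the reduction concrete. The function body below is copied from the source listing above so that the snippet runs on its own; the sample tags are illustrative.

```python
from __future__ import annotations


def normalize_lang_code(code: str | None) -> str | None:
    """Copy of the function above, repeated so the example is standalone."""
    if not code:
        return None
    base = code.split("-", 1)[0].split("_", 1)[0].strip().lower()
    if not base or base == "auto":
        return None
    return base


samples = ["en-US", "hi_IN", "PT-br", "auto", "", None]
print([normalize_lang_code(tag) for tag in samples])
# ['en', 'hi', 'pt', None, None, None]
```

Note that the function only trims and lower-cases the primary subtag; it does not check that the result is a two-letter code, so a tag such as "yue-HK" yields "yue".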

def pre_warm_tokenizer() ‑> None
def pre_warm_tokenizer() -> None:
    """Eagerly import ``indic-nlp-library`` so the first ``.tokenize()`` call is cheap.

    The underlying ``indicnlp.tokenize.sentence_tokenize`` module performs its
    expensive initialisation on first import (~6s on a cold Python process).
    Calling this at worker start — alongside ``TurnDetector.pre_download_model()``
    — moves that cost out of the first conversational turn.
    """
    _load_sentence_split()

Eagerly import indic-nlp-library so the first .tokenize() call is cheap.

The underlying indicnlp.tokenize.sentence_tokenize module performs its expensive initialisation on first import (~6s on a cold Python process). Calling this at worker start — alongside TurnDetector.pre_download_model() — moves that cost out of the first conversational turn.
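The general pattern behind this kind of warm-up is a cached lazy import that can also be triggered ahead of time. The sketch below illustrates that pattern only; the internals of _load_sentence_split are not shown on this page, so load_module and pre_warm are hypothetical names, and the standard-library json module stands in for the slow dependency.

```python
import functools
import importlib
from types import ModuleType


@functools.lru_cache(maxsize=None)
def load_module(name: str) -> ModuleType:
    """Import ``name`` on first use and cache the module object."""
    return importlib.import_module(name)


def pre_warm(name: str) -> None:
    """Pay the import cost now, for example at worker start."""
    load_module(name)


pre_warm("json")              # slow path runs once, outside the hot loop
module = load_module("json")  # later calls are served from the cache
print(module.__name__)        # json
```

Python already caches imports in sys.modules, so the benefit of an explicit warm-up comes from choosing when the first, expensive import happens rather than from the cache itself.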

Classes

class BasicSentenceChunker (*,
language: str = 'auto',
min_sentence_len: int = 20,
strong_terminators: str = '.!?…‼⁇⁈⁉。!?।॥؟۔؞։።፧፨།༎༏༐༑။។៕\n',
weak_terminators: str = ',;:、,;:،؛՝՛፣፤፥་၊៖—–',
abbreviations: frozenset[str] | None = None)
class BasicSentenceChunker(SentenceChunker):
    """Default multilingual, Unicode-aware sentence chunker.

    Works correctly without a ``language`` hint for every major world script.
    An explicit hint sharpens behaviour (abbreviation set, Greek ``;`` upgrade)
    but is not required.

    Args:
        language: ISO 639-1 code or ``"auto"`` (default). ``"auto"`` triggers
            script detection on the first ~200 characters of each tokenize call.
        min_sentence_len: Minimum character length before a weak terminator
            (``, ; :`` etc.) is treated as a cut boundary. Strong terminators
            (``. ! ? 。 ! ? । 。 ۔ ։ ።`` …) always cut regardless of length.
        strong_terminators: Override the default strong-terminator character class.
        weak_terminators: Override the default weak-terminator character class.
        abbreviations: Override the abbreviation set. When ``None`` (default),
            the set is chosen from ``ABBREVIATIONS_BY_LANG`` based on the
            resolved language.
    """

    def __init__(
        self,
        *,
        language: str = "auto",
        min_sentence_len: int = 20,
        strong_terminators: str = STRONG_TERMINATORS,
        weak_terminators: str = WEAK_TERMINATORS,
        abbreviations: frozenset[str] | None = None,
    ) -> None:
        self._language = language
        self._min_sentence_len = max(1, int(min_sentence_len))
        self._strong = strong_terminators
        self._weak = weak_terminators
        self._override_abbreviations = abbreviations

    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        """Split ``text`` into sentence-sized strings.

        Args:
            text: Input text. May be a full response or a partial buffer.
            language: Optional ISO 639-1 override. When omitted, uses the
                instance's default.

        Returns:
            Sentence strings in input order, leading/trailing whitespace
            stripped, empty segments filtered out. When the input ends without
            a confirmed sentence boundary, the final element contains the
            unterminated remainder (the stream adapter uses this to retain
            buffer state).
        """
        if not text:
            return []

        lang = language or self._language
        if lang == "auto":
            lang = detect_script(text[:200])

        strong = self._strong
        weak = self._weak
        if lang == "el":
            if ";" not in strong:
                strong = strong + ";"
            weak = weak.replace(";", "")

        if self._override_abbreviations is not None:
            abbrevs = self._override_abbreviations
        else:
            abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)

        protected, restore_map = self._protect(text, abbrevs)
        segments = self._split(protected, strong=strong, weak=weak)
        return [self._restore(s, restore_map) for s in segments if s.strip()]

    def tokenize_raw(
        self,
        text: str,
        *,
        language: str | None = None,
    ) -> list[str]:
        """Return raw (un-stripped) segment substrings.

        Same boundary decisions as :py:meth:`tokenize`, but each segment is
        returned as it appears in the input text — leading / trailing
        whitespace intact. Used by the stream adapter so it can re-use the
        final segment as the continuation buffer without losing the space
        that separates it from the upcoming chunk.
        """
        if not text:
            return []

        lang = language or self._language
        if lang == "auto":
            lang = detect_script(text[:200])

        strong = self._strong
        weak = self._weak
        if lang == "el":
            if ";" not in strong:
                strong = strong + ";"
            weak = weak.replace(";", "")

        if self._override_abbreviations is not None:
            abbrevs = self._override_abbreviations
        else:
            abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)

        protected, restore_map = self._protect(text, abbrevs)
        segments = self._split(protected, strong=strong, weak=weak)
        return [self._restore_raw(s, restore_map) for s in segments]

    @staticmethod
    def _restore_raw(text: str, restore_map: dict[str, str]) -> str:
        """Same as :py:meth:`_restore` but preserves edge whitespace."""
        if not restore_map:
            return text
        for key, value in restore_map.items():
            text = text.replace(key, value)
        return text

    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        """Open a push-based stream adapter bound to this chunker."""
        from .stream import BufferedSentenceChunkStream

        return BufferedSentenceChunkStream(
            tokenize_fn=partial(self.tokenize_raw, language=language),
            strong_terminators=self._resolve_strong_for_stream(language),
            min_sentence_len=self._min_sentence_len,
        )

    def _protect(
        self,
        text: str,
        abbreviations: frozenset[str],
    ) -> tuple[str, dict[str, str]]:
        """Replace structural tokens and abbreviation dots with placeholders.

        Each protected region becomes a single PUA character that the split
        scan cannot interpret as a terminator.
        """
        restore_map: dict[str, str] = {}
        counter = [0]

        def _substitute(match: re.Match[str]) -> str:
            key = _placeholder_char(counter[0])
            counter[0] += 1
            restore_map[key] = match.group(0)
            return key

        for regex in _STRUCTURAL_PATTERNS:
            text = regex.sub(_substitute, text)

        if abbreviations:
            escaped = sorted(
                (re.escape(a) for a in abbreviations),
                key=len,
                reverse=True,
            )
            abbrev_regex = re.compile(
                r"\b(?:" + "|".join(escaped) + r")\.",
                re.IGNORECASE,
            )
            text = abbrev_regex.sub(_substitute, text)

        return text, restore_map

    @staticmethod
    def _restore(text: str, restore_map: dict[str, str]) -> str:
        """Replace every placeholder with its original text and strip edges."""
        if not restore_map:
            return text.strip()
        for key, value in restore_map.items():
            text = text.replace(key, value)
        return text.strip()

    def _split(
        self,
        text: str,
        *,
        strong: str,
        weak: str,
    ) -> list[str]:
        """Walk ``text`` and return a list of sentence segments.

        Strong terminators always cut (with lookahead-before-commit). Weak
        terminators cut only after the accumulated buffer reaches
        ``min_sentence_len`` chars so we don't emit fragments from every comma
        at the start of a response.
        """
        strong_set = set(strong)
        weak_set = set(weak)
        closing_quote_set = set(CLOSING_QUOTES)

        segments: list[str] = []
        start = 0
        i = 0
        n = len(text)

        while i < n:
            ch = text[i]
            if ch not in strong_set and ch not in weak_set:
                i += 1
                continue

            j = i + 1
            cur_set = strong_set if ch in strong_set else weak_set
            while j < n and text[j] in cur_set:
                j += 1

            if j < n and text[j] in closing_quote_set:
                j += 1

            look = j
            saw_newline = ch == "\n"
            while look < n and text[look] in " \t":
                look += 1
            if look < n and text[look] == "\n":
                saw_newline = True
                look += 1
                while look < n and text[look] in " \t":
                    look += 1

            if ch in strong_set:
                if look < n or saw_newline:
                    segments.append(text[start:j])
                    start = j
                    i = look
                    continue
                break

            cluster_end = j
            if cluster_end - start >= self._min_sentence_len and look < n:
                segments.append(text[start:cluster_end])
                start = cluster_end
                i = look
                continue

            i = j

        if start < n:
            segments.append(text[start:n])

        return segments

    def _resolve_strong_for_stream(self, language: str | None) -> str:
        """Return the strong-terminator set the stream adapter should use for fast-path checks."""
        lang = language or self._language
        if lang == "el" and ";" not in self._strong:
            return self._strong + ";"
        return self._strong

Default multilingual, Unicode-aware sentence chunker.

Works correctly without a language hint for every major world script. An explicit hint sharpens behaviour (abbreviation set, Greek ; upgrade) but is not required.

Args

language
ISO 639-1 code or "auto" (default). "auto" triggers script detection on the first ~200 characters of each tokenize call.
min_sentence_len
Minimum character length before a weak terminator (, ; : etc.) is treated as a cut boundary. Strong terminators (. ! ? 。 ! ? । 。 ۔ ։ ። …) always cut regardless of length.
strong_terminators
Override the default strong-terminator character class.
weak_terminators
Override the default weak-terminator character class.
abbreviations
Override the abbreviation set. When None (default), the set is chosen from ABBREVIATIONS_BY_LANG based on the resolved language.
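The Greek ; upgrade mentioned above is a small transformation of the two terminator strings. The following sketch isolates that step from tokenize(); resolve_terminators is a hypothetical helper name, and the short ASCII defaults are placeholders for the much longer default character classes.

```python
def resolve_terminators(
    lang: str,
    strong: str = ".!?",
    weak: str = ",;:",
) -> tuple[str, str]:
    """Return the (strong, weak) terminator strings to use for ``lang``."""
    if lang == "el":
        # In Greek, ';' is the question mark, so it must end a sentence.
        if ";" not in strong:
            strong = strong + ";"
        weak = weak.replace(";", "")
    return strong, weak


print(resolve_terminators("en"))  # ('.!?', ',;:')
print(resolve_terminators("el"))  # ('.!?;', ',:')
```

Because the adjustment is computed per call, the same chunker instance can serve Greek and non-Greek text without mutating its configured terminator sets.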

Ancestors

SentenceChunker

Methods

def stream(self, *, language: str | None = None) ‑> SentenceChunkStream
def stream(self, *, language: str | None = None) -> SentenceChunkStream:
    """Open a push-based stream adapter bound to this chunker."""
    from .stream import BufferedSentenceChunkStream

    return BufferedSentenceChunkStream(
        tokenize_fn=partial(self.tokenize_raw, language=language),
        strong_terminators=self._resolve_strong_for_stream(language),
        min_sentence_len=self._min_sentence_len,
    )

Open a push-based stream adapter bound to this chunker.

def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]
def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
    """Split ``text`` into sentence-sized strings.

    Args:
        text: Input text. May be a full response or a partial buffer.
        language: Optional ISO 639-1 override. When omitted, uses the
            instance's default.

    Returns:
        Sentence strings in input order, leading/trailing whitespace
        stripped, empty segments filtered out. When the input ends without
        a confirmed sentence boundary, the final element contains the
        unterminated remainder (the stream adapter uses this to retain
        buffer state).
    """
    if not text:
        return []

    lang = language or self._language
    if lang == "auto":
        lang = detect_script(text[:200])

    strong = self._strong
    weak = self._weak
    if lang == "el":
        if ";" not in strong:
            strong = strong + ";"
        weak = weak.replace(";", "")

    if self._override_abbreviations is not None:
        abbrevs = self._override_abbreviations
    else:
        abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)

    protected, restore_map = self._protect(text, abbrevs)
    segments = self._split(protected, strong=strong, weak=weak)
    return [self._restore(s, restore_map) for s in segments if s.strip()]

Split text into sentence-sized strings.

Args

text
Input text. May be a full response or a partial buffer.
language
Optional ISO 639-1 override. When omitted, uses the instance's default.

Returns

Sentence strings in input order, leading/trailing whitespace stripped, empty segments filtered out. When the input ends without a confirmed sentence boundary, the final element contains the unterminated remainder (the stream adapter uses this to retain buffer state).
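The boundary rules can be exercised with a condensed, standalone version of the scan in _split. This is a simplified sketch rather than the class itself: it skips abbreviation and structural-token protection, uses short ASCII terminator sets, and the closing-quote set is an assumption because CLOSING_QUOTES is not shown on this page.

```python
def split_sketch(
    text: str,
    *,
    strong: str = ".!?\n",
    weak: str = ",;:",
    closing: str = "\"')",  # assumed stand-in for CLOSING_QUOTES
    min_len: int = 20,
) -> list[str]:
    segments: list[str] = []
    start = i = 0
    n = len(text)
    while i < n:
        ch = text[i]
        if ch not in strong and ch not in weak:
            i += 1
            continue
        # Swallow a run of same-class terminators plus one closing quote.
        cur = strong if ch in strong else weak
        j = i + 1
        while j < n and text[j] in cur:
            j += 1
        if j < n and text[j] in closing:
            j += 1
        # Look past spaces (and at most one newline) for following text.
        look = j
        saw_newline = ch == "\n"
        while look < n and text[look] in " \t":
            look += 1
        if look < n and text[look] == "\n":
            saw_newline = True
            look += 1
            while look < n and text[look] in " \t":
                look += 1
        if ch in strong:
            if look < n or saw_newline:
                segments.append(text[start:j])
                start, i = j, look
                continue
            break  # terminator at end of input: not yet confirmed
        if j - start >= min_len and look < n:
            segments.append(text[start:j])
            start, i = j, look
            continue
        i = j
    if start < n:
        segments.append(text[start:])
    return [s.strip() for s in segments if s.strip()]


print(split_sketch("Hi, there. This is a long clause, and more text. Tail"))
# ['Hi, there.', 'This is a long clause,', 'and more text.', 'Tail']
```

The first comma does not cut because only three characters have accumulated, while the second one does because the running segment has passed 20 characters. The trailing "Tail" is the unterminated remainder described in the Returns text.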

def tokenize_raw(self, text: str, *, language: str | None = None) ‑> list[str]
def tokenize_raw(
    self,
    text: str,
    *,
    language: str | None = None,
) -> list[str]:
    """Return raw (un-stripped) segment substrings.

    Same boundary decisions as :py:meth:`tokenize`, but each segment is
    returned as it appears in the input text — leading / trailing
    whitespace intact. Used by the stream adapter so it can re-use the
    final segment as the continuation buffer without losing the space
    that separates it from the upcoming chunk.
    """
    if not text:
        return []

    lang = language or self._language
    if lang == "auto":
        lang = detect_script(text[:200])

    strong = self._strong
    weak = self._weak
    if lang == "el":
        if ";" not in strong:
            strong = strong + ";"
        weak = weak.replace(";", "")

    if self._override_abbreviations is not None:
        abbrevs = self._override_abbreviations
    else:
        abbrevs = ABBREVIATIONS_BY_LANG.get(lang, ABBREVIATIONS_EN)

    protected, restore_map = self._protect(text, abbrevs)
    segments = self._split(protected, strong=strong, weak=weak)
    return [self._restore_raw(s, restore_map) for s in segments]

Return raw (un-stripped) segment substrings.

Same boundary decisions as tokenize(), but each segment is returned as it appears in the input text, with leading / trailing whitespace intact. Used by the stream adapter so it can re-use the final segment as the continuation buffer without losing the space that separates it from the upcoming chunk.
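A small string-level illustration shows why the stream adapter needs the un-stripped form. The segment values below are hand-written assumptions about what a raw split might return for the buffer "Hello. How are "; no chunker is invoked.

```python
# Hypothetical raw segments for the partial buffer "Hello. How are ".
raw_segments = ["Hello.", " How are "]
stripped_segments = [s.strip() for s in raw_segments]

next_chunk = "you today?"  # the next piece of LLM output to arrive

# Re-using the final segment as the continuation buffer:
carry_raw = raw_segments[-1] + next_chunk
carry_stripped = stripped_segments[-1] + next_chunk

print(repr(carry_raw))       # ' How are you today?'
print(repr(carry_stripped))  # 'How areyou today?'
```

With stripped segments the word boundary between "are" and "you" is lost, which is the failure the raw variant avoids.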

class BasicTextFilter (*,
language: str = 'auto',
strip_markdown: bool = True,
strip_llm_metadata: bool = True,
collapse_script_parens: bool = True,
normalise_punctuation: bool = True,
expand_symbols: bool = True,
expand_ranges: bool = True,
protect_structural: bool = True,
respect_quotes: bool = True,
respect_parens: bool = True,
ssml_flavor: str = 'none',
verbalize_currency: bool = True,
verbalize_numbers: bool = False,
currency_hint: str | None = None)
class BasicTextFilter(TextFilter):
    """Default text filter built from independently toggleable rules.

    Every boolean rule is on by default except ``verbalize_numbers``. Symbol
    expansion runs only when the language is English or ``"auto"``; for other
    languages the TTS provider handles symbol readings.
    """

    def __init__(
        self,
        *,
        language: str = "auto",
        strip_markdown: bool = True,
        strip_llm_metadata: bool = True,
        collapse_script_parens: bool = True,
        normalise_punctuation: bool = True,
        expand_symbols: bool = True,
        expand_ranges: bool = True,
        protect_structural: bool = True,
        respect_quotes: bool = True,
        respect_parens: bool = True,
        ssml_flavor: str = "none",
        verbalize_currency: bool = True,
        verbalize_numbers: bool = False,
        currency_hint: str | None = None,
    ) -> None:
        """Initialise the filter.

        Args:
            language: ISO 639-1 code or ``"auto"``. Drives symbol expansion
                (English only) and numeric-range separator word ("to", "से",
                "から", etc.).
            strip_markdown: Strip Markdown syntax before TTS. Default ``True``.
            strip_llm_metadata: Strip common LLM state-leak shapes such as
                ``(SESSION STATE: intro_delivered=true)`` or
                ``STATE: language=en``. Safety net — the real fix is a
                clearer prompt. Default ``True``.
            collapse_script_parens: When the LLM writes both an English word
                and its non-Latin gloss (``Tally (टैली)``, ``one click
                (एक क्लिक)``), collapse to just the gloss so the TTS doesn't
                speak the phrase twice. Word-count aware: ``Extra manpower
                (मैनपावर)`` → ``Extra मैनपावर``. Default ``True``.
            normalise_punctuation: Collapse ``"..."`` → ``"…"`` etc. Default ``True``.
            expand_symbols: Rewrite ``&``, ``%``, ``#N`` → words (English only).
                Default ``True``.
            expand_ranges: Rewrite short digit ranges ``2-3`` → ``2 to 3``
                using the locale's separator word. Default ``True``.
            protect_structural: Preserve URLs / paths / versions / emails
                through the Markdown and symbol stages. Default ``True``.
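            respect_quotes: (Reserved; currently a no-op. Preserved state
                is tracked per turn so future logic can use it.)
            respect_parens: (Reserved; same as above.)
            ssml_flavor: ``"none"`` (default), ``"cartesia"`` or ``"digits"``.
                Selects the optional SSML / spaced-digit injection step. The
                value is lower-cased; any other value skips the step.
            verbalize_currency: Expand currency amounts via
                ``expand_currency``. Default ``True``.
            verbalize_numbers: Expand cardinal numbers via
                ``expand_cardinals``. Default ``False``.
            currency_hint: Optional hint passed to ``expand_currency``;
                lower-cased when set. Default ``None``.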
        """
        self._language = language
        self._strip_markdown = strip_markdown
        self._strip_llm_metadata = strip_llm_metadata
        self._collapse_script_parens = collapse_script_parens
        self._normalise_punctuation = normalise_punctuation
        self._expand_symbols = expand_symbols
        self._expand_ranges = expand_ranges
        self._protect_structural = protect_structural
        self._respect_quotes = respect_quotes
        self._respect_parens = respect_parens
        self._ssml_flavor = (ssml_flavor or "none").lower()
        self._verbalize_currency = verbalize_currency
        self._verbalize_numbers = verbalize_numbers
        self._currency_hint = currency_hint.lower() if currency_hint else None

        self._buffer: str = ""
        self._in_code_fence: bool = False
        self._placeholder_counter: int = 0
        self._placeholder_map: dict[str, str] = {}

    @classmethod
    def for_language(cls, language: str | None) -> "BasicTextFilter":
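        """Build a filter pre-configured for ``language``; used by ``Pipeline``
        auto-wiring.

        ``None`` is treated as ``"auto"``. Number verbalization is enabled
        when the language is in ``_VERBALIZE_LANGS``, and the currency hint
        is looked up in ``_CURRENCY_BY_LANG``.
        """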
        lang = language or "auto"
        return cls(
            language=lang,
            verbalize_numbers=lang in _VERBALIZE_LANGS,
            currency_hint=_CURRENCY_BY_LANG.get(lang),
        )

    async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        """Transform an incoming chunk stream."""
        await self.reset()
        try:
            async for chunk in chunks:
                if not chunk:
                    continue
                logger.debug("[chunking] filter ← raw: %r", chunk)
                self._buffer += chunk
                emitted_before_fence: list[str] = []
                while True:
                    if self._in_code_fence:
                        close_idx = self._buffer.find("```")
                        if close_idx == -1:
                            self._buffer = ""
                            break
                        self._buffer = self._buffer[close_idx + 3 :]
                        self._in_code_fence = False
                        continue 

                    open_idx = self._buffer.find("```")
                    if open_idx == -1:
                        break 
                    close_idx = self._buffer.find("```", open_idx + 3)
                    if close_idx != -1:
                        break 
                    prefix = self._buffer[:open_idx]
                    self._buffer = ""
                    self._in_code_fence = True
                    if prefix:
                        processed = self._process(prefix)
                        if processed:
                            emitted_before_fence.append(processed)
                    break

                for piece in emitted_before_fence:
                    logger.debug("[chunking] filter → tokenizer: %r", piece)
                    yield piece

                if self._in_code_fence or not self._buffer:
                    continue

                safe_cut = self._find_emit_boundary(self._buffer)
                if safe_cut <= 0:
                    if len(self._buffer) > _MAX_BUFFER:
                        processed = self._process(self._buffer)
                        self._buffer = ""
                        if processed:
                            logger.debug("[chunking] filter → tokenizer: %r", processed)
                            yield processed
                    continue
                processed = self._process(self._buffer[:safe_cut])
                self._buffer = self._buffer[safe_cut:]
                if processed:
                    logger.debug("[chunking] filter → tokenizer: %r", processed)
                    yield processed

            if self._in_code_fence:
                self._buffer = ""
                self._in_code_fence = False
            if self._buffer:
                processed = self._process(self._buffer)
                self._buffer = ""
                if processed:
                    logger.debug("[chunking] filter → tokenizer (drain): %r", processed)
                    yield processed
        except Exception:
            logger.error("BasicTextFilter errored; yielding buffer raw", exc_info=True)
            if self._buffer:
                yield self._buffer
                self._buffer = ""

    async def reset(self) -> None:
        """Reset per-turn state."""
        self._buffer = ""
        self._in_code_fence = False
        self._placeholder_counter = 0
        self._placeholder_map = {}

    def _find_emit_boundary(self, text: str) -> int:
        """Return the length of the prefix that can be safely emitted.

        Assumes fence handling has already happened: this only worries about
        balanced inline Markdown markers (``**``, ``__``, backticks, brackets).

        Returns 0 when no safe whitespace boundary exists yet.
        """
        if not text:
            return 0
        for idx in range(len(text) - 1, -1, -1):
            if not text[idx].isspace():
                continue
            candidate = text[: idx + 1]
            if self._has_balanced_markers(candidate):
                return idx + 1
        return 0

    @staticmethod
    def _has_balanced_markers(text: str) -> bool:
        """Cheap balance check for common Markdown / grouping markers.

        Ensures we never emit a prefix that leaves an opener dangling —
        needed so that:
        - inline ``**bold**`` / ``_italic_`` / ``` `code` ``` all close,
        - Markdown links ``[text](url)`` both brackets close,
        - and, critically, parentheses balance. The last matters for the
          metadata stripper: an LLM-leaked ``(SESSION STATE: …)`` must
          reach the processor as one intact block so the regex can delete it.
        """
        if text.count("**") % 2 != 0:
            return False
        if text.count("__") % 2 != 0:
            return False
        bt_count = sum(1 for c in text if c == "`")
        if bt_count % 2 != 0:
            return False
        if text.count("[") != text.count("]"):
            return False
        if text.count("(") != text.count(")"):
            return False
        return True

    def _process(self, text: str) -> str:
        if not text:
            return ""

        if self._protect_structural:
            text = self._protect(text)

        if self._strip_llm_metadata:
            text = self._strip_metadata(text)

        if self._strip_markdown:
            text = self._strip_md(text)

        if self._collapse_script_parens:
            text = self._collapse_script_parens_fn(text)

        if self._normalise_punctuation:
            text = self._normalise_punct(text)

        if self._ssml_flavor == "cartesia":
            text = self._inject_cartesia_ssml(text)
        elif self._ssml_flavor == "digits":
            text = self._inject_digits_spaced(text)

        if self._expand_ranges:
            text = self._expand_numeric_ranges(text)

        if self._verbalize_currency:
            text = expand_currency(
                text,
                language=self._language or "en",
                hint=self._currency_hint,
            )
        if self._verbalize_numbers:
            text = expand_cardinals(text, language=self._language or "en")

        if self._expand_symbols and self._language_is_english():
            text = self._expand(text)

        if self._protect_structural:
            text = self._restore(text)

        return text

    def _language_is_english(self) -> bool:
        lang = (self._language or "").lower()
        return lang in ("en", "auto", "")

    def _protect(self, text: str) -> str:
        def _substitute(match: re.Match[str]) -> str:
            key = chr(_FILTER_PLACEHOLDER_BASE + self._placeholder_counter)
            self._placeholder_counter += 1
            self._placeholder_map[key] = match.group(0)
            return key

        for regex in (URL_REGEX, EMAIL_REGEX, PATH_REGEX, VERSION_REGEX):
            text = regex.sub(_substitute, text)
        return text

    def _restore(self, text: str) -> str:
        if not self._placeholder_map:
            return text
        for key, value in self._placeholder_map.items():
            if key in text:
                text = text.replace(key, value)
        return text


    @staticmethod
    def _strip_metadata(text: str) -> str:
        """Remove common LLM state-leak shapes.

        Targets patterns the model produces when it misinterprets a
        ``SESSION STATE`` / ``INTERNAL`` block in the system prompt as
        something to announce: parenthesised ALL-CAPS key/value dumps, and
        bare ``KEYWORD: key=value`` lines. Designed to never match normal
        parentheticals — the key detector is ``= inside all-caps-prefixed``.
        """
        text = METADATA_PARENS_REGEX.sub("", text)
        text = METADATA_PREFIX_REGEX.sub("", text)
        return text

    @staticmethod
    def _collapse_script_parens_fn(text: str) -> str:
        """Collapse ``<Latin word(s)> (<non-Latin gloss>)`` → ``<gloss>``.

        Walks each ``(…)`` containing a non-ASCII character, counts its
        words, and removes exactly that many Latin-script words from the
        immediately-preceding text. Preserves whitespace and non-Latin
        context around the match.
        """
        matches = list(SCRIPT_MIXED_PAREN_REGEX.finditer(text))
        if not matches:
            return text

        result: list[str] = []
        pos = 0
        for m in matches:
            paren_start = m.start()
            paren_end = m.end()
            content = m.group(1).strip()
            if not content:
                result.append(text[pos:paren_end])
                pos = paren_end
                continue

            n_words = len(content.split())

            before = text[pos:paren_start]
            idx = len(before)
            removed = 0
            while removed < n_words:
                while idx > 0 and before[idx - 1].isspace():
                    idx -= 1
                end = idx
                while (
                    idx > 0
                    and before[idx - 1].isascii()
                    and before[idx - 1].isalnum()
                ):
                    idx -= 1
                if end == idx:
                    break 
                removed += 1

            if removed == n_words:
                kept = before[:idx]
                if kept and not kept[-1:].isspace():
                    result.append(kept + " ")
                else:
                    result.append(kept)
                result.append(content)
            else:
                result.append(before)
                result.append(text[paren_start:paren_end])
            pos = paren_end

        result.append(text[pos:])
        return "".join(result)

    @staticmethod
    def _strip_md(text: str) -> str:
        text = MD_FENCED_CODE_REGEX.sub("", text)
        text = MD_INLINE_CODE_REGEX.sub("", text)
        text = MD_IMAGE_REGEX.sub("", text)
        text = MD_LINK_REGEX.sub(r"\1", text)
        text = MD_HEADING_REGEX.sub("", text)
        text = MD_LIST_MARKER_REGEX.sub("", text)
        text = MD_BLOCKQUOTE_REGEX.sub("", text)
        text = MD_HR_REGEX.sub("", text)
        text = MD_TABLE_SEP_REGEX.sub("", text)
        text = MD_TABLE_PIPE_REGEX.sub(" ", text)
        text = MD_BOLD_STAR_REGEX.sub(r"\1", text)
        text = MD_BOLD_UNDER_REGEX.sub(r"\1", text)
        text = MD_ITALIC_STAR_REGEX.sub(r"\1", text)
        text = MD_ITALIC_UNDER_REGEX.sub(r"\1", text)
        return text

    @staticmethod
    def _normalise_punct(text: str) -> str:
        text = PUNCT_ELLIPSIS_REGEX.sub("…", text)
        text = PUNCT_SPACED_DASH_REGEX.sub(" — ", text)
        return text

    @staticmethod
    def _expand(text: str) -> str:
        for regex, replacement in SYMBOL_EXPANSIONS_EN:
            text = regex.sub(replacement, text)
        return text

    @staticmethod
    def _inject_digits_spaced(text: str) -> str:
        """Replace phones and 6+ digit runs with space-separated digits.

        Example: ``+91 98765 43210`` → ``9 1 9 8 7 6 5 4 3 2 1 0``.

        This is the universal fallback when the TTS doesn't support
        Cartesia's ``<spell>`` tag (Sarvam AI, ElevenLabs, Google TTS,
        Azure, AWS Polly, etc.). Every modern TTS reads space-separated
        digits character-by-character naturally — in the active language
        (Hindi TTS says "नौ आठ सात…", English TTS says "nine eight seven…").
        The ``+``, ``(``, ``)``, ``-``, and interior spaces of phone
        patterns are stripped so the TTS doesn't read "plus paren…" etc.
        """
        def _phone_to_digits(m: re.Match[str]) -> str:
            digits = re.sub(r"\D", "", m.group(0))
            return " ".join(digits)

        text = SPELL_PHONE_REGEX.sub(_phone_to_digits, text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: " ".join(m.group(0)),
            text,
        )
        return text


    @staticmethod
    def _inject_cartesia_ssml(text: str) -> str:
        """Wrap phone numbers and long digit runs in ``<spell>…</spell>``.

        Cartesia Sonic-3 reads unwrapped digit strings as natural-language
        numbers ("one thousand two hundred thirty four"). For IDs, account
        numbers, OTPs, and phone numbers you almost always want digit-by-digit.

        The order matters: phone regex runs first (more specific), then the
        long-digit fallback for any standalone 7+ digit run.
        """
        text = SPELL_PHONE_REGEX.sub(lambda m: f"<spell>{m.group(0)}</spell>", text)
        text = SPELL_LONG_DIGITS_REGEX.sub(
            lambda m: f"<spell>{m.group(0)}</spell>", text
        )
        return text

    def _expand_numeric_ranges(self, text: str) -> str:
        """Rewrite ``N-M`` as ``N <sep> M`` using the locale's separator word.

        Regex width is adaptive:

        * With ``ssml_flavor="cartesia"`` (phones already wrapped in
          ``<spell>…</spell>``) — uses the WIDE regex (up to 4 digits per
          side), so ``500-1000 entries`` becomes ``500 से 1000 entries``.
        * Otherwise — uses the narrow 3-digit regex so an English phone
          like ``555-1234`` is not mistaken for a range.
        * For non-English agents (Hindi, Tamil, etc.), the wide regex is
          also used, because phone numbers are rare in those prose contexts
          while range patterns (दाम पांच सौ से हज़ार, etc.) are common.
        """
        lang = (self._language or "").lower()
        separator = RANGE_SEPARATOR_BY_LANG.get(lang)
        if separator is None:
            return text
        regex = (
            RANGE_REGEX_WIDE
            if self._ssml_flavor == "cartesia" or not self._language_is_english()
            else RANGE_REGEX
        )
        return regex.sub(
            lambda m: f"{m.group(1)} {separator} {m.group(2)}",
            text,
        )

Default text filter with six independently-toggleable rules.

All rules are on by default. Symbol expansion is suppressed for non-English languages; the TTS provider handles symbol readings for those.

Initialise the filter.

Args

language
ISO 639-1 code or "auto". Drives symbol expansion (English only) and numeric-range separator word ("to", "से", "から", etc.).
strip_markdown
Strip Markdown syntax before TTS. Default True.
strip_llm_metadata
Strip common LLM state-leak shapes such as (SESSION STATE: intro_delivered=true) or STATE: language=en. Safety net — the real fix is a clearer prompt. Default True.
collapse_script_parens
When the LLM writes both an English word and its non-Latin gloss (Tally (टैली), one click (एक क्लिक)), collapse to just the gloss so the TTS doesn't speak the phrase twice. Word-count aware: Extra manpower (मैनपावर) → Extra मैनपावर. Default True.
normalise_punctuation
Collapse "..." → "…" etc. Default True.
expand_symbols
Rewrite &, %, #N → words (English only). Default True.
expand_ranges
Rewrite short digit ranges (2-3 → 2 to 3) using the locale's separator word. Default True.
protect_structural
Preserve URLs / paths / versions / emails through the Markdown and symbol stages. Default True.
respect_quotes
(Reserved — currently a no-op; preserved state is tracked per turn so future logic can use it.)
respect_parens
(Reserved — same as above.)
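To make the expand_ranges rule concrete, here is a minimal sketch of a narrow range rewrite. The regex and the SEPARATORS table are hypothetical stand-ins written for this example; only the separator words come from the language description above, and the library's own RANGE_REGEX and RANGE_SEPARATOR_BY_LANG may well differ.

```python
import re

# Hypothetical stand-ins for the library's RANGE_REGEX / RANGE_SEPARATOR_BY_LANG.
SEPARATORS = {"en": "to", "hi": "से", "ja": "から"}
# At most three digits per side, and not embedded in a longer digit/hyphen run.
NARROW_RANGE = re.compile(r"(?<![\d-])(\d{1,3})-(\d{1,3})(?![\d-])")


def expand_ranges(text: str, language: str) -> str:
    """Rewrite ``N-M`` as ``N <sep> M``; leave text alone for unknown languages."""
    separator = SEPARATORS.get(language.lower())
    if separator is None:
        return text
    return NARROW_RANGE.sub(
        lambda m: f"{m.group(1)} {separator} {m.group(2)}",
        text,
    )
```

With this narrow pattern, expand_ranges("Takes 2-3 days", "en") gives "Takes 2 to 3 days", while a phone-like 555-1234 is left untouched because one side has more than three digits.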

Ancestors

Static methods

def for_language(language: str | None) ‑> BasicTextFilter

Build a filter pre-configured for language — used by Pipeline auto-wiring.

Methods

async def filter(self, chunks: AsyncIterator[str]) ‑> AsyncIterator[str]
async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Transform an incoming chunk stream."""
    await self.reset()
    try:
        async for chunk in chunks:
            if not chunk:
                continue
            logger.debug("[chunking] filter ← raw: %r", chunk)
            self._buffer += chunk
            emitted_before_fence: list[str] = []
            while True:
                if self._in_code_fence:
                    close_idx = self._buffer.find("```")
                    if close_idx == -1:
                        self._buffer = ""
                        break
                    self._buffer = self._buffer[close_idx + 3 :]
                    self._in_code_fence = False
                    continue 

                open_idx = self._buffer.find("```")
                if open_idx == -1:
                    break 
                close_idx = self._buffer.find("```", open_idx + 3)
                if close_idx != -1:
                    break 
                prefix = self._buffer[:open_idx]
                self._buffer = ""
                self._in_code_fence = True
                if prefix:
                    processed = self._process(prefix)
                    if processed:
                        emitted_before_fence.append(processed)
                break

            for piece in emitted_before_fence:
                logger.debug("[chunking] filter → tokenizer: %r", piece)
                yield piece

            if self._in_code_fence or not self._buffer:
                continue

            safe_cut = self._find_emit_boundary(self._buffer)
            if safe_cut <= 0:
                if len(self._buffer) > _MAX_BUFFER:
                    processed = self._process(self._buffer)
                    self._buffer = ""
                    if processed:
                        logger.debug("[chunking] filter → tokenizer: %r", processed)
                        yield processed
                continue
            processed = self._process(self._buffer[:safe_cut])
            self._buffer = self._buffer[safe_cut:]
            if processed:
                logger.debug("[chunking] filter → tokenizer: %r", processed)
                yield processed

        if self._in_code_fence:
            self._buffer = ""
            self._in_code_fence = False
        if self._buffer:
            processed = self._process(self._buffer)
            self._buffer = ""
            if processed:
                logger.debug("[chunking] filter → tokenizer (drain): %r", processed)
                yield processed
    except Exception:
        logger.error("BasicTextFilter errored; yielding buffer raw", exc_info=True)
        if self._buffer:
            yield self._buffer
            self._buffer = ""

Transform an incoming chunk stream.
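The fence handling in filter() can be hard to follow inside the full method, so the following is a simplified, synchronous sketch of just that part: dropping fenced code even when the opening or closing marker is split across chunks. The function name drop_fenced_code is invented for this example, and unlike the real filter it yields text immediately instead of waiting for a safe emit boundary.

```python
from typing import Iterable, Iterator

FENCE = "`" * 3  # the Markdown code-fence marker


def drop_fenced_code(chunks: Iterable[str]) -> Iterator[str]:
    """Yield the text outside code fences, chunk by chunk."""
    buffer = ""
    in_fence = False
    for chunk in chunks:
        buffer += chunk
        # Consume every complete fence marker currently in the buffer.
        while (idx := buffer.find(FENCE)) != -1:
            if not in_fence and idx > 0:
                yield buffer[:idx]
            buffer = buffer[idx + len(FENCE):]
            in_fence = not in_fence
        # Hold back trailing backticks: they may be the start of a marker.
        body = buffer.rstrip("`")
        held = buffer[len(body):]
        if not in_fence and body:
            yield body
        buffer = held
    # An unterminated fence is dropped; stray backticks outside one are kept.
    if not in_fence and buffer:
        yield buffer
```

As in the method above, a fence that never closes is discarded at end of stream rather than spoken.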

async def reset(self) ‑> None
async def reset(self) -> None:
    """Reset per-turn state."""
    self._buffer = ""
    self._in_code_fence = False
    self._placeholder_counter = 0
    self._placeholder_map = {}

Reset per-turn state.

class BufferedSentenceChunkStream (*,
tokenize_fn: Callable[[str], list[str]],
strong_terminators: str,
min_sentence_len: int = 20,
idle_flush_ms: int = 150,
idle_min_chars: int = 12)
class BufferedSentenceChunkStream(SentenceChunkStream):
    """Stream adapter that wraps a one-shot tokenize function.

    Strategy:

    * Accumulate incoming text in a buffer.
    * After each ``push_text``, call the tokenize function on the buffer.
      When it returns more than one segment, every segment except the last is
      confirmed complete (they're followed by at least one more character in
      the buffer, which the tokenizer interprets as lookahead). Emit them.
      The last segment is the in-flight remainder; keep it as the new buffer.
    * An idle-flush background task watches for silence. If nothing has been
      pushed for ``idle_flush_ms`` and the buffer holds enough text, cut on a
      word or CJK-character boundary and emit. This bounds time-to-audio for
      terminator-less responses.
    * ``flush()`` and ``end_input()`` drain the buffer as a single trailing
      segment; ``end_input()`` additionally closes the iterator cleanly.
    """

    def __init__(
        self,
        *,
        tokenize_fn: Callable[[str], list[str]],
        strong_terminators: str,
        min_sentence_len: int = 20,
        idle_flush_ms: int = 150,
        idle_min_chars: int = 12,
    ) -> None:
        """Initialise the stream.

        Args:
            tokenize_fn: Callable that maps buffer string → list of segments.
                Typically ``partial(tokenizer.tokenize, language=...)``.
            strong_terminators: Character class used for fast lookahead checks.
            min_sentence_len: Passed through for consistency (tokenize_fn
                already knows this; kept here for the idle-flush heuristic).
            idle_flush_ms: Milliseconds of inactivity before a word-boundary
                cut fires. Clamped to at least 100 ms.
            idle_min_chars: Minimum buffer length before idle-flush is allowed
                to fire. Prevents emitting 1-3 char fragments on slow LLMs.
        """
        self._tokenize_fn = tokenize_fn
        self._strong_set = set(strong_terminators)
        self._min_sentence_len = max(1, int(min_sentence_len))
        self._idle_flush_s = max(0.1, idle_flush_ms / 1000.0)
        self._idle_min_chars = max(1, int(idle_min_chars))

        self._buffer: str = ""
        self._buffer_lock = asyncio.Lock()
        self._queue: asyncio.Queue[object] = asyncio.Queue()
        self._closed: bool = False
        self._last_push_monotonic: float = time.monotonic()
        self._idle_task: asyncio.Task[None] | None = None

    async def push_text(self, text: str) -> None:
        """Feed more text into the stream."""
        if self._closed:
            logger.debug("push_text on closed BufferedSentenceChunkStream; ignored")
            return
        if not text:
            return
        async with self._buffer_lock:
            self._buffer += text
            self._last_push_monotonic = time.monotonic()
            self._ensure_idle_task()
            await self._try_emit_locked()

    async def flush(self) -> None:
        """Emit any buffered text as a single trailing segment."""
        async with self._buffer_lock:
            await self._flush_locked()

    async def end_input(self) -> None:
        """Signal end of input; drain the buffer and close the iterator."""
        async with self._buffer_lock:
            await self._flush_locked()
            if not self._closed:
                self._closed = True
                await self._queue.put(_EOS)
        if self._idle_task is not None and not self._idle_task.done():
            self._idle_task.cancel()

    def __aiter__(self) -> AsyncIterator[str]:
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is _EOS:
            raise StopAsyncIteration
        return item 

    async def _try_emit_locked(self) -> None:
        """Emit every confirmed-complete sentence from the current buffer.

        Called with ``_buffer_lock`` held.
        """
        if not self._buffer:
            return
        try:
            segments = self._tokenize_fn(self._buffer)
        except Exception: 
            logger.error("Tokenizer raised; flushing buffer as-is", exc_info=True)
            remainder = self._buffer.strip()
            self._buffer = ""
            if remainder:
                await self._queue.put(remainder)
            return

        if len(segments) <= 1:
            return

        for segment in segments[:-1]:
            stripped = segment.strip()
            if stripped:
                logger.debug("[chunking] tokenizer emit: %r", stripped)
                await self._queue.put(stripped)

        self._buffer = segments[-1]

    async def _flush_locked(self) -> None:
        """Force-emit any buffered text. Called with ``_buffer_lock`` held."""
        if not self._buffer:
            return
        try:
            segments = self._tokenize_fn(self._buffer)
        except Exception: 
            logger.error("Tokenizer raised during flush", exc_info=True)
            segments = [self._buffer]

        logger.debug("[chunking] flush: buffer=%r -> segments=%r", self._buffer, segments)
        
        self._buffer = ""
        for segment in segments:
            stripped = segment.strip()
            if stripped:
                logger.debug("[chunking] flush emit: %r", stripped)
                await self._queue.put(stripped)

    def _ensure_idle_task(self) -> None:
        """Spawn the idle-flush watchdog on first push."""
        if self._idle_task is None or self._idle_task.done():
            self._idle_task = asyncio.create_task(self._idle_monitor())

    async def _idle_monitor(self) -> None:
        """Fire word-boundary cuts when no push arrives for ``idle_flush_ms``."""
        try:
            while not self._closed:
                await asyncio.sleep(self._idle_flush_s)
                if self._closed:
                    return
                if time.monotonic() - self._last_push_monotonic < self._idle_flush_s:
                    continue
                await self._idle_flush()
        except asyncio.CancelledError:
            raise

    async def _idle_flush(self) -> None:
        """Emit a word-boundary cut from the current buffer, if it's long enough."""
        async with self._buffer_lock:
            if len(self._buffer) < self._idle_min_chars:
                return
            
            stripped = self._buffer.rstrip()
            if stripped and stripped[-1] in self._strong_set:
                self._buffer = ""
                logger.debug(
                    "Idle-flush emitting complete sentence %d chars", len(stripped)
                )
                await self._queue.put(stripped)
                return

            cut_index = self._find_fallback_cut(self._buffer)
            if cut_index <= 0:
                return
            
            remainder = self._buffer[cut_index:].strip()
            if len(remainder) < self._idle_min_chars:
                return

            segment = self._buffer[:cut_index].strip()
            self._buffer = self._buffer[cut_index:].lstrip()
            if segment:
                logger.debug("Idle-flush emitting %d chars", len(segment))
                await self._queue.put(segment)

    def _find_fallback_cut(self, text: str) -> int:
        """Pick the best place to cut ``text`` in the absence of a terminator.

        Prefers the last space. If there are no spaces (CJK / Thai / Lao /
        Myanmar / Khmer prose), cuts at the last character-range boundary.
        Returns 0 when no safe cut exists.
        """
        last_space = text.rfind(" ")
        if last_space > 0:
            return last_space

        last_cjk = -1
        for m in NO_SPACE_SCRIPTS_REGEX.finditer(text):
            last_cjk = m.end()
        if last_cjk > 0 and last_cjk < len(text):
            return last_cjk

        return 0

Stream adapter that wraps a one-shot tokenize function.

Strategy:

  • Accumulate incoming text in a buffer.
  • After each push_text, call the tokenize function on the buffer. When it returns more than one segment, every segment except the last is confirmed complete (they're followed by at least one more character in the buffer, which the tokenizer interprets as lookahead). Emit them. The last segment is the in-flight remainder; keep it as the new buffer.
  • An idle-flush background task watches for silence. If nothing has been pushed for idle_flush_ms and the buffer holds enough text, cut on a word or CJK-character boundary and emit. This bounds time-to-audio for terminator-less responses.
  • flush() and end_input() drain the buffer as a single trailing segment; end_input() additionally closes the iterator cleanly.
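
The "emit everything except the last segment" rule from the strategy list can be sketched without any asyncio machinery. The toy_tokenize splitter and the SentenceBuffer class below are hypothetical helpers for illustration only; the real stream also runs the idle-flush task and hands segments to a queue.

```python
import re
from typing import Callable


def toy_tokenize(text: str) -> list[str]:
    """Hypothetical splitter: break after ., ! or ? followed by whitespace."""
    return [part for part in re.split(r"(?<=[.!?])\s+", text) if part]


class SentenceBuffer:
    """Emit every segment except the last, which may still be growing."""

    def __init__(self, tokenize_fn: Callable[[str], list[str]]) -> None:
        self._tokenize_fn = tokenize_fn
        self._buffer = ""

    def push(self, text: str) -> list[str]:
        self._buffer += text
        segments = self._tokenize_fn(self._buffer)
        if len(segments) <= 1:
            return []  # nothing is confirmed complete yet
        self._buffer = segments[-1]  # in-flight remainder
        return [s.strip() for s in segments[:-1] if s.strip()]

    def flush(self) -> list[str]:
        segments = self._tokenize_fn(self._buffer) if self._buffer else []
        self._buffer = ""
        return [s.strip() for s in segments if s.strip()]
```

Pushing "Hello wor" and then "ld. How" emits "Hello world." only on the second push, once a following character confirms the sentence is complete; "How" stays buffered until more text or a flush arrives.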

Initialise the stream.

Args

tokenize_fn
Callable that maps buffer string → list of segments. Typically partial(tokenizer.tokenize, language=...).
strong_terminators
Character class used for fast lookahead checks.
min_sentence_len
Passed through for consistency (tokenize_fn already knows this; kept here for the idle-flush heuristic).
idle_flush_ms
Milliseconds of inactivity before a word-boundary cut fires. Clamped to at least 100 ms.
idle_min_chars
Minimum buffer length before idle-flush is allowed to fire. Prevents emitting 1-3 char fragments on slow LLMs.
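
How idle_flush_ms and idle_min_chars interact may be easier to see in isolation. The helper below is a hypothetical sketch of the gating check only (the name idle_flush_due and its signature are assumptions); it mirrors the 100 ms clamp and the minimum-length test described above, and leaves out the choice of cut position.

```python
def idle_flush_due(
    *,
    now: float,
    last_push: float,
    buffer: str,
    idle_flush_ms: int = 150,
    idle_min_chars: int = 12,
) -> bool:
    """Return True when the stream has been quiet long enough to consider a cut."""
    idle_flush_s = max(0.1, idle_flush_ms / 1000.0)  # clamp to at least 100 ms
    min_chars = max(1, int(idle_min_chars))
    if now - last_push < idle_flush_s:
        return False  # a push arrived too recently
    return len(buffer) >= min_chars
```

Both timestamps are expected to come from the same monotonic clock, for example time.monotonic().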

Ancestors

Methods

async def end_input(self) ‑> None
async def end_input(self) -> None:
    """Signal end of input; drain the buffer and close the iterator."""
    async with self._buffer_lock:
        await self._flush_locked()
        if not self._closed:
            self._closed = True
            await self._queue.put(_EOS)
    if self._idle_task is not None and not self._idle_task.done():
        self._idle_task.cancel()

Signal end of input; drain the buffer and close the iterator.

async def flush(self) ‑> None
async def flush(self) -> None:
    """Emit any buffered text as a single trailing segment."""
    async with self._buffer_lock:
        await self._flush_locked()

Emit any buffered text as a single trailing segment.

async def push_text(self, text: str) ‑> None
async def push_text(self, text: str) -> None:
    """Feed more text into the stream."""
    if self._closed:
        logger.debug("push_text on closed BufferedSentenceChunkStream; ignored")
        return
    if not text:
        return
    async with self._buffer_lock:
        self._buffer += text
        self._last_push_monotonic = time.monotonic()
        self._ensure_idle_task()
        await self._try_emit_locked()

Feed more text into the stream.

class EnglishHyphenator (patterns: str, exceptions: str = '')
class EnglishHyphenator:
    """Trie-based hyphenator using Liang's algorithm.

    Instantiate via :func:`_get_hyphenator` (cached), or call
    :func:`hyphenate_english` for one-shot use.
    """

    def __init__(self, patterns: str, exceptions: str = "") -> None:
        self.tree: dict[Any, Any] = {}
        for pattern in patterns.split():
            self._insert_pattern(pattern)

        self.exceptions: dict[str, list[int]] = {}
        for ex in exceptions.split():
            points = [0] + [int(h == "-") for h in re.split(r"[a-z]", ex)]
            self.exceptions[ex.replace("-", "")] = points

    def _insert_pattern(self, pattern: str) -> None:
        chars = re.sub(r"[0-9]", "", pattern)
        points = [int(d or 0) for d in re.split(r"[.a-z]", pattern)]

        t = self.tree
        for c in chars:
            if c not in t:
                t[c] = {}
            t = t[c]
        t[None] = points

    def hyphenate(self, word: str) -> list[str]:
        """Return the word split at every legal hyphenation point.

        Short words (≤ 4 chars) pass through unchanged. Words in the
        exception list use the stored split; everything else runs the
        trie walk.
        """
        if len(word) <= 4:
            return [word]

        if word.lower() in self.exceptions:
            points = self.exceptions[word.lower()]
        else:
            work = "." + word.lower() + "."
            points = [0] * (len(work) + 1)
            for i in range(len(work)):
                t = self.tree
                for c in work[i:]:
                    if c in t:
                        t = t[c]
                        if None in t:
                            p = t[None]
                            for j, p_j in enumerate(p):
                                points[i + j] = max(points[i + j], p_j)
                    else:
                        break
            # No hyphens in the first or last two chars.
            points[1] = points[2] = points[-2] = points[-3] = 0

        pieces = [""]
        for c, p in zip(word, points[2:]):
            pieces[-1] += c
            if p % 2:
                pieces.append("")
        return pieces

Trie-based hyphenator using Liang's algorithm.

Instantiate via _get_hyphenator (cached), or call hyphenate_english() for one-shot use.

Methods

def hyphenate(self, word: str) ‑> list[str]
def hyphenate(self, word: str) -> list[str]:
    """Return the word split at every legal hyphenation point.

    Short words (≤ 4 chars) pass through unchanged. Words in the
    exception list use the stored split; everything else runs the
    trie walk.
    """
    if len(word) <= 4:
        return [word]

    if word.lower() in self.exceptions:
        points = self.exceptions[word.lower()]
    else:
        work = "." + word.lower() + "."
        points = [0] * (len(work) + 1)
        for i in range(len(work)):
            t = self.tree
            for c in work[i:]:
                if c in t:
                    t = t[c]
                    if None in t:
                        p = t[None]
                        for j, p_j in enumerate(p):
                            points[i + j] = max(points[i + j], p_j)
                else:
                    break
        # No hyphens in the first or last two chars.
        points[1] = points[2] = points[-2] = points[-3] = 0

    pieces = [""]
    for c, p in zip(word, points[2:]):
        pieces[-1] += c
        if p % 2:
            pieces.append("")
    return pieces

Return the word split at every legal hyphenation point.

Short words (≤ 4 chars) pass through unchanged. Words in the exception list use the stored split; everything else runs the trie walk.
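
For readers unfamiliar with Liang patterns, the sketch below shows the two small ideas the trie walk relies on: a pattern such as hy3ph encodes letters plus a priority for each gap between them, and a word is broken wherever the winning priority is odd. The function names are invented for this example, and hy3ph is used purely as an illustrative pattern; the parsing lines follow _insert_pattern in the class source.

```python
import re


def parse_pattern(pattern: str) -> tuple[str, list[int]]:
    """Split a Liang pattern into its letters and per-gap priorities."""
    chars = re.sub(r"[0-9]", "", pattern)
    # One entry per gap: before the first letter, between letters, after the last.
    points = [int(d or 0) for d in re.split(r"[.a-z]", pattern)]
    return chars, points


def split_at_odd_points(word: str, gaps_after: list[int]) -> list[str]:
    """Break ``word`` after each letter whose following gap has an odd priority."""
    pieces = [""]
    for char, point in zip(word, gaps_after):
        pieces[-1] += char
        if point % 2:
            pieces.append("")
    return pieces
```

Here parse_pattern("hy3ph") returns ("hyph", [0, 0, 3, 0, 0]), placing priority 3 in the gap between y and p. Odd priorities allow a break and even ones forbid it, which is why hyphenate() takes the maximum over all matching patterns before testing p % 2.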

class IndicScriptTransliterator (*, source: str, target: str)
class IndicScriptTransliterator:
    """Thin wrapper around ``UnicodeIndicTransliterator``.

    Useful as a ``TextFilter``-adjacent utility when your LLM emits Hindi but
    the TTS speaks Telugu (or similar cross-Indic scenarios).

    Example::

        from videosdk.agents.tokenize import IndicScriptTransliterator

        trans = IndicScriptTransliterator(source="hi", target="te")
        out = trans.convert("नमस्ते दुनिया")
        # out == "నమస్తే దునియా" (approximate phonetic conversion)
    """

    def __init__(self, *, source: str, target: str) -> None:
        self._source = source
        self._target = target

    def convert(self, text: str) -> str:
        try:
            from indicnlp.transliterate.unicode_transliterate import (
                UnicodeIndicTransliterator,
            )
        except ImportError as exc:
            raise ImportError(
                "indic-nlp-library is missing — it ships as a dependency of "
                "videosdk-agents; reinstall with: uv pip install -U videosdk-agents"
            ) from exc
        return UnicodeIndicTransliterator.transliterate(text, self._source, self._target)

Thin wrapper around UnicodeIndicTransliterator.

Useful as a TextFilter-adjacent utility when your LLM emits Hindi but the TTS speaks Telugu (or similar cross-Indic scenarios).

Example:

    from videosdk.agents.tokenize import IndicScriptTransliterator

    trans = IndicScriptTransliterator(source="hi", target="te")
    out = trans.convert("नमस्ते दुनिया")
    # out == "నమస్తే దునియా" (approximate phonetic conversion)

Methods

def convert(self, text: str) ‑> str
def convert(self, text: str) -> str:
    try:
        from indicnlp.transliterate.unicode_transliterate import (
            UnicodeIndicTransliterator,
        )
    except ImportError as exc:
        raise ImportError(
            "indic-nlp-library is missing — it ships as a dependency of "
            "videosdk-agents; reinstall with: uv pip install -U videosdk-agents"
        ) from exc
    return UnicodeIndicTransliterator.transliterate(text, self._source, self._target)

class IndicSentenceChunker (*, language: str = 'hi', min_sentence_len: int = 1, idle_flush_ms: int = 400)
class IndicSentenceChunker(SentenceChunker):
    """Sentence chunker for Indic scripts using indic-nlp-library.

    Falls back gracefully for unsupported ``language`` values by returning the
    input text as a single segment — the ``BufferedSentenceChunkStream`` then
    relies on idle-flush for phrasing.
    """

    def __init__(
        self,
        *,
        language: str = "hi",
        min_sentence_len: int = 1,
        idle_flush_ms: int = 400,
    ) -> None:
        """Initialise the chunker.

        Args:
            language: Default ISO 639-1 code. Override per-turn via
                ``tokenize(..., language=...)`` or ``stream(language=...)``.
            min_sentence_len: Passed through to the ``BufferedSentenceChunkStream``
                idle-flush heuristic. Default 1 (Indic sentences can be very short).
            idle_flush_ms: Idle timeout before a word-boundary cut fires.
        """
        if language not in INDIC_LANGS:
            logger.warning(
                "IndicSentenceChunker language %r is not in the supported set "
                "%s; sentence splitting may degrade. Consider using "
                "BasicSentenceChunker for non-Indic languages.",
                language,
                sorted(INDIC_LANGS),
            )
        self._default_language = language
        self._min_sentence_len = int(min_sentence_len)
        self._idle_flush_ms = int(idle_flush_ms)
        self._split_fn: "_SentenceSplit | None" = None

    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        if not text or not text.strip():
            return []
        lang = self._resolve_language(language)
        split_fn = self._get_split_fn()
        try:
            sentences = split_fn(text, lang=lang)
        except Exception:  # pragma: no cover - defensive
            logger.warning(
                "indic-nlp-library sentence_split raised for lang=%r; returning "
                "input as single segment",
                lang,
                exc_info=True,
            )
            return [text.strip()]
        return [s.strip() for s in sentences if s and s.strip()]

    def tokenize_raw(self, text: str, *, language: str | None = None) -> list[str]:
        """Return segments with original whitespace preserved.

        Used by the stream adapter: the ``BufferedSentenceChunkStream`` re-uses
        the last segment as the continuation buffer, so losing edge whitespace
        would concatenate unrelated words (e.g. ``व्यापार`` + ``से`` →
        ``व्यापारसे``) when the next chunk arrives.

        indic-nlp-library's ``sentence_split`` returns stripped strings, so we
        map each stripped segment back to its position in the original text
        and slice raw ranges, preserving the whitespace that originally sat
        between sentences.
        """
        if not text:
            return []
        lang = self._resolve_language(language)
        split_fn = self._get_split_fn()
        try:
            stripped_sentences = split_fn(text, lang=lang)
        except Exception:  # pragma: no cover - defensive
            return [text]

        raw_segments: list[str] = []
        cursor = 0
        for s in stripped_sentences:
            core = s.strip() if s else ""
            if not core:
                continue
            idx = text.find(core, cursor)
            if idx < 0:
                # Shouldn't happen — defensive fallback keeps the stripped text.
                raw_segments.append(core)
                continue
            end = idx + len(core)
            raw_segments.append(text[cursor:end])
            cursor = end

        if cursor < len(text):
            if raw_segments:
                raw_segments[-1] = raw_segments[-1] + text[cursor:]
            else:
                raw_segments.append(text[cursor:])

        return raw_segments

    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        lang = self._resolve_language(language)
        tokenize_fn: Callable[[str], list[str]] = partial(
            self.tokenize_raw, language=lang
        )
        return BufferedSentenceChunkStream(
            tokenize_fn=tokenize_fn,
            strong_terminators="।॥.!?",  # Devanagari danda + Latin terminators
            min_sentence_len=self._min_sentence_len,
            idle_flush_ms=self._idle_flush_ms,
        )

    def _resolve_language(self, language: str | None) -> str:
        """Pick the language code to pass to the upstream library."""
        lang = (language or self._default_language or "hi").lower()
        if lang == "auto":
            return self._default_language
        return lang

    def _get_split_fn(self) -> "_SentenceSplit":
        """Lazy-load the upstream function on first use."""
        if self._split_fn is None:
            self._split_fn = _load_sentence_split()
        return self._split_fn

Sentence chunker for Indic scripts using indic-nlp-library.

Falls back gracefully for unsupported language values by returning the input text as a single segment — the BufferedSentenceChunkStream then relies on idle-flush for phrasing.

Initialise the chunker.

Args

language
Default ISO 639-1 code. Override per-turn via tokenize(..., language=...) or stream(language=...).
min_sentence_len
Passed through to the BufferedSentenceChunkStream idle-flush heuristic. Default 1 (Indic sentences can be very short).
idle_flush_ms
Idle timeout before a word-boundary cut fires.

Ancestors

Methods

def tokenize_raw(self, text: str, *, language: str | None = None) ‑> list[str]
Expand source code
def tokenize_raw(self, text: str, *, language: str | None = None) -> list[str]:
    """Return segments with original whitespace preserved.

    Used by the stream adapter: the ``BufferedSentenceChunkStream`` re-uses
    the last segment as the continuation buffer, so losing edge whitespace
    would concatenate unrelated words (e.g. ``व्यापार`` + ``से`` →
    ``व्यापारसे``) when the next chunk arrives.

    indic-nlp-library's ``sentence_split`` returns stripped strings, so we
    map each stripped segment back to its position in the original text
    and slice raw ranges, preserving the whitespace that originally sat
    between sentences.
    """
    if not text:
        return []
    lang = self._resolve_language(language)
    split_fn = self._get_split_fn()
    try:
        stripped_sentences = split_fn(text, lang=lang)
    except Exception:  # pragma: no cover - defensive
        return [text]

    raw_segments: list[str] = []
    cursor = 0
    for s in stripped_sentences:
        core = s.strip() if s else ""
        if not core:
            continue
        idx = text.find(core, cursor)
        if idx < 0:
            # Shouldn't happen — defensive fallback keeps the stripped text.
            raw_segments.append(core)
            continue
        end = idx + len(core)
        raw_segments.append(text[cursor:end])
        cursor = end

    if cursor < len(text):
        if raw_segments:
            raw_segments[-1] = raw_segments[-1] + text[cursor:]
        else:
            raw_segments.append(text[cursor:])

    return raw_segments

Return segments with original whitespace preserved.

Used by the stream adapter: the BufferedSentenceChunkStream re-uses the last segment as the continuation buffer, so losing edge whitespace would concatenate unrelated words (e.g. व्यापार + से → व्यापारसे) when the next chunk arrives.

indic-nlp-library's sentence_split returns stripped strings, so we map each stripped segment back to its position in the original text and slice raw ranges, preserving the whitespace that originally sat between sentences.
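
The re-mapping step can be tried in isolation. The sketch below mirrors the loop in the source above but is a simplified stand-in: restore_whitespace is a hypothetical helper name, and the stripped list is hard-coded in place of a real sentence_split result.

```python
def restore_whitespace(text: str, stripped: list[str]) -> list[str]:
    """Map stripped sentences back onto raw slices of ``text``."""
    segments: list[str] = []
    cursor = 0
    for sentence in stripped:
        core = sentence.strip()
        if not core:
            continue
        idx = text.find(core, cursor)
        if idx < 0:
            # Sentence not found after the cursor: keep the stripped form.
            segments.append(core)
            continue
        end = idx + len(core)
        # Slice from the cursor, so whitespace before the sentence is kept.
        segments.append(text[cursor:end])
        cursor = end
    if cursor < len(text):
        # Attach any trailing remainder to the last segment.
        if segments:
            segments[-1] += text[cursor:]
        else:
            segments.append(text[cursor:])
    return segments


text = "नमस्ते।  आप कैसे हैं? "
stripped = ["नमस्ते।", "आप कैसे हैं?"]  # assumed splitter output
segments = restore_whitespace(text, stripped)
```

Joining the returned segments reproduces the input exactly, which is the property the stream adapter depends on when it re-uses the last segment as its continuation buffer.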

Inherited members

class SentenceChunkStream
Expand source code
class SentenceChunkStream(ABC):
    """Push-based stream adapter for sentence chunkers.

    A ``SentenceChunkStream`` is single-use: open it with
    ``SentenceChunker.stream()``, push text as deltas arrive, call
    ``end_input()`` when the upstream source closes, and iterate over it to
    receive sentence-sized strings.
    """

    @abstractmethod
    async def push_text(self, text: str) -> None:
        """Feed more text into the stream.

        Args:
            text: A chunk of text. May be a single character or a multi-word
                fragment. Need not align with word or sentence boundaries.
        """

    @abstractmethod
    async def flush(self) -> None:
        """Force-emit any buffered text as a single trailing sentence."""

    @abstractmethod
    async def end_input(self) -> None:
        """Signal that no more text will arrive. Drains the buffer and closes the stream."""

    @abstractmethod
    def __aiter__(self) -> AsyncIterator[str]:
        ...

Push-based stream adapter for sentence chunkers.

A SentenceChunkStream is single-use: open it with SentenceChunker.stream(), push text as deltas arrive, call end_input() when the upstream source closes, and iterate over it to receive sentence-sized strings.
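
The lifecycle described above (push, end_input, iterate) can be sketched with a toy implementation. ToyStream is a hypothetical stand-in written for this illustration; it copies the method names of SentenceChunkStream but does not subclass it, and its terminator set and queue-based design are assumptions, not the behaviour of BufferedSentenceChunkStream.

```python
import asyncio
from typing import AsyncIterator


class ToyStream:
    """Hypothetical stand-in mirroring the SentenceChunkStream interface."""

    def __init__(self) -> None:
        self._buffer = ""
        self._queue = asyncio.Queue()  # holds sentences, then a None sentinel

    async def push_text(self, text: str) -> None:
        self._buffer += text
        # Emit every complete sentence currently held in the buffer.
        while True:
            cut = next(
                (i for i, ch in enumerate(self._buffer) if ch in ".!?।"), -1
            )
            if cut < 0:
                break
            await self._queue.put(self._buffer[: cut + 1].strip())
            self._buffer = self._buffer[cut + 1 :]

    async def flush(self) -> None:
        # Force out whatever is left, even without a terminator.
        if self._buffer.strip():
            await self._queue.put(self._buffer.strip())
        self._buffer = ""

    async def end_input(self) -> None:
        await self.flush()
        await self._queue.put(None)  # sentinel that ends iteration

    def __aiter__(self) -> AsyncIterator[str]:
        return self._drain()

    async def _drain(self) -> AsyncIterator[str]:
        while (item := await self._queue.get()) is not None:
            yield item


async def demo() -> list[str]:
    stream = ToyStream()
    # Deltas deliberately ignore word and sentence boundaries.
    for delta in ["Hel", "lo the", "re. How a", "re you"]:
        await stream.push_text(delta)
    await stream.end_input()
    return [sentence async for sentence in stream]


sentences = asyncio.run(demo())
```

In a real pipeline the consumer would typically iterate concurrently with the producer rather than after end_input(); the sequential form is used here only to keep the example short.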

Ancestors

  • abc.ABC

Subclasses

Methods

async def end_input(self) ‑> None
Expand source code
@abstractmethod
async def end_input(self) -> None:
    """Signal that no more text will arrive. Drains the buffer and closes the stream."""

Signal that no more text will arrive. Drains the buffer and closes the stream.

async def flush(self) ‑> None
Expand source code
@abstractmethod
async def flush(self) -> None:
    """Force-emit any buffered text as a single trailing sentence."""

Force-emit any buffered text as a single trailing sentence.

async def push_text(self, text: str) ‑> None
Expand source code
@abstractmethod
async def push_text(self, text: str) -> None:
    """Feed more text into the stream.

    Args:
        text: A chunk of text. May be a single character or a multi-word
            fragment. Need not align with word or sentence boundaries.
    """

Feed more text into the stream.

Args

text
A chunk of text. May be a single character or a multi-word fragment. Need not align with word or sentence boundaries.

class SentenceChunker
Expand source code
class SentenceChunker(ABC):
    """Abstract chunker that splits text into sentence-sized segments for TTS."""

    @abstractmethod
    def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
        """Split the given text into sentences in one shot.

        Args:
            text: Full text to split.
            language: Optional ISO 639-1 language hint. When omitted, the
                chunker uses its internal heuristic (usually script detection).

        Returns:
            A list of sentence-sized strings with leading/trailing whitespace stripped.
        """

    @abstractmethod
    def stream(self, *, language: str | None = None) -> SentenceChunkStream:
        """Open a push-based stream for incremental chunking.

        Args:
            language: Optional ISO 639-1 language hint forwarded to ``tokenize``.

        Returns:
            A fresh ``SentenceChunkStream`` instance.
        """

Abstract chunker that splits text into sentence-sized segments for TTS.

Ancestors

  • abc.ABC

Subclasses

Methods

def stream(self, *, language: str | None = None) ‑> SentenceChunkStream
Expand source code
@abstractmethod
def stream(self, *, language: str | None = None) -> SentenceChunkStream:
    """Open a push-based stream for incremental chunking.

    Args:
        language: Optional ISO 639-1 language hint forwarded to ``tokenize``.

    Returns:
        A fresh ``SentenceChunkStream`` instance.
    """

Open a push-based stream for incremental chunking.

Args

language
Optional ISO 639-1 language hint forwarded to tokenize.

Returns

A fresh SentenceChunkStream instance.
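
One way to forward the language hint is to bind it into the tokenize callable before handing that callable to the stream, as the stream() source earlier on this page does with functools.partial. A minimal sketch, assuming a hypothetical toy_tokenize function in place of a real chunker method:

```python
from functools import partial
from typing import Callable, Optional


def toy_tokenize(text: str, *, language: Optional[str] = None) -> list[str]:
    """Hypothetical tokenizer that picks a terminator from the hint."""
    terminator = "।" if language == "hi" else "."
    parts = text.split(terminator)
    return [part.strip() + terminator for part in parts if part.strip()]


# Bind the hint once; the stream only ever sees a one-argument callable.
tokenize_fn: Callable[[str], list[str]] = partial(toy_tokenize, language="hi")
result = tokenize_fn("नमस्ते। आप कैसे हैं।")
```

Binding the hint up front keeps the stream independent of language handling: it calls tokenize_fn(text) and never needs to know which language was selected.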

def tokenize(self, text: str, *, language: str | None = None) ‑> list[str]
Expand source code
@abstractmethod
def tokenize(self, text: str, *, language: str | None = None) -> list[str]:
    """Split the given text into sentences in one shot.

    Args:
        text: Full text to split.
        language: Optional ISO 639-1 language hint. When omitted, the
            chunker uses its internal heuristic (usually script detection).

    Returns:
        A list of sentence-sized strings with leading/trailing whitespace stripped.
    """

Split the given text into sentences in one shot.

Args

text
Full text to split.
language
Optional ISO 639-1 language hint. When omitted, the chunker uses its internal heuristic (usually script detection).

Returns

A list of sentence-sized strings with leading/trailing whitespace stripped.
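
To make the return contract concrete, here is a minimal sketch of a one-shot splitter that returns stripped sentences. split_sentences and its regular expression are assumptions made for illustration; the default implementations in this module may use different rules (abbreviation handling, script detection, and so on).

```python
import re

# Split on whitespace that follows a strong terminator (Latin or danda).
_BOUNDARY = re.compile(r"(?<=[.!?।])\s+")


def split_sentences(text: str) -> list[str]:
    """Return sentences with leading/trailing whitespace stripped."""
    return [part.strip() for part in _BOUNDARY.split(text) if part.strip()]


result = split_sentences("  Hello there.  How are you? Fine! ")
```

Unlike tokenize_raw, this form discards the whitespace between sentences, so its output is suited to one-shot use and not to re-use as a streaming continuation buffer.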

class TextFilter
Expand source code
class TextFilter(ABC):
    """Pre-chunking text transformation.

    Filters sit *before* the chunker. They may be stateful across a turn
    (e.g. tracking whether the stream is currently inside a Markdown code
    fence) and are reset between turns via ``reset()``.
    """

    @abstractmethod
    def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        """Transform an input stream of text chunks.

        Args:
            chunks: Async iterator of raw LLM deltas.

        Yields:
            Filtered text chunks ready to be consumed by a ``SentenceChunker``.
        """

    @abstractmethod
    async def reset(self) -> None:
        """Reset internal state between turns."""

Pre-chunking text transformation.

Filters sit before the chunker. They may be stateful across a turn (e.g. tracking whether the stream is currently inside a Markdown code fence) and are reset between turns via reset().
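
The code-fence case mentioned above can be sketched as follows. CodeFenceFilter is a hypothetical example that copies the filter/reset method names but does not subclass TextFilter; it buffers input line by line, which is an assumption of this sketch and not a description of the filters shipped in agents.tokenize.filters.

```python
import asyncio
from typing import AsyncIterator, Optional

FENCE = "`" * 3  # Markdown code-fence marker


class CodeFenceFilter:
    """Hypothetical filter that drops text inside Markdown code fences."""

    def __init__(self) -> None:
        self._in_fence = False
        self._pending = ""  # partial line carried over between deltas

    async def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
        async for chunk in chunks:
            self._pending += chunk
            # Only complete lines are classified; the tail waits for more input.
            *lines, self._pending = self._pending.split("\n")
            for line in lines:
                out = self._handle(line)
                if out is not None:
                    yield out + "\n"
        if self._pending:
            out = self._handle(self._pending)
            self._pending = ""
            if out is not None:
                yield out

    def _handle(self, line: str) -> Optional[str]:
        if line.lstrip().startswith(FENCE):
            self._in_fence = not self._in_fence  # fence lines toggle state
            return None
        return None if self._in_fence else line

    async def reset(self) -> None:
        self._in_fence = False
        self._pending = ""


async def demo() -> str:
    async def deltas() -> AsyncIterator[str]:
        # The opening fence is split across two deltas on purpose.
        pieces = [
            "Run this:\n" + FENCE[:2],
            FENCE[2:] + "py\nprint(1)\n" + FENCE + "\nDone",
            ".",
        ]
        for piece in pieces:
            yield piece

    return "".join([piece async for piece in CodeFenceFilter().filter(deltas())])


filtered = asyncio.run(demo())
```

Line buffering keeps the fence detection simple but delays output until a newline arrives; a production filter would likely need a lower-latency strategy.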

Ancestors

  • abc.ABC

Subclasses

Methods

def filter(self, chunks: AsyncIterator[str]) ‑> AsyncIterator[str]
Expand source code
@abstractmethod
def filter(self, chunks: AsyncIterator[str]) -> AsyncIterator[str]:
    """Transform an input stream of text chunks.

    Args:
        chunks: Async iterator of raw LLM deltas.

    Yields:
        Filtered text chunks ready to be consumed by a ``SentenceChunker``.
    """

Transform an input stream of text chunks.

Args

chunks
Async iterator of raw LLM deltas.

Yields

Filtered text chunks ready to be consumed by a SentenceChunker.

async def reset(self) ‑> None
Expand source code
@abstractmethod
async def reset(self) -> None:
    """Reset internal state between turns."""

Reset internal state between turns.