Source code for disclosure_alpha.text_matching

"""Shared phrase and token matching helpers for text metrics and diff engine."""

from __future__ import annotations

import re

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Number of word tokens inspected on each side of a keyword hit when
# topic_intensity looks for nearby severity words.
_SEVERITY_WINDOW = 10
# Case-insensitive pattern for numeric-looking tokens. Alternatives, in order:
#   1. optional "$", a digit followed by any run of digits/commas, an optional
#      ".digits" decimal part, and an optional trailing "%";
#   2. a spelled-out date: full month name, 1-2 digit day, optional comma,
#      4-digit year (e.g. "March 5, 2021");
#   3. a standalone 4-digit number.
# Alternative 1 can absorb a trailing comma ("2020,"); extract_numeric_tokens
# removes commas from every match afterwards.
# NOTE(review): alternative 1 already matches wherever a digit starts, so
# alternative 3 looks unreachable — confirm before relying on it.
_NUMERIC_TOKEN = re.compile(
    r"(?:\$)?\d[\d,]*(?:\.\d+)?%?|\b(?:january|february|march|april|may|june|july|august|"
    r"september|october|november|december)\s+\d{1,2},?\s+\d{4}\b|\b\d{4}\b",
    re.IGNORECASE,
)


def tokenize_words(text: str) -> list[str]:
    """Return the lower-cased alphabetic word tokens of ``text``, in order.

    A token is a run of ASCII letters delimited by regex word boundaries, so
    letter runs glued to digits or underscores (``"x2"``, ``"abc_def"``) are
    not returned at all.
    """
    return re.findall(r"\b[a-zA-Z]+\b", text.lower())
def phrase_pattern(phrase: str) -> str:
    """Return regex source that finds ``phrase`` as a whole unit in lower-cased text.

    The phrase is lower-cased and split on runs of whitespace/hyphens; the
    pieces are escaped and re-joined so that any run of whitespace or hyphens
    may separate them ("long term" also matches "long-term").  Lookarounds
    reject matches glued to an adjacent ``[a-z0-9]`` character, so the text
    being searched is expected to be lower-cased already.

    A phrase with no usable pieces (empty, or only whitespace/hyphens) yields
    a pattern that can never match, rather than a bare pair of lookarounds
    that would match at arbitrary gaps between non-alphanumeric characters.
    """
    parts = [re.escape(part) for part in re.split(r"[\s-]+", phrase.lower()) if part]
    if not parts:
        # "(?!)" is an empty negative lookahead: it fails at every position.
        return r"(?!)"
    body = r"[\s-]+".join(parts)
    return rf"(?<![a-z0-9]){body}(?![a-z0-9])"


def phrase_count(lower: str, phrase: str) -> int:
    """Count non-overlapping occurrences of ``phrase`` in the lower-cased text ``lower``."""
    return len(re.findall(phrase_pattern(phrase), lower))


def phrase_matches(lower: str, phrase: str) -> bool:
    """Return True if ``phrase`` occurs at least once in the lower-cased text ``lower``."""
    return bool(re.search(phrase_pattern(phrase), lower))


def split_sentences(text: str) -> list[str]:
    """Split ``text`` into sentences on runs of ``.``, ``!`` or ``?`` followed by whitespace.

    The terminating punctuation is consumed by the split, except for the last
    sentence, which keeps whatever punctuation ends the text.  Blank input
    returns an empty list.
    """
    stripped = text.strip()
    if not stripped:
        return []
    return [part for part in re.split(r"[.!?]+\s+", stripped) if part.strip()]


def boilerplate_hits(text: str, phrases: tuple[str, ...] | list[str]) -> int:
    """Count phrase hits in ``text``, counting each phrase at most once per sentence.

    A phrase that appears in several sentences is counted once for each of
    those sentences; repeated entries in ``phrases`` are counted separately.
    """
    # Build the phrase patterns once instead of once per sentence.
    compiled = [re.compile(phrase_pattern(phrase)) for phrase in phrases]
    total = 0
    for sentence in split_sentences(text):
        lower = sentence.lower()
        total += sum(1 for pattern in compiled if pattern.search(lower))
    return total


def _keyword_token_indices(text: str, keyword: str) -> list[int]:
    """Return the word-token index of every occurrence of ``keyword`` in ``text``.

    Indices refer to positions in ``tokenize_words(text)``.  A keyword without
    a space or hyphen is compared for equality against each token; otherwise
    the keyword is located with ``phrase_pattern`` and the index is the number
    of word tokens that precede the match.
    """
    kw = keyword.lower()
    if " " not in kw and "-" not in kw:
        return [i for i, word in enumerate(tokenize_words(text)) if word == kw]
    lower = text.lower()
    # Match offsets are positions in ``lower``, so the prefix must be sliced
    # from ``lower`` too: lower-casing can change the length of a string
    # (e.g. "İ"), which would misalign offsets applied to the original text.
    return [
        len(tokenize_words(lower[: match.start()]))
        for match in re.finditer(phrase_pattern(kw), lower)
    ]


def topic_phrase_matches(text: str, keywords: list[str]) -> bool:
    """Return True if any keyword phrase occurs in ``text``; ``None``/empty text never matches."""
    lower = (text or "").lower()
    return any(phrase_matches(lower, kw) for kw in keywords)


def extract_numeric_tokens(text: str) -> list[str]:
    """Normalize percentages, dollar amounts, dates, and counts for diff comparison.

    Every ``_NUMERIC_TOKEN`` match is lower-cased, stripped of commas, and has
    its internal whitespace collapsed to single spaces, so the same date
    wrapped across lines or spaced differently yields the same token.  A
    leading ``$`` is dropped and a trailing ``%`` is rewritten as a ``pct:``
    prefix.  Tokens are returned in order of appearance; ``None``/empty text
    gives an empty list.
    """
    tokens: list[str] = []
    for match in _NUMERIC_TOKEN.finditer(text or ""):
        raw = " ".join(match.group(0).lower().replace(",", "").split())
        if raw.startswith("$"):
            raw = raw[1:]
        if raw.endswith("%"):
            raw = f"pct:{raw[:-1]}"
        tokens.append(raw)
    return tokens


def align_sentences(
    current_sentences: list[str],
    prior_sentences: list[str],
    *,
    match_threshold: float = 0.55,
) -> tuple[list[str], list[str], list[tuple[int, int, float]]]:
    """Match sentences via TF-IDF cosine similarity; return added, removed, matched triples.

    Each current sentence is compared with its single most similar prior
    sentence.  The two are paired when that similarity is at least
    ``match_threshold`` and the prior sentence has not already been claimed by
    an earlier current sentence; there is no fallback to a second-best
    candidate.

    Returns:
        ``(added, removed, pairs)`` where ``added`` holds the unpaired current
        sentences, ``removed`` the unpaired prior sentences (both in input
        order), and ``pairs`` is a list of
        ``(current_index, prior_index, similarity)``.
    """
    if not current_sentences and not prior_sentences:
        return [], [], []
    if not current_sentences:
        return [], list(prior_sentences), []
    if not prior_sentences:
        return list(current_sentences), [], []

    n_cur = len(current_sentences)
    vec = TfidfVectorizer(max_features=2000)
    try:
        matrix = vec.fit_transform(current_sentences + prior_sentences)
    except ValueError as err:
        # The vectorizer raises "empty vocabulary" when no sentence contains
        # a token it keeps (e.g. only punctuation or one-character tokens).
        # Nothing can be compared then, so report every sentence as unmatched
        # instead of crashing.  Any other ValueError is a real input problem.
        if "empty vocabulary" not in str(err):
            raise
        return list(current_sentences), list(prior_sentences), []

    sims = cosine_similarity(matrix[:n_cur], matrix[n_cur:])

    matched_prior: set[int] = set()
    matched_cur: set[int] = set()
    pairs: list[tuple[int, int, float]] = []
    for ci in range(n_cur):
        best_pi = int(sims[ci].argmax())
        best_sim = float(sims[ci, best_pi])
        if best_sim >= match_threshold and best_pi not in matched_prior:
            matched_prior.add(best_pi)
            matched_cur.add(ci)
            pairs.append((ci, best_pi, best_sim))

    added = [s for i, s in enumerate(current_sentences) if i not in matched_cur]
    removed = [s for i, s in enumerate(prior_sentences) if i not in matched_prior]
    return added, removed, pairs


def topic_intensity(
    text: str,
    topic: str,
    topic_keywords: dict[str, list[str]],
    severity_words: frozenset[str],
) -> float:
    """Score how strongly ``text`` discusses ``topic``.

    Every occurrence of one of the topic's keywords scores 1.0.  An occurrence
    scores an extra 0.5 when any word within ``_SEVERITY_WINDOW`` tokens on
    either side of it (the hit token itself included) is in
    ``severity_words``.  A topic with no configured keywords scores 0.0.
    """
    keywords = topic_keywords.get(topic, [])
    if not keywords:
        return 0.0

    words = tokenize_words(text)
    hit_indices = [idx for kw in keywords for idx in _keyword_token_indices(text, kw)]

    severity_hits = 0
    for idx in hit_indices:
        start = max(0, idx - _SEVERITY_WINDOW)
        end = min(len(words), idx + _SEVERITY_WINDOW + 1)
        if any(word in severity_words for word in words[start:end]):
            severity_hits += 1
    return len(hit_indices) + severity_hits * 0.5