"""In-memory deterministic pipeline: HTML → sections → metrics → scores."""
from __future__ import annotations
from dataclasses import asdict, dataclass, field
from typing import Any
from disclosure_alpha.analytics_config import PipelineConfig, build_versions, resolve_pipeline_config
from disclosure_alpha.confidence import ConfidenceInput, compute_confidence_detailed, compute_overall_confidence
from disclosure_alpha.deterministic_scoring import (
DeterministicAggregationResult,
aggregate_deterministic_matrix,
aggregate_deterministic_matrix_v2,
)
from disclosure_alpha.diff_engine import compute_section_diff
from disclosure_alpha.section_extractor import (
ExtractedSection,
FilingDocument,
extract_sections,
)
from disclosure_alpha.text_metrics import (
SectionTextInput,
compute_density_metrics,
compute_text_metrics,
detect_section_flags,
)
from disclosure_alpha.edgar.types import EdgarError
from disclosure_alpha.version import SCORING_MODEL_VERSION
[docs]
@dataclass
class FilingBundle:
ref: Any # FilingRef — lazy import to avoid circular deps
html: str
prior_html: str | None
prior_accession: str | None
[docs]
@dataclass
class FilingSectionsResult:
sections: list[ExtractedSection]
filing: dict[str, Any] = field(default_factory=dict)
versions: dict[str, str] = field(default_factory=dict)
[docs]
@dataclass
class FilingMetricsResult:
metrics: MetricsResult
filing: dict[str, Any] = field(default_factory=dict)
versions: dict[str, str] = field(default_factory=dict)
[docs]
@dataclass
class MetricsResult:
section_metrics: dict[str, dict[str, float]]
section_diffs: dict[str, float | None]
section_flags: dict[str, dict[str, bool]]
section_densities: dict[str, dict[str, float]]
language_deltas: dict[str, dict[str, float]]
section_diffs_v2: dict[str, float | None] = field(default_factory=dict)
extraction_confs: list[float] = field(default_factory=list)
diff_confs: list[float] = field(default_factory=list)
extraction_warnings: list[str] = field(default_factory=list)
required_sections_present: bool = True
has_prior: bool = True
[docs]
@dataclass
class FilingScoreResult:
sections: list[ExtractedSection]
metrics: MetricsResult
scores: DeterministicAggregationResult
versions: dict[str, str] = field(default_factory=dict)
filing: dict[str, Any] = field(default_factory=dict)
[docs]
def to_dict(self) -> dict[str, Any]:
out = {
"sections": [asdict(s) for s in self.sections],
"metrics": asdict(self.metrics),
"scores": {
"overall_disclosure_risk_score": self.scores.overall_disclosure_risk_score,
"score_coverage_ratio": self.scores.score_coverage_ratio,
"confidence_score": self.scores.confidence_score,
"missing_components": self.scores.missing_components,
"components": asdict(self.scores.components),
"aggregates": asdict(self.scores.aggregates),
"provenance": [p.to_dict() for p in self.scores.provenance],
},
"versions": self.versions,
}
if self.filing:
out["filing"] = self.filing
return out
def _metrics_dict(metrics) -> dict[str, float]:
return {
"negative_word_ratio": float(metrics.negative_word_ratio or 0),
"uncertainty_word_ratio": float(metrics.uncertainty_word_ratio or 0),
"litigious_word_ratio": float(metrics.litigious_word_ratio or 0),
"modal_word_ratio": float(metrics.modal_word_ratio or 0),
"weak_modal_word_ratio": float(getattr(metrics, "weak_modal_word_ratio", 0) or 0),
"moderate_modal_word_ratio": float(getattr(metrics, "moderate_modal_word_ratio", 0) or 0),
"strong_modal_word_ratio": float(getattr(metrics, "strong_modal_word_ratio", 0) or 0),
"legal_regulatory_phrase_ratio": float(
getattr(metrics, "legal_regulatory_phrase_ratio", 0) or 0
),
"constraining_word_ratio": float(metrics.constraining_word_ratio or 0),
"boilerplate_phrase_ratio": float(metrics.boilerplate_phrase_ratio or 0),
"numeric_specificity_score": float(metrics.numeric_specificity_score or 0),
"company_specificity_score": float(metrics.company_specificity_score or 0),
"readability_score": float(metrics.readability_score or 0),
}
def _prior_by_name(
prior_sections: list[ExtractedSection] | None, section_name: str
) -> ExtractedSection | None:
if not prior_sections:
return None
for section in prior_sections:
if section.section_name == section_name:
return section
return None
[docs]
def compute_section_metrics(
sections: list[ExtractedSection],
prior_sections: list[ExtractedSection] | None = None,
) -> MetricsResult:
section_metrics: dict[str, dict[str, float]] = {}
section_diffs: dict[str, float | None] = {}
section_flags: dict[str, dict[str, bool]] = {}
section_densities: dict[str, dict[str, float]] = {}
language_deltas: dict[str, dict[str, float]] = {}
section_diffs_v2: dict[str, float | None] = {}
extraction_confs: list[float] = []
diff_confs: list[float] = []
for section in sections:
extraction_confs.append(float(section.extraction_confidence or 0.5))
text = section.cleaned_text or ""
metrics = compute_text_metrics(SectionTextInput(section.section_name, text))
section_metrics[section.section_name] = _metrics_dict(metrics)
section_flags[section.section_name] = detect_section_flags(text, section.section_name)
section_densities[section.section_name] = compute_density_metrics(text, section.section_name)
prior = _prior_by_name(prior_sections, section.section_name)
diff = compute_section_diff(
current_text=text,
prior_text=prior.cleaned_text if prior else None,
current_section_id=section.section_name,
prior_section_id=prior.section_name if prior else None,
)
if diff.disclosure_change_score is not None:
section_diffs[section.section_name] = float(diff.disclosure_change_score)
change_v2 = getattr(diff, "disclosure_change_score_v2", None)
if change_v2 is not None:
section_diffs_v2[section.section_name] = float(change_v2)
if diff.language_deltas:
language_deltas[section.section_name] = dict(diff.language_deltas)
if diff.confidence_score is not None:
diff_confs.append(float(diff.confidence_score))
return MetricsResult(
section_metrics=section_metrics,
section_diffs=section_diffs,
section_flags=section_flags,
section_densities=section_densities,
language_deltas=language_deltas,
section_diffs_v2=section_diffs_v2,
extraction_confs=extraction_confs,
diff_confs=diff_confs,
has_prior=prior_sections is not None and len(prior_sections) > 0,
)
[docs]
def score_deterministic(
metrics: MetricsResult,
*,
config: PipelineConfig | None = None,
) -> DeterministicAggregationResult:
"""Legacy v1 aggregation (`deterministic_scoring_v1`). Prefer score_for_model()."""
pipeline_config = resolve_pipeline_config(config)
result = aggregate_deterministic_matrix(
section_metrics=metrics.section_metrics,
section_diffs=metrics.section_diffs,
section_flags=metrics.section_flags,
language_deltas=metrics.language_deltas,
section_densities=metrics.section_densities,
config=pipeline_config.scoring,
)
avg_diff_conf = (
sum(metrics.diff_confs) / len(metrics.diff_confs) if metrics.diff_confs else None
)
result.confidence_score = compute_overall_confidence(
extraction_confidences=metrics.extraction_confs,
coverage_ratio=result.score_coverage_ratio,
diff_confidence=avg_diff_conf,
extraction_warnings=metrics.extraction_warnings,
required_sections_present=metrics.required_sections_present,
has_prior=metrics.has_prior,
)
return result
[docs]
def score_deterministic_v2(
metrics: MetricsResult,
*,
config: PipelineConfig | None = None,
form_type: str | None = None,
) -> DeterministicAggregationResult:
"""v2 aggregation (`deterministic_scoring_v2`)."""
pipeline_config = resolve_pipeline_config(config)
result = aggregate_deterministic_matrix_v2(
section_metrics=metrics.section_metrics,
section_diffs=metrics.section_diffs,
section_flags=metrics.section_flags,
language_deltas=metrics.language_deltas,
section_densities=metrics.section_densities,
section_diffs_v2=metrics.section_diffs_v2,
calibration_context=pipeline_config.resolved_calibration(form_type=form_type),
config=pipeline_config.scoring,
)
avg_diff_conf = (
sum(metrics.diff_confs) / len(metrics.diff_confs) if metrics.diff_confs else None
)
result.confidence_score, _ = compute_confidence_detailed(
ConfidenceInput(
extraction_confidences=metrics.extraction_confs,
extraction_warnings=metrics.extraction_warnings,
coverage_ratio=result.score_coverage_ratio,
required_sections_present=metrics.required_sections_present,
diff_confidence=avg_diff_conf,
has_prior=metrics.has_prior,
)
)
return result
[docs]
def score_for_model(
metrics: MetricsResult,
scoring_model_version: str | None = None,
*,
config: PipelineConfig | None = None,
form_type: str | None = None,
) -> DeterministicAggregationResult:
"""Score metrics with the requested model (default: SCORING_MODEL_VERSION / v2)."""
from disclosure_alpha.validation.scoring_version import is_v1_scoring
pipeline_config = resolve_pipeline_config(config, scoring_model_version=scoring_model_version)
if is_v1_scoring(pipeline_config.scoring_model_version):
return score_deterministic(metrics, config=pipeline_config)
return score_deterministic_v2(metrics, config=pipeline_config, form_type=form_type)
[docs]
def score_filing_html(
html: str,
form_type: str,
*,
prior_html: str | None = None,
prior_form_type: str | None = None,
cik: str = "",
accession_number: str = "",
config: PipelineConfig | None = None,
) -> FilingScoreResult:
sections = extract_sections_from_html(
html, form_type, cik=cik, accession_number=accession_number
)
prior_sections = None
if prior_html:
prior_sections = extract_sections_from_html(
prior_html,
prior_form_type or form_type,
cik=cik,
accession_number="prior",
)
metrics = compute_section_metrics(sections, prior_sections)
pipeline_config = resolve_pipeline_config(config)
scores = score_for_model(metrics, config=pipeline_config, form_type=form_type)
return FilingScoreResult(
sections=sections,
metrics=metrics,
scores=scores,
versions=build_versions(pipeline_config),
)
def _filing_meta(ref, *, prior_accession: str | None = None) -> dict[str, Any]:
return {
"ticker": ref.ticker,
"cik": ref.cik,
"accession_number": ref.accession_number,
"form_type": ref.form_type,
"fiscal_year": ref.fiscal_year,
"quarter": ref.quarter,
"filing_date": ref.filing_date,
"report_date": ref.report_date,
"prior_accession_number": prior_accession,
}
def _filter_section_dict(d: dict[str, Any], section_names: set[str]) -> dict[str, Any]:
return {k: v for k, v in d.items() if k in section_names}
[docs]
def filter_metrics_result(metrics: MetricsResult, section_names: set[str]) -> MetricsResult:
return MetricsResult(
section_metrics=_filter_section_dict(metrics.section_metrics, section_names),
section_diffs=_filter_section_dict(metrics.section_diffs, section_names),
section_flags=_filter_section_dict(metrics.section_flags, section_names),
section_densities=_filter_section_dict(metrics.section_densities, section_names),
language_deltas=_filter_section_dict(metrics.language_deltas, section_names),
section_diffs_v2=_filter_section_dict(metrics.section_diffs_v2, section_names),
extraction_confs=metrics.extraction_confs,
diff_confs=metrics.diff_confs,
extraction_warnings=metrics.extraction_warnings,
required_sections_present=metrics.required_sections_present,
has_prior=metrics.has_prior,
)
[docs]
def filter_sections(
sections: list[ExtractedSection], section_names: set[str]
) -> list[ExtractedSection]:
return [s for s in sections if s.section_name in section_names]
[docs]
def load_filing_bundle(
ticker: str,
fiscal_year: int,
*,
form_type: str = "10-K",
quarter: str | None = None,
use_cache: bool = True,
compare_prior: bool = True,
) -> FilingBundle:
from disclosure_alpha.edgar.resolver import (
load_filing_html,
resolve_filing,
resolve_prior_filing,
)
from disclosure_alpha.edgar.types import FilingNotFoundError
ref = resolve_filing(ticker, fiscal_year, form_type, quarter, use_cache=use_cache)
html = load_filing_html(ref, use_cache=use_cache)
prior_html = None
prior_accession = None
if compare_prior:
try:
prior_ref = resolve_prior_filing(ref, use_cache=use_cache)
if prior_ref:
prior_html = load_filing_html(prior_ref, use_cache=use_cache)
prior_accession = prior_ref.accession_number
except FilingNotFoundError:
pass
return FilingBundle(
ref=ref,
html=html,
prior_html=prior_html,
prior_accession=prior_accession,
)
[docs]
def sections_filing_ticker(
ticker: str,
fiscal_year: int,
*,
form_type: str = "10-K",
quarter: str | None = None,
use_cache: bool = True,
compare_prior: bool = True,
) -> FilingSectionsResult:
bundle = load_filing_bundle(
ticker,
fiscal_year,
form_type=form_type,
quarter=quarter,
use_cache=use_cache,
compare_prior=compare_prior,
)
sections = extract_sections_from_html(
bundle.html,
bundle.ref.form_type,
cik=bundle.ref.cik,
accession_number=bundle.ref.accession_number,
)
return FilingSectionsResult(
sections=sections,
filing=_filing_meta(bundle.ref, prior_accession=bundle.prior_accession),
versions=build_versions(),
)
[docs]
def metrics_filing_ticker(
ticker: str,
fiscal_year: int,
*,
form_type: str = "10-K",
quarter: str | None = None,
use_cache: bool = True,
compare_prior: bool = True,
) -> FilingMetricsResult:
bundle = load_filing_bundle(
ticker,
fiscal_year,
form_type=form_type,
quarter=quarter,
use_cache=use_cache,
compare_prior=compare_prior,
)
sections = extract_sections_from_html(
bundle.html,
bundle.ref.form_type,
cik=bundle.ref.cik,
accession_number=bundle.ref.accession_number,
)
prior_sections = None
if bundle.prior_html:
prior_sections = extract_sections_from_html(
bundle.prior_html,
bundle.ref.form_type,
cik=bundle.ref.cik,
accession_number="prior",
)
metrics = compute_section_metrics(sections, prior_sections)
return FilingMetricsResult(
metrics=metrics,
filing=_filing_meta(bundle.ref, prior_accession=bundle.prior_accession),
versions=build_versions(),
)
[docs]
def score_filing_ticker(
ticker: str,
fiscal_year: int,
*,
form_type: str = "10-K",
quarter: str | None = None,
use_cache: bool = True,
compare_prior: bool = True,
config: PipelineConfig | None = None,
) -> FilingScoreResult:
bundle = load_filing_bundle(
ticker,
fiscal_year,
form_type=form_type,
quarter=quarter,
use_cache=use_cache,
compare_prior=compare_prior,
)
result = score_filing_html(
bundle.html,
bundle.ref.form_type,
prior_html=bundle.prior_html,
cik=bundle.ref.cik,
accession_number=bundle.ref.accession_number,
config=config,
)
result.filing = _filing_meta(bundle.ref, prior_accession=bundle.prior_accession)
return result
[docs]
@dataclass
class PanelTickerResult:
ticker: str
status: str
filing: dict[str, Any] | None = None
scores: DeterministicAggregationResult | None = None
error: str | None = None
[docs]
@dataclass
class PanelBatchResult:
results: list[PanelTickerResult]
summary: dict[str, int]
versions: dict[str, str]
[docs]
def score_panel_tickers(
tickers: list[str],
fiscal_year: int,
*,
form_type: str = "10-K",
quarter: str | None = None,
use_cache: bool = True,
compare_prior: bool = True,
scoring_model_version: str = SCORING_MODEL_VERSION,
config: PipelineConfig | None = None,
) -> PanelBatchResult:
"""Score many tickers sequentially; per-ticker errors do not fail the batch."""
pipeline_config = resolve_pipeline_config(config, scoring_model_version=scoring_model_version)
results: list[PanelTickerResult] = []
ok = 0
failed = 0
for raw in tickers:
ticker = raw.strip().upper()
try:
scored = score_filing_ticker(
ticker,
fiscal_year,
form_type=form_type,
quarter=quarter,
use_cache=use_cache,
compare_prior=compare_prior,
config=pipeline_config,
)
scores = score_for_model(
scored.metrics,
config=pipeline_config,
form_type=scored.filing.get("form_type") if scored.filing else form_type,
)
results.append(
PanelTickerResult(
ticker=ticker,
status="ok",
filing=scored.filing,
scores=scores,
)
)
ok += 1
except (EdgarError, ValueError, FileNotFoundError) as exc:
results.append(
PanelTickerResult(
ticker=ticker,
status="error",
error=str(exc),
)
)
failed += 1
return PanelBatchResult(
results=results,
summary={"ok": ok, "failed": failed},
versions=build_versions(pipeline_config),
)