Exporters API

src.exporters

Initialize the exporters package and register all exporters.

__all__ = ['BaseExporter', 'CSVExporter'] module-attribute
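
The public names can be imported straight from the package, for example:

from src.exporters import BaseExporter, CSVExporter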

BaseExporter

Bases: ABC

Abstract base class for data exporters.

Source code in src/exporters/base.py
class BaseExporter(ABC):
    """Abstract base class for data exporters."""

    def __init__(self, config: ExportConfig):
        self.config = config
        self._ensure_directories()
        self.monthly_notes_cache = {}

    def _ensure_directories(self) -> None:
        """Create necessary directory structure."""
        self.config.output_dir.mkdir(parents=True, exist_ok=True)
        if self.config.split_by_month:
            (self.config.output_dir / "monthly").mkdir(exist_ok=True)

    def _generate_type_stats(
        self, data: pd.DataFrame, data_type: Optional[DataType] = None
    ) -> List[str]:
        """Generate statistics for a specific data type."""
        stats = []

        if data_type == DataType.CGM or "cgm_primary" in data.columns:
            missing_count = data.get("missing_cgm", data.get("missing", 0)).sum()
            total_readings = len(data)
            total_na = data["cgm_primary"].isna().sum()
            initial_completeness = (
                (total_readings - missing_count) / total_readings
            ) * 100
            remaining_completeness = (
                (total_readings - total_na) / total_readings
            ) * 100

            stats.extend(
                [
                    "CGM Processing Notes:",
                    f"  Processed {total_readings} total CGM readings",
                    f"  Found {missing_count} missing or interpolated values",
                    f"  Initial CGM completeness: {initial_completeness:.2f}%",
                    f"  CGM completeness after interpolation: {remaining_completeness:.2f}%",
                ]
            )

        if data_type == DataType.BGM or any(
            col.startswith("bgm_") for col in data.columns
        ):
            bgm_cols = [
                col
                for col in data.columns
                if col.startswith("bgm_") and not col.endswith(("_clipped", "_mmol"))
            ]
            for bgm_col in bgm_cols:
                clipped_col = f"{bgm_col}_clipped"
                total_readings = data[bgm_col].notna().sum()

                if clipped_col in data.columns:
                    clipped_readings = data[clipped_col].sum()
                    clipped_percent = (
                        (clipped_readings / total_readings * 100)
                        if total_readings > 0
                        else 0
                    )

                    stats.extend(
                        [
                            f"BGM Processing Notes ({bgm_col}):",
                            f"  Processed {total_readings} total BGM readings",
                            f"  Found {clipped_readings} clipped values ({clipped_percent:.1f}%)",
                        ]
                    )
                else:
                    stats.extend(
                        [
                            f"BGM Processing Notes ({bgm_col}):",
                            f"  Processed {total_readings} total BGM readings",
                        ]
                    )

        if data_type == DataType.INSULIN or any(
            col in data.columns for col in ["dose", "basal_dose", "bolus_dose"]
        ):
            if data_type == DataType.INSULIN:
                basal_count = (
                    data["is_basal"].sum() if "is_basal" in data.columns else 0
                )
                bolus_count = (
                    data["is_bolus"].sum() if "is_bolus" in data.columns else 0
                )
                total_count = len(data)
                stats.extend(
                    [
                        "INSULIN Processing Notes:",
                        f"  Found {total_count} total doses",
                        f"  {basal_count} basal doses",
                        f"  {bolus_count} bolus doses",
                    ]
                )
            else:
                basal_count = (data["basal_dose"] > 0).sum()
                bolus_count = (data["bolus_dose"] > 0).sum()
                stats.extend(
                    [
                        "INSULIN Processing Notes:",
                        f"  Found {basal_count + bolus_count} total doses",
                        f"  {basal_count} basal doses",
                        f"  {bolus_count} bolus doses",
                    ]
                )

        if data_type == DataType.CARBS or "carbs_primary" in data.columns:
            carb_entries = (
                (data["carbs_primary"] > 0).sum()
                if "carbs_primary" in data.columns
                else (data > 0).sum().sum()
            )
            stats.extend(
                ["CARBS Processing Notes:", f"  Found {carb_entries} carb entries"]
            )

        if data_type == DataType.NOTES or "notes_primary" in data.columns:
            note_count = (
                data["notes_primary"].notna().sum()
                if "notes_primary" in data.columns
                else data.notna().sum().sum()
            )
            stats.extend(
                ["NOTES Processing Notes:", f"  Found {note_count} notes entries"]
            )

        return stats

    def _handle_monthly_exports(
        self, data: ProcessedTypeData, data_type: DataType
    ) -> None:
        """Handle monthly data splits and exports."""
        monthly_base = self.config.output_dir / "monthly"

        for timestamp, group in data.dataframe.groupby(pd.Grouper(freq="ME")):
            if not group.empty:
                month_str = pd.Timestamp(timestamp).strftime("%Y-%m")
                month_dir = monthly_base / month_str
                month_dir.mkdir(parents=True, exist_ok=True)

                # Generate fresh stats just for this month's data
                monthly_stats = [
                    f"Period: {month_str}",
                    f"Records: {len(group)}",
                    *self._generate_type_stats(group, data_type),
                ]

                # Create monthly data with new stats, but keep original source_units
                monthly_data = ProcessedTypeData(
                    dataframe=group,
                    source_units=data.source_units,
                    processing_notes=monthly_stats,  # Only using the fresh monthly stats
                )

                if self.config.include_processing_notes:
                    self.export_processing_notes(monthly_data, month_dir)
                self.export_monthly_split(monthly_data, data_type, month_dir)

    def _handle_monthly_aligned_exports(self, data: AlignmentResult) -> None:
        """Handle monthly splits for aligned data."""
        monthly_base = self.config.output_dir / "monthly"

        for timestamp, group in data.dataframe.groupby(pd.Grouper(freq="ME")):
            if not group.empty:
                month_str = pd.Timestamp(timestamp).strftime("%Y-%m")
                month_dir = monthly_base / month_str
                month_dir.mkdir(parents=True, exist_ok=True)

                monthly_notes = [
                    f"Period: {month_str}",
                    f"Records: {len(group)}",
                    *self._generate_type_stats(group),
                ]

                monthly_aligned = AlignmentResult(
                    dataframe=group,
                    start_time=group.index.min(),
                    end_time=group.index.max(),
                    frequency=data.frequency,
                    processing_notes=monthly_notes,
                    source_units=data.source_units,
                )

                self.export_aligned_monthly_split(monthly_aligned, month_dir)
                if self.config.include_processing_notes:
                    self.export_processing_notes(monthly_aligned, month_dir)

    def _accumulate_monthly_stats(
        self, month_str: str, group: pd.DataFrame, data_type: DataType
    ) -> None:
        """Accumulate statistics for a month across all data types."""
        if month_str not in self.monthly_notes_cache:
            self.monthly_notes_cache[month_str] = {
                "period": month_str,
                "record_count": len(group),
                "stats": [],
            }

        type_stats = self._generate_type_stats(group, data_type)
        if type_stats:
            self.monthly_notes_cache[month_str]["stats"].extend(type_stats)

    @abstractmethod
    def export_complete_dataset(
        self, data: ProcessedTypeData, data_type: DataType, output_dir: Path
    ) -> None:
        """Export complete dataset for a specific data type."""

    @abstractmethod
    def export_monthly_split(
        self, data: ProcessedTypeData, data_type: DataType, month_dir: Path
    ) -> None:
        """Export monthly split for a specific data type."""

    @abstractmethod
    def export_processing_notes(
        self, data: Union[ProcessedTypeData, AlignmentResult], output_path: Path
    ) -> None:
        """Export processing notes."""

    @abstractmethod
    def export_aligned_complete_dataset(
        self, data: AlignmentResult, output_dir: Path
    ) -> None:
        """Export complete aligned dataset."""

    @abstractmethod
    def export_aligned_monthly_split(
        self, data: AlignmentResult, month_dir: Path
    ) -> None:
        """Export monthly split for aligned data."""

    def export_data(
        self,
        processed_data: Dict[DataType, ProcessedTypeData],
        aligned_data: Optional[AlignmentResult] = None,
    ) -> None:
        """Export all processed data and aligned data if available."""
        if not processed_data and not aligned_data:
            return

        # Reset monthly notes cache
        self.monthly_notes_cache = {}

        # Get date range from either source
        date_range = self.get_date_range(
            next(iter(processed_data.values())) if processed_data else aligned_data
        )
        complete_dir = self.config.output_dir / f"{date_range}_complete"
        complete_dir.mkdir(parents=True, exist_ok=True)

        # Export individual datasets
        for data_type, type_data in processed_data.items():
            self.export_complete_dataset(type_data, data_type, complete_dir)
            if self.config.include_processing_notes:
                self.export_processing_notes(type_data, complete_dir)
            if self.config.split_by_month:
                self._handle_monthly_exports(type_data, data_type)

        # Export aligned data if available
        if aligned_data:
            self.export_aligned_complete_dataset(aligned_data, complete_dir)
            if self.config.include_processing_notes:
                self.export_processing_notes(aligned_data, complete_dir)
            if self.config.split_by_month:
                self._handle_monthly_aligned_exports(aligned_data)

    @staticmethod
    def get_date_range(data: Union[ProcessedTypeData, AlignmentResult]) -> str:
        """Get date range string from data."""
        df = data.dataframe
        if not isinstance(df.index, pd.DatetimeIndex):
            raise ValueError("DataFrame must have DatetimeIndex")
        start = df.index.min().strftime("%Y-%m-%d")
        end = df.index.max().strftime("%Y-%m-%d")
        return f"{start}_to_{end}"
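
A minimal subclassing sketch: only the five abstract methods need to be implemented. The JSONLinesExporter name and the JSON-lines output format below are illustrative assumptions, not part of the package:

from pathlib import Path

from src.exporters import BaseExporter  # ProcessedTypeData / AlignmentResult import paths are not shown here


class JSONLinesExporter(BaseExporter):
    """Illustrative exporter writing one JSON record per line."""

    def export_complete_dataset(self, data, data_type, output_dir: Path) -> None:
        # data.dataframe is the processed frame; data_type.name gives e.g. "CGM"
        path = output_dir / f"{data_type.name.lower()}.jsonl"
        data.dataframe.to_json(path, orient="records", lines=True, date_format="iso")

    def export_monthly_split(self, data, data_type, month_dir: Path) -> None:
        self.export_complete_dataset(data, data_type, month_dir)

    def export_processing_notes(self, data, output_path: Path) -> None:
        # data.processing_notes is a list of strings (see _generate_type_stats)
        (output_path / "processing_notes.txt").write_text("\n".join(data.processing_notes))

    def export_aligned_complete_dataset(self, data, output_dir: Path) -> None:
        data.dataframe.to_json(output_dir / "aligned_data.jsonl",
                               orient="records", lines=True, date_format="iso")

    def export_aligned_monthly_split(self, data, month_dir: Path) -> None:
        self.export_aligned_complete_dataset(data, month_dir)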

config = config instance-attribute

monthly_notes_cache = {} instance-attribute

__init__(config: ExportConfig)

Source code in src/exporters/base.py
def __init__(self, config: ExportConfig):
    self.config = config
    self._ensure_directories()
    self.monthly_notes_cache = {}

_ensure_directories() -> None

Create necessary directory structure.

Source code in src/exporters/base.py
def _ensure_directories(self) -> None:
    """Create necessary directory structure."""
    self.config.output_dir.mkdir(parents=True, exist_ok=True)
    if self.config.split_by_month:
        (self.config.output_dir / "monthly").mkdir(exist_ok=True)

_generate_type_stats(data: pd.DataFrame, data_type: Optional[DataType] = None) -> List[str]

Generate statistics for a specific data type.

Source code in src/exporters/base.py
def _generate_type_stats(
    self, data: pd.DataFrame, data_type: Optional[DataType] = None
) -> List[str]:
    """Generate statistics for a specific data type."""
    stats = []

    if data_type == DataType.CGM or "cgm_primary" in data.columns:
        missing_count = data.get("missing_cgm", data.get("missing", 0)).sum()
        total_readings = len(data)
        total_na = data["cgm_primary"].isna().sum()
        initial_completeness = (
            (total_readings - missing_count) / total_readings
        ) * 100
        remaining_completeness = (
            (total_readings - total_na) / total_readings
        ) * 100

        stats.extend(
            [
                "CGM Processing Notes:",
                f"  Processed {total_readings} total CGM readings",
                f"  Found {missing_count} missing or interpolated values",
                f"  Initial CGM completeness: {initial_completeness:.2f}%",
                f"  CGM completeness after interpolation: {remaining_completeness:.2f}%",
            ]
        )

    if data_type == DataType.BGM or any(
        col.startswith("bgm_") for col in data.columns
    ):
        bgm_cols = [
            col
            for col in data.columns
            if col.startswith("bgm_") and not col.endswith(("_clipped", "_mmol"))
        ]
        for bgm_col in bgm_cols:
            clipped_col = f"{bgm_col}_clipped"
            total_readings = data[bgm_col].notna().sum()

            if clipped_col in data.columns:
                clipped_readings = data[clipped_col].sum()
                clipped_percent = (
                    (clipped_readings / total_readings * 100)
                    if total_readings > 0
                    else 0
                )

                stats.extend(
                    [
                        f"BGM Processing Notes ({bgm_col}):",
                        f"  Processed {total_readings} total BGM readings",
                        f"  Found {clipped_readings} clipped values ({clipped_percent:.1f}%)",
                    ]
                )
            else:
                stats.extend(
                    [
                        f"BGM Processing Notes ({bgm_col}):",
                        f"  Processed {total_readings} total BGM readings",
                    ]
                )

    if data_type == DataType.INSULIN or any(
        col in data.columns for col in ["dose", "basal_dose", "bolus_dose"]
    ):
        if data_type == DataType.INSULIN:
            basal_count = (
                data["is_basal"].sum() if "is_basal" in data.columns else 0
            )
            bolus_count = (
                data["is_bolus"].sum() if "is_bolus" in data.columns else 0
            )
            total_count = len(data)
            stats.extend(
                [
                    "INSULIN Processing Notes:",
                    f"  Found {total_count} total doses",
                    f"  {basal_count} basal doses",
                    f"  {bolus_count} bolus doses",
                ]
            )
        else:
            basal_count = (data["basal_dose"] > 0).sum()
            bolus_count = (data["bolus_dose"] > 0).sum()
            stats.extend(
                [
                    "INSULIN Processing Notes:",
                    f"  Found {basal_count + bolus_count} total doses",
                    f"  {basal_count} basal doses",
                    f"  {bolus_count} bolus doses",
                ]
            )

    if data_type == DataType.CARBS or "carbs_primary" in data.columns:
        carb_entries = (
            (data["carbs_primary"] > 0).sum()
            if "carbs_primary" in data.columns
            else (data > 0).sum().sum()
        )
        stats.extend(
            ["CARBS Processing Notes:", f"  Found {carb_entries} carb entries"]
        )

    if data_type == DataType.NOTES or "notes_primary" in data.columns:
        note_count = (
            data["notes_primary"].notna().sum()
            if "notes_primary" in data.columns
            else data.notna().sum().sum()
        )
        stats.extend(
            ["NOTES Processing Notes:", f"  Found {note_count} notes entries"]
        )

    return stats
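
For a CGM frame, the returned list looks roughly like this (the numbers are illustrative):

stats = exporter._generate_type_stats(cgm_df, DataType.CGM)
# ["CGM Processing Notes:",
#  "  Processed 8640 total CGM readings",
#  "  Found 120 missing or interpolated values",
#  "  Initial CGM completeness: 98.61%",
#  "  CGM completeness after interpolation: 100.00%"]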

_handle_monthly_exports(data: ProcessedTypeData, data_type: DataType) -> None

Handle monthly data splits and exports.

Source code in src/exporters/base.py
def _handle_monthly_exports(
    self, data: ProcessedTypeData, data_type: DataType
) -> None:
    """Handle monthly data splits and exports."""
    monthly_base = self.config.output_dir / "monthly"

    for timestamp, group in data.dataframe.groupby(pd.Grouper(freq="ME")):
        if not group.empty:
            month_str = pd.Timestamp(timestamp).strftime("%Y-%m")
            month_dir = monthly_base / month_str
            month_dir.mkdir(parents=True, exist_ok=True)

            # Generate fresh stats just for this month's data
            monthly_stats = [
                f"Period: {month_str}",
                f"Records: {len(group)}",
                *self._generate_type_stats(group, data_type),
            ]

            # Create monthly data with new stats, but keep original source_units
            monthly_data = ProcessedTypeData(
                dataframe=group,
                source_units=data.source_units,
                processing_notes=monthly_stats,  # Only using the fresh monthly stats
            )

            if self.config.include_processing_notes:
                self.export_processing_notes(monthly_data, month_dir)
            self.export_monthly_split(monthly_data, data_type, month_dir)
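
Assuming split_by_month is enabled, the layout produced under output_dir looks like this (the .csv names match CSVExporter; other subclasses may differ):

output_dir/
├── monthly/
│   ├── 2024-01/
│   │   ├── cgm.csv
│   │   └── processing_notes.json
│   └── 2024-02/
│       ├── cgm.csv
│       └── processing_notes.json
└── 2024-01-01_to_2024-02-29_complete/
    └── cgm.csv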

_handle_monthly_aligned_exports(data: AlignmentResult) -> None

Handle monthly splits for aligned data.

Source code in src/exporters/base.py
def _handle_monthly_aligned_exports(self, data: AlignmentResult) -> None:
    """Handle monthly splits for aligned data."""
    monthly_base = self.config.output_dir / "monthly"

    for timestamp, group in data.dataframe.groupby(pd.Grouper(freq="ME")):
        if not group.empty:
            month_str = pd.Timestamp(timestamp).strftime("%Y-%m")
            month_dir = monthly_base / month_str
            month_dir.mkdir(parents=True, exist_ok=True)

            monthly_notes = [
                f"Period: {month_str}",
                f"Records: {len(group)}",
                *self._generate_type_stats(group),
            ]

            monthly_aligned = AlignmentResult(
                dataframe=group,
                start_time=group.index.min(),
                end_time=group.index.max(),
                frequency=data.frequency,
                processing_notes=monthly_notes,
                source_units=data.source_units,
            )

            self.export_aligned_monthly_split(monthly_aligned, month_dir)
            if self.config.include_processing_notes:
                self.export_processing_notes(monthly_aligned, month_dir)

_accumulate_monthly_stats(month_str: str, group: pd.DataFrame, data_type: DataType) -> None

Accumulate statistics for a month across all data types.

Source code in src/exporters/base.py
def _accumulate_monthly_stats(
    self, month_str: str, group: pd.DataFrame, data_type: DataType
) -> None:
    """Accumulate statistics for a month across all data types."""
    if month_str not in self.monthly_notes_cache:
        self.monthly_notes_cache[month_str] = {
            "period": month_str,
            "record_count": len(group),
            "stats": [],
        }

    type_stats = self._generate_type_stats(group, data_type)
    if type_stats:
        self.monthly_notes_cache[month_str]["stats"].extend(type_stats)

export_complete_dataset(data: ProcessedTypeData, data_type: DataType, output_dir: Path) -> None abstractmethod

Export complete dataset for a specific data type.

Source code in src/exporters/base.py
@abstractmethod
def export_complete_dataset(
    self, data: ProcessedTypeData, data_type: DataType, output_dir: Path
) -> None:
    """Export complete dataset for a specific data type."""

export_monthly_split(data: ProcessedTypeData, data_type: DataType, month_dir: Path) -> None abstractmethod

Export monthly split for a specific data type.

Source code in src/exporters/base.py
@abstractmethod
def export_monthly_split(
    self, data: ProcessedTypeData, data_type: DataType, month_dir: Path
) -> None:
    """Export monthly split for a specific data type."""

export_processing_notes(data: Union[ProcessedTypeData, AlignmentResult], output_path: Path) -> None abstractmethod

Export processing notes.

Source code in src/exporters/base.py
@abstractmethod
def export_processing_notes(
    self, data: Union[ProcessedTypeData, AlignmentResult], output_path: Path
) -> None:
    """Export processing notes."""

export_aligned_complete_dataset(data: AlignmentResult, output_dir: Path) -> None abstractmethod

Export complete aligned dataset.

Source code in src/exporters/base.py
@abstractmethod
def export_aligned_complete_dataset(
    self, data: AlignmentResult, output_dir: Path
) -> None:
    """Export complete aligned dataset."""

export_aligned_monthly_split(data: AlignmentResult, month_dir: Path) -> None abstractmethod

Export monthly split for aligned data.

Source code in src/exporters/base.py
@abstractmethod
def export_aligned_monthly_split(
    self, data: AlignmentResult, month_dir: Path
) -> None:
    """Export monthly split for aligned data."""

export_data(processed_data: Dict[DataType, ProcessedTypeData], aligned_data: Optional[AlignmentResult] = None) -> None

Export all processed data and aligned data if available.

Source code in src/exporters/base.py
def export_data(
    self,
    processed_data: Dict[DataType, ProcessedTypeData],
    aligned_data: Optional[AlignmentResult] = None,
) -> None:
    """Export all processed data and aligned data if available."""
    if not processed_data and not aligned_data:
        return

    # Reset monthly notes cache
    self.monthly_notes_cache = {}

    # Get date range from either source
    date_range = self.get_date_range(
        next(iter(processed_data.values())) if processed_data else aligned_data
    )
    complete_dir = self.config.output_dir / f"{date_range}_complete"
    complete_dir.mkdir(parents=True, exist_ok=True)

    # Export individual datasets
    for data_type, type_data in processed_data.items():
        self.export_complete_dataset(type_data, data_type, complete_dir)
        if self.config.include_processing_notes:
            self.export_processing_notes(type_data, complete_dir)
        if self.config.split_by_month:
            self._handle_monthly_exports(type_data, data_type)

    # Export aligned data if available
    if aligned_data:
        self.export_aligned_complete_dataset(aligned_data, complete_dir)
        if self.config.include_processing_notes:
            self.export_processing_notes(aligned_data, complete_dir)
        if self.config.split_by_month:
            self._handle_monthly_aligned_exports(aligned_data)
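
A typical call site, sketched on the assumption that ExportConfig is constructed with the fields the exporter reads (output_dir, split_by_month, include_processing_notes):

from pathlib import Path

config = ExportConfig(
    output_dir=Path("exports"),
    split_by_month=True,
    include_processing_notes=True,
)
exporter = CSVExporter(config)
# processed_data: Dict[DataType, ProcessedTypeData]; aligned: Optional[AlignmentResult]
exporter.export_data(processed_data, aligned_data=aligned)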

get_date_range(data: Union[ProcessedTypeData, AlignmentResult]) -> str staticmethod

Get date range string from data.

Source code in src/exporters/base.py
@staticmethod
def get_date_range(data: Union[ProcessedTypeData, AlignmentResult]) -> str:
    """Get date range string from data."""
    df = data.dataframe
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError("DataFrame must have DatetimeIndex")
    start = df.index.min().strftime("%Y-%m-%d")
    end = df.index.max().strftime("%Y-%m-%d")
    return f"{start}_to_{end}"
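
For a frame indexed from 2024-01-01 through 2024-03-31, this yields:

BaseExporter.get_date_range(type_data)  # -> "2024-01-01_to_2024-03-31"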

CSVExporter

Bases: BaseExporter

CSV implementation of data exporter.

Source code in src/exporters/csv.py
class CSVExporter(BaseExporter):
    """CSV implementation of data exporter."""

    def export_complete_dataset(
        self, data: ProcessedTypeData, data_type: DataType, output_dir: Path
    ) -> None:
        """Export complete dataset as CSV."""
        filename = f"{data_type.name.lower()}.csv"
        data.dataframe.to_csv(output_dir / filename)

        if self.config.include_processing_notes:
            self.export_processing_notes(data, output_dir)

    def export_monthly_split(
        self, data: ProcessedTypeData, data_type: DataType, month_dir: Path
    ) -> None:
        """Export monthly split as CSV."""
        filename = f"{data_type.name.lower()}.csv"
        data.dataframe.to_csv(month_dir / filename)

    def export_aligned_complete_dataset(
        self, data: AlignmentResult, output_dir: Path
    ) -> None:
        """Export complete aligned dataset as CSV."""
        data.dataframe.to_csv(output_dir / "aligned_data.csv")

    def export_aligned_monthly_split(
        self, data: AlignmentResult, month_dir: Path
    ) -> None:
        """Export monthly split of aligned data as CSV."""
        data.dataframe.to_csv(month_dir / "aligned_data.csv")
        if self.config.include_processing_notes:
            self.export_processing_notes(data, month_dir)

    def export_processing_notes(
        self, data: Union[ProcessedTypeData, AlignmentResult], output_path: Path
    ) -> None:
        """Export processing notes as JSON."""
        common_data = {
            "export_date": pd.Timestamp.now().isoformat(),
            "date_range": f"{data.dataframe.index.min().strftime('%Y-%m-%d')} to {data.dataframe.index.max().strftime('%Y-%m-%d')}",
            "record_count": len(data.dataframe),
            "columns_present": list(data.dataframe.columns),
            "notes": data.processing_notes,
        }

        if isinstance(data, ProcessedTypeData):
            common_data["source_units"] = {
                k: v.value for k, v in data.source_units.items()
            }
        else:  # AlignmentResult
            common_data["frequency"] = data.frequency
            common_data["completeness"] = {
                col: f"{(data.dataframe[col].notna().mean() * 100):.2f}%"
                for col in data.dataframe.columns
            }
            common_data["source_units"] = {
                k: v.value for k, v in data.source_units.items()
            }

        with open(output_path / "processing_notes.json", "w", encoding="utf-8") as f:
            json.dump(common_data, f, indent=2, ensure_ascii=False)
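
Each data type is written to <type>.csv and aligned data to aligned_data.csv. For example (paths illustrative):

exporter.export_complete_dataset(cgm_data, DataType.CGM, Path("exports/complete"))
# writes exports/complete/cgm.csv, plus processing_notes.json when enabled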

export_complete_dataset(data: ProcessedTypeData, data_type: DataType, output_dir: Path) -> None

Export complete dataset as CSV.

Source code in src/exporters/csv.py
def export_complete_dataset(
    self, data: ProcessedTypeData, data_type: DataType, output_dir: Path
) -> None:
    """Export complete dataset as CSV."""
    filename = f"{data_type.name.lower()}.csv"
    data.dataframe.to_csv(output_dir / filename)

    if self.config.include_processing_notes:
        self.export_processing_notes(data, output_dir)

export_monthly_split(data: ProcessedTypeData, data_type: DataType, month_dir: Path) -> None

Export monthly split as CSV.

Source code in src/exporters/csv.py
def export_monthly_split(
    self, data: ProcessedTypeData, data_type: DataType, month_dir: Path
) -> None:
    """Export monthly split as CSV."""
    filename = f"{data_type.name.lower()}.csv"
    data.dataframe.to_csv(month_dir / filename)

export_aligned_complete_dataset(data: AlignmentResult, output_dir: Path) -> None

Export complete aligned dataset as CSV.

Source code in src/exporters/csv.py
def export_aligned_complete_dataset(
    self, data: AlignmentResult, output_dir: Path
) -> None:
    """Export complete aligned dataset as CSV."""
    data.dataframe.to_csv(output_dir / "aligned_data.csv")

export_aligned_monthly_split(data: AlignmentResult, month_dir: Path) -> None

Export monthly split of aligned data as CSV.

Source code in src/exporters/csv.py
def export_aligned_monthly_split(
    self, data: AlignmentResult, month_dir: Path
) -> None:
    """Export monthly split of aligned data as CSV."""
    data.dataframe.to_csv(month_dir / "aligned_data.csv")
    if self.config.include_processing_notes:
        self.export_processing_notes(data, month_dir)

export_processing_notes(data: Union[ProcessedTypeData, AlignmentResult], output_path: Path) -> None

Export processing notes as JSON.

Source code in src/exporters/csv.py
def export_processing_notes(
    self, data: Union[ProcessedTypeData, AlignmentResult], output_path: Path
) -> None:
    """Export processing notes as JSON."""
    common_data = {
        "export_date": pd.Timestamp.now().isoformat(),
        "date_range": f"{data.dataframe.index.min().strftime('%Y-%m-%d')} to {data.dataframe.index.max().strftime('%Y-%m-%d')}",
        "record_count": len(data.dataframe),
        "columns_present": list(data.dataframe.columns),
        "notes": data.processing_notes,
    }

    if isinstance(data, ProcessedTypeData):
        common_data["source_units"] = {
            k: v.value for k, v in data.source_units.items()
        }
    else:  # AlignmentResult
        common_data["frequency"] = data.frequency
        common_data["completeness"] = {
            col: f"{(data.dataframe[col].notna().mean() * 100):.2f}%"
            for col in data.dataframe.columns
        }
        common_data["source_units"] = {
            k: v.value for k, v in data.source_units.items()
        }

    with open(output_path / "processing_notes.json", "w", encoding="utf-8") as f:
        json.dump(common_data, f, indent=2, ensure_ascii=False)
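
For aligned data, the resulting processing_notes.json looks roughly like this (all values illustrative):

{
  "export_date": "2024-04-01T12:00:00",
  "date_range": "2024-01-01 to 2024-03-31",
  "record_count": 26208,
  "columns_present": ["cgm_primary", "basal_dose", "bolus_dose"],
  "notes": ["CGM Processing Notes:", "  Processed 26208 total CGM readings", "..."],
  "frequency": "5min",
  "completeness": {"cgm_primary": "98.61%", "basal_dose": "100.00%", "bolus_dose": "100.00%"},
  "source_units": {"cgm_primary": "mg/dL"}
}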