Processors API

src.processors

Initialize processors package and register all processors.

__all__ = ['DataProcessor', 'CGMProcessor', 'BGMProcessor', 'CarbsProcessor', 'InsulinProcessor', 'NotesProcessor'] module-attribute
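
A minimal usage sketch of the package exports (assuming the type processors register themselves with DataProcessor when the package is imported, as the module docstring suggests):

from src.processors import DataProcessor

# Importing the package runs the registration decorators, so a plain
# DataProcessor instance can already dispatch to the CGM, BGM, carbs,
# insulin and notes processors.
processor = DataProcessor()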

DataProcessor

Main processor class that handles processing of all data types.

Source code in src/processors/base.py
class DataProcessor:
    """Main processor class that handles processing of all data types."""

    _type_processors: Dict[DataType, Type[BaseTypeProcessor]] = {}

    # Define defaults for insulin classification centrally
    DEFAULT_INSULIN_BOLUS_LIMIT: float = 8.0
    DEFAULT_INSULIN_MAX_DOSE: float = 15.0

    @staticmethod
    def create_table_configs(
        detected_format: DeviceFormat,
    ) -> Dict[str, TableStructure]:
        """
        Creates a table configuration dictionary from detected format.

        Args:
            detected_format: Format object containing files and their table configurations

        Returns:
            Dict[str, TableStructure]: Dictionary mapping table names to their structures
        """
        try:
            return {
                table.name: table
                for file_config in detected_format.files
                for table in file_config.tables
            }
        except Exception as e:
            logger.error("Failed to create table configurations: %s", str(e))
            raise ProcessingError("Failed to create table configurations") from e

    def process_tables(
        self,
        table_data: Dict[str, TableData],
        detected_format: DeviceFormat,
        interpolation_limit: Optional[
            Any
        ] = None,  # Optional parameter for CGM processor
        bolus_limit: Optional[
            float
        ] = None,  # Optional parameters for insulin processor
        max_dose: Optional[float] = None,
    ) -> Dict[DataType, ProcessedTypeData]:
        """
        Process all tables according to their configuration.

        Args:
            table_data: Dictionary mapping table names to their data
            detected_format: Format object containing table configurations
            interpolation_limit: Maximum length of gaps to interpolate
            bolus_limit: Maximum insulin dose to be classified as bolus (default: 8.0)
            max_dose: Maximum insulin dose; doses above this are discarded

        Returns:
            Dict[DataType, ProcessedTypeData]: Processed data organized by type
        """
        table_configs = self.create_table_configs(detected_format)

        # Collect columns from all tables, grouped by target data type
        type_data: Dict[DataType, List[ColumnData]] = {}

        for table_name, data in table_data.items():
            config = table_configs[table_name]

            # Group columns by data type
            for column in config.columns:
                if column.data_type:
                    # Include insulin metadata with insulin data
                    target_type = (
                        DataType.INSULIN
                        if column.data_type == DataType.INSULIN_META
                        else column.data_type
                    )

                    df_subset = data.dataframe[[column.source_name]].copy()
                    df_subset.columns = ["value"]

                    column_data = ColumnData(
                        dataframe=df_subset,
                        unit=column.unit,
                        config=column,
                        is_primary=column.is_primary,
                    )

                    if target_type not in type_data:
                        type_data[target_type] = []

                    type_data[target_type].append(column_data)

        # Process each data type
        results = {}
        for data_type, columns in type_data.items():
            try:
                processor = self.get_processor_for_type(data_type)

                # Inject optional parameters based on processor type
                if data_type == DataType.CGM and interpolation_limit is not None:
                    result = processor.process_type(columns, interpolation_limit)
                elif data_type == DataType.INSULIN:
                    # Use provided values or defaults, but never None
                    final_bolus_limit = (
                        bolus_limit
                        if bolus_limit is not None
                        else self.DEFAULT_INSULIN_BOLUS_LIMIT
                    )
                    final_max_dose = (
                        max_dose
                        if max_dose is not None
                        else self.DEFAULT_INSULIN_MAX_DOSE
                    )
                    result = processor.process_type(
                        columns, final_bolus_limit, final_max_dose
                    )
                else:
                    result = processor.process_type(columns)

                if not result.dataframe.empty:
                    results[data_type] = result

                    col_count = len(columns)
                    primary_count = sum(1 for c in columns if c.is_primary)
                    logger.info(
                        "    \u2713 Processed %s: %d primary and %d secondary columns",
                        data_type.name,
                        primary_count,
                        col_count - primary_count,
                    )

            except ProcessingError as e:
                logger.error("Error processing %s: %s", data_type, str(e))
                continue

        return results

    @classmethod
    def register_processor(cls, data_type: DataType):
        """Register a processor class for a specific data type."""

        def wrapper(processor_cls: Type[BaseTypeProcessor]):
            cls._type_processors[data_type] = processor_cls
            return processor_cls

        return wrapper

    def get_processor_for_type(self, data_type: DataType) -> BaseTypeProcessor:
        """Get appropriate processor instance for the data type."""
        processor_cls = self._type_processors.get(data_type)
        if processor_cls is None:
            raise ProcessingError(
                f"No processor registered for data type: {data_type.value}"
            )
        return processor_cls()

_type_processors: Dict[DataType, Type[BaseTypeProcessor]] = {} class-attribute instance-attribute

DEFAULT_INSULIN_BOLUS_LIMIT: float = 8.0 class-attribute instance-attribute

DEFAULT_INSULIN_MAX_DOSE: float = 15.0 class-attribute instance-attribute

create_table_configs(detected_format: DeviceFormat) -> Dict[str, TableStructure] staticmethod

Creates a table configuration dictionary from detected format.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| detected_format | DeviceFormat | Format object containing files and their table configurations | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, TableStructure] | Dictionary mapping table names to their structures |

Source code in src/processors/base.py
@staticmethod
def create_table_configs(
    detected_format: DeviceFormat,
) -> Dict[str, TableStructure]:
    """
    Creates a table configuration dictionary from detected format.

    Args:
        detected_format: Format object containing files and their table configurations

    Returns:
        Dict[str, TableStructure]: Dictionary mapping table names to their structures
    """
    try:
        return {
            table.name: table
            for file_config in detected_format.files
            for table in file_config.tables
        }
    except Exception as e:
        logger.error("Failed to create table configurations: %s", str(e))
        raise ProcessingError("Failed to create table configurations") from e
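
For illustration, the helper flattens every table from every file into one name-keyed dictionary. A hedged sketch (the table names are invented for the example):

configs = DataProcessor.create_table_configs(detected_format)
# e.g. {"cgm_readings": TableStructure(...), "treatments": TableStructure(...)}
cgm_structure = configs["cgm_readings"]

Note that if two files define tables with the same name, the later entry overwrites the earlier one, since the comprehension keys on table.name alone.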

process_tables(table_data: Dict[str, TableData], detected_format: DeviceFormat, interpolation_limit: Optional[Any] = None, bolus_limit: Optional[float] = None, max_dose: Optional[float] = None) -> Dict[DataType, ProcessedTypeData]

Process all tables according to their configuration.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| table_data | Dict[str, TableData] | Dictionary mapping table names to their data | required |
| detected_format | DeviceFormat | Format object containing table configurations | required |
| interpolation_limit | Optional[Any] | Maximum length of gaps to interpolate (used by the CGM processor) | None |
| bolus_limit | Optional[float] | Maximum insulin dose to be classified as bolus (default: 8.0) | None |
| max_dose | Optional[float] | Maximum insulin dose; doses above this are discarded | None |

Returns:

| Type | Description |
| --- | --- |
| Dict[DataType, ProcessedTypeData] | Processed data organized by type |

Source code in src/processors/base.py
def process_tables(
    self,
    table_data: Dict[str, TableData],
    detected_format: DeviceFormat,
    interpolation_limit: Optional[
        Any
    ] = None,  # Optional parameter for CGM processor
    bolus_limit: Optional[
        float
    ] = None,  # Optional parameters for insulin processor
    max_dose: Optional[float] = None,
) -> Dict[DataType, ProcessedTypeData]:
    """
    Process all tables according to their configuration.

    Args:
        table_data: Dictionary mapping table names to their data
        detected_format: Format object containing table configurations
        interpolation_limit: Maximum length of gaps to interpolate
        bolus_limit: Maximum insulin dose to be classified as bolus (default: 8.0)
        max_dose: Maximum insulin dose; doses above this are discarded

    Returns:
        Dict[DataType, ProcessedTypeData]: Processed data organized by type
    """
    table_configs = self.create_table_configs(detected_format)

    # Collect columns from all tables, grouped by target data type
    type_data: Dict[DataType, List[ColumnData]] = {}

    for table_name, data in table_data.items():
        config = table_configs[table_name]

        # Group columns by data type
        for column in config.columns:
            if column.data_type:
                # Include insulin metadata with insulin data
                target_type = (
                    DataType.INSULIN
                    if column.data_type == DataType.INSULIN_META
                    else column.data_type
                )

                df_subset = data.dataframe[[column.source_name]].copy()
                df_subset.columns = ["value"]

                column_data = ColumnData(
                    dataframe=df_subset,
                    unit=column.unit,
                    config=column,
                    is_primary=column.is_primary,
                )

                if target_type not in type_data:
                    type_data[target_type] = []

                type_data[target_type].append(column_data)

    # Process each data type
    results = {}
    for data_type, columns in type_data.items():
        try:
            processor = self.get_processor_for_type(data_type)

            # Inject optional parameters based on processor type
            if data_type == DataType.CGM and interpolation_limit is not None:
                result = processor.process_type(columns, interpolation_limit)
            elif data_type == DataType.INSULIN:
                # Use provided values or defaults, but never None
                final_bolus_limit = (
                    bolus_limit
                    if bolus_limit is not None
                    else self.DEFAULT_INSULIN_BOLUS_LIMIT
                )
                final_max_dose = (
                    max_dose
                    if max_dose is not None
                    else self.DEFAULT_INSULIN_MAX_DOSE
                )
                result = processor.process_type(
                    columns, final_bolus_limit, final_max_dose
                )
            else:
                result = processor.process_type(columns)

            if not result.dataframe.empty:
                results[data_type] = result

                col_count = len(columns)
                primary_count = sum(1 for c in columns if c.is_primary)
                logger.info(
                    "    \u2713 Processed %s: %d primary and %d secondary columns",
                    data_type.name,
                    primary_count,
                    col_count - primary_count,
                )

        except ProcessingError as e:
            logger.error("Error processing %s: %s", data_type, str(e))
            continue

    return results
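
A usage sketch of the main entry point (table_data and detected_format are assumed to come from earlier reading and format-detection stages; the keyword values shown are the documented defaults):

processor = DataProcessor()
results = processor.process_tables(
    table_data=table_data,            # Dict[str, TableData] from the file reader
    detected_format=detected_format,  # DeviceFormat from format detection
    interpolation_limit=4,            # optional, forwarded to the CGM processor
    bolus_limit=8.0,                  # optional, forwarded to the insulin processor
    max_dose=15.0,                    # optional, forwarded to the insulin processor
)

cgm_result = results.get(DataType.CGM)
if cgm_result is not None:
    print("\n".join(cgm_result.processing_notes))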

register_processor(data_type: DataType) classmethod

Register a processor class for a specific data type.

Source code in src/processors/base.py
@classmethod
def register_processor(cls, data_type: DataType):
    """Register a processor class for a specific data type."""

    def wrapper(processor_cls: Type[BaseTypeProcessor]):
        cls._type_processors[data_type] = processor_cls
        return processor_cls

    return wrapper

get_processor_for_type(data_type: DataType) -> BaseTypeProcessor

Get appropriate processor instance for the data type.

Source code in src/processors/base.py
def get_processor_for_type(self, data_type: DataType) -> BaseTypeProcessor:
    """Get appropriate processor instance for the data type."""
    processor_cls = self._type_processors.get(data_type)
    if processor_cls is None:
        raise ProcessingError(
            f"No processor registered for data type: {data_type.value}"
        )
    return processor_cls()
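
Because register_processor stores classes in the shared _type_processors dictionary, registering a second class for the same DataType silently replaces the first. A minimal registration sketch (MyBGMProcessor is a hypothetical subclass):

@DataProcessor.register_processor(DataType.BGM)
class MyBGMProcessor(BaseTypeProcessor):
    def process_type(self, columns: List[ColumnData]) -> ProcessedTypeData:
        ...  # custom BGM handling

# DataProcessor().get_processor_for_type(DataType.BGM) now returns a MyBGMProcessor instance.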

BGMProcessor

Bases: BaseTypeProcessor

Processes BGM data with validation and cleaning.

Source code in src/processors/bgm.py
@DataProcessor.register_processor(DataType.BGM)
class BGMProcessor(BaseTypeProcessor):
    """Processes BGM data with validation and cleaning."""

    def process_type(
        self,
        columns: List[ColumnData],
    ) -> ProcessedTypeData:
        """Process all BGM data from various sources.

        Args:
            columns: List of ColumnData containing all BGM data columns

        Returns:
            ProcessedTypeData containing combined and cleaned BGM data
        """
        processing_notes = []

        try:
            # Validate we have at least one primary column
            if not any(col.is_primary for col in columns):
                raise ProcessingError("No primary BGM column found")

            # Create initial dataframe
            combined_df = pd.DataFrame(index=pd.DatetimeIndex([]))
            column_units = {}

            # First pass - combine all data and convert units
            for idx, col_data in enumerate(
                sorted(columns, key=lambda x: (not x.is_primary))
            ):
                # Convert to mg/dL if needed
                df = col_data.dataframe.copy()

                # Filter out invalid/zero values before unit conversion
                valid_mask = df["value"] > 0.0
                invalid_count = (~valid_mask).sum()
                if invalid_count > 0:
                    processing_notes.append(
                        f"Removed {invalid_count} invalid/zero values from column {idx + 1}"
                    )
                df = df[valid_mask]

                if col_data.unit == Unit.MMOL:
                    df["value"] = df["value"] * 18.0182
                    processing_notes.append(
                        f"Converted BGM column {idx + 1} from {Unit.MMOL.value} to {Unit.MGDL.value}"
                    )

                # Generate column name
                new_name = self._generate_column_name(
                    DataType.BGM, col_data.is_primary, idx
                )
                df.columns = [new_name]

                # Add clipped flag column before any clipping occurs
                clipped_name = f"{new_name}_clipped"
                df[clipped_name] = False

                # Identify values outside valid range
                below_range = df[new_name] < 39.64
                above_range = df[new_name] > 360.36

                # Update clipped flags
                df.loc[below_range | above_range, clipped_name] = True

                # Clip values to valid range
                df[new_name] = df[new_name].clip(lower=39.64, upper=360.36)

                # Log clipping statistics
                clipped_count = (below_range | above_range).sum()
                if clipped_count > 0:
                    processing_notes.append(
                        f"Clipped {clipped_count} values in column {new_name} "
                        f"({below_range.sum()} below range, {above_range.sum()} above range)"
                    )

                # Merge with existing data
                if combined_df.empty:
                    combined_df = df
                else:
                    combined_df = combined_df.join(df, how="outer")

                column_units[new_name] = Unit.MGDL

            # Handle duplicate timestamps by keeping the last value
            duplicates = combined_df.index.duplicated(keep="last")
            if duplicates.any():
                dup_count = duplicates.sum()
                processing_notes.append(
                    f"Found {dup_count} duplicate timestamps - keeping last values"
                )
                combined_df = combined_df[~duplicates]

            # Create mmol/L columns for each mg/dL column (excluding clipped flag columns)
            value_columns = [
                col for col in combined_df.columns if not col.endswith("_clipped")
            ]
            for col in value_columns:
                mmol_col = f"{col}_mmol"
                combined_df[mmol_col] = combined_df[col] * 0.0555
                column_units[mmol_col] = Unit.MMOL

            # Track stats about the processed data
            total_readings = len(combined_df)
            readings_per_day = (
                total_readings
                / (
                    (combined_df.index.max() - combined_df.index.min()).total_seconds()
                    / 86400
                )
                if total_readings > 0
                else 0
            )

            processing_notes.extend(
                [
                    f"Processed {total_readings} total BGM readings",
                    f"Average of {readings_per_day:.1f} readings per day",
                    f"Date range: {combined_df.index.min()} to {combined_df.index.max()}",
                ]
            )

            return ProcessedTypeData(
                dataframe=combined_df,
                source_units=column_units,
                processing_notes=processing_notes,
            )

        except Exception as e:
            raise ProcessingError(f"Error processing BGM data: {str(e)}") from e

process_type(columns: List[ColumnData]) -> ProcessedTypeData

Process all BGM data from various sources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| columns | List[ColumnData] | List of ColumnData containing all BGM data columns | required |

Returns:

| Type | Description |
| --- | --- |
| ProcessedTypeData | ProcessedTypeData containing combined and cleaned BGM data |

Source code in src/processors/bgm.py
def process_type(
    self,
    columns: List[ColumnData],
) -> ProcessedTypeData:
    """Process all BGM data from various sources.

    Args:
        columns: List of ColumnData containing all BGM data columns

    Returns:
        ProcessedTypeData containing combined and cleaned BGM data
    """
    processing_notes = []

    try:
        # Validate we have at least one primary column
        if not any(col.is_primary for col in columns):
            raise ProcessingError("No primary BGM column found")

        # Create initial dataframe
        combined_df = pd.DataFrame(index=pd.DatetimeIndex([]))
        column_units = {}

        # First pass - combine all data and convert units
        for idx, col_data in enumerate(
            sorted(columns, key=lambda x: (not x.is_primary))
        ):
            # Convert to mg/dL if needed
            df = col_data.dataframe.copy()

            # Filter out invalid/zero values before unit conversion
            valid_mask = df["value"] > 0.0
            invalid_count = (~valid_mask).sum()
            if invalid_count > 0:
                processing_notes.append(
                    f"Removed {invalid_count} invalid/zero values from column {idx + 1}"
                )
            df = df[valid_mask]

            if col_data.unit == Unit.MMOL:
                df["value"] = df["value"] * 18.0182
                processing_notes.append(
                    f"Converted BGM column {idx + 1} from {Unit.MMOL.value} to {Unit.MGDL.value}"
                )

            # Generate column name
            new_name = self._generate_column_name(
                DataType.BGM, col_data.is_primary, idx
            )
            df.columns = [new_name]

            # Add clipped flag column before any clipping occurs
            clipped_name = f"{new_name}_clipped"
            df[clipped_name] = False

            # Identify values outside valid range
            below_range = df[new_name] < 39.64
            above_range = df[new_name] > 360.36

            # Update clipped flags
            df.loc[below_range | above_range, clipped_name] = True

            # Clip values to valid range
            df[new_name] = df[new_name].clip(lower=39.64, upper=360.36)

            # Log clipping statistics
            clipped_count = (below_range | above_range).sum()
            if clipped_count > 0:
                processing_notes.append(
                    f"Clipped {clipped_count} values in column {new_name} "
                    f"({below_range.sum()} below range, {above_range.sum()} above range)"
                )

            # Merge with existing data
            if combined_df.empty:
                combined_df = df
            else:
                combined_df = combined_df.join(df, how="outer")

            column_units[new_name] = Unit.MGDL

        # Handle duplicate timestamps by keeping the last value
        duplicates = combined_df.index.duplicated(keep="last")
        if duplicates.any():
            dup_count = duplicates.sum()
            processing_notes.append(
                f"Found {dup_count} duplicate timestamps - keeping last values"
            )
            combined_df = combined_df[~duplicates]

        # Create mmol/L columns for each mg/dL column (excluding clipped flag columns)
        value_columns = [
            col for col in combined_df.columns if not col.endswith("_clipped")
        ]
        for col in value_columns:
            mmol_col = f"{col}_mmol"
            combined_df[mmol_col] = combined_df[col] * 0.0555
            column_units[mmol_col] = Unit.MMOL

        # Track stats about the processed data
        total_readings = len(combined_df)
        readings_per_day = (
            total_readings
            / (
                (combined_df.index.max() - combined_df.index.min()).total_seconds()
                / 86400
            )
            if total_readings > 0
            else 0
        )

        processing_notes.extend(
            [
                f"Processed {total_readings} total BGM readings",
                f"Average of {readings_per_day:.1f} readings per day",
                f"Date range: {combined_df.index.min()} to {combined_df.index.max()}",
            ]
        )

        return ProcessedTypeData(
            dataframe=combined_df,
            source_units=column_units,
            processing_notes=processing_notes,
        )

    except Exception as e:
        raise ProcessingError(f"Error processing BGM data: {str(e)}") from e
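
The hard-coded constants appear to be related: 18.0182 is the mg/dL-per-mmol/L conversion factor for glucose, 0.0555 is approximately its reciprocal, and the clipping bounds are round numbers in mmol/L. A quick check (an observation, not stated in the source):

MGDL_PER_MMOL = 18.0182

print(2.2 * MGDL_PER_MMOL)    # ≈ 39.64  -> lower clipping bound in mg/dL
print(20.0 * MGDL_PER_MMOL)   # ≈ 360.36 -> upper clipping bound in mg/dL
print(1 / MGDL_PER_MMOL)      # ≈ 0.0555 -> factor used for the *_mmol columns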

CarbsProcessor

Bases: BaseTypeProcessor

Processes carbohydrate intake data with validation and cleaning.

Source code in src/processors/carbs.py
@DataProcessor.register_processor(DataType.CARBS)
class CarbsProcessor(BaseTypeProcessor):
    """Processes carbohydrate intake data with validation and cleaning."""

    def process_type(
        self,
        columns: List[ColumnData],
    ) -> ProcessedTypeData:
        """Process all carbohydrate data from various sources.

        Args:
            columns: List of ColumnData containing all carb data columns

        Returns:
            ProcessedTypeData containing combined and cleaned carb data
        """
        processing_notes = []

        try:
            # Validate we have at least one primary column
            if not any(col.is_primary for col in columns):
                raise ProcessingError("No primary carbohydrate column found")

            # Sort columns to ensure primary is first
            sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

            # Combine all columns with standardized names
            combined_df, column_units = self._combine_and_rename_columns(
                sorted_columns, DataType.CARBS
            )

            if combined_df.empty:
                raise ProcessingError("No carbohydrate data to process")

            # Log what we're processing
            processing_notes.append(
                f"Processing {len(combined_df.columns)} carb columns: {', '.join(combined_df.columns)}"
            )

            # Track original row count
            original_count = len(combined_df)

            # Process each carb column
            for col in combined_df.columns:
                # Keep only rows where carbs is >= 1.0 grams
                mask = combined_df[col] >= 1.0
                combined_df.loc[~mask, col] = None

                filtered_count = mask.sum()
                processing_notes.append(
                    f"Column {col}: Kept {filtered_count} entries ≥1g "
                    f"({filtered_count / original_count * 100:.1f}%)"
                )

            # Drop rows where all values are null (no significant carbs in any column)
            combined_df = combined_df.dropna(how="all")

            # Handle duplicate timestamps by keeping the first occurrence
            duplicates = combined_df.index.duplicated()
            if duplicates.any():
                dup_count = duplicates.sum()
                processing_notes.append(f"Removed {dup_count} duplicate timestamps")
                combined_df = combined_df[~duplicates]

            # Final stats
            processing_notes.append(
                f"Final dataset contains {len(combined_df)} entries "
                f"from {original_count} original records"
            )

            return ProcessedTypeData(
                dataframe=combined_df,
                source_units=column_units,
                processing_notes=processing_notes,
            )

        except Exception as e:
            raise ProcessingError(
                f"Error processing carbohydrate data: {str(e)}"
            ) from e

process_type(columns: List[ColumnData]) -> ProcessedTypeData

Process all carbohydrate data from various sources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| columns | List[ColumnData] | List of ColumnData containing all carb data columns | required |

Returns:

| Type | Description |
| --- | --- |
| ProcessedTypeData | ProcessedTypeData containing combined and cleaned carb data |

Source code in src/processors/carbs.py
def process_type(
    self,
    columns: List[ColumnData],
) -> ProcessedTypeData:
    """Process all carbohydrate data from various sources.

    Args:
        columns: List of ColumnData containing all carb data columns

    Returns:
        ProcessedTypeData containing combined and cleaned carb data
    """
    processing_notes = []

    try:
        # Validate we have at least one primary column
        if not any(col.is_primary for col in columns):
            raise ProcessingError("No primary carbohydrate column found")

        # Sort columns to ensure primary is first
        sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

        # Combine all columns with standardized names
        combined_df, column_units = self._combine_and_rename_columns(
            sorted_columns, DataType.CARBS
        )

        if combined_df.empty:
            raise ProcessingError("No carbohydrate data to process")

        # Log what we're processing
        processing_notes.append(
            f"Processing {len(combined_df.columns)} carb columns: {', '.join(combined_df.columns)}"
        )

        # Track original row count
        original_count = len(combined_df)

        # Process each carb column
        for col in combined_df.columns:
            # Keep only rows where carbs is >= 1.0 grams
            mask = combined_df[col] >= 1.0
            combined_df.loc[~mask, col] = None

            filtered_count = mask.sum()
            processing_notes.append(
                f"Column {col}: Kept {filtered_count} entries ≥1g "
                f"({filtered_count / original_count * 100:.1f}%)"
            )

        # Drop rows where all values are null (no significant carbs in any column)
        combined_df = combined_df.dropna(how="all")

        # Handle duplicate timestamps by keeping the first occurrence
        duplicates = combined_df.index.duplicated()
        if duplicates.any():
            dup_count = duplicates.sum()
            processing_notes.append(f"Removed {dup_count} duplicate timestamps")
            combined_df = combined_df[~duplicates]

        # Final stats
        processing_notes.append(
            f"Final dataset contains {len(combined_df)} entries "
            f"from {original_count} original records"
        )

        return ProcessedTypeData(
            dataframe=combined_df,
            source_units=column_units,
            processing_notes=processing_notes,
        )

    except Exception as e:
        raise ProcessingError(
            f"Error processing carbohydrate data: {str(e)}"
        ) from e
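
The ≥1 g filter is applied per column, and a row is only dropped when every column falls below the threshold. A toy sketch of that behaviour (values are illustrative):

import pandas as pd

df = pd.DataFrame({"carbs_primary": [45.0, 0.5, 12.0], "carbs_2": [None, 30.0, 0.2]})
for col in df.columns:
    mask = df[col] >= 1.0
    df.loc[~mask, col] = None
df = df.dropna(how="all")
# Row 0 keeps 45.0 g, row 1 keeps only the 30.0 g secondary value, row 2 keeps 12.0 g.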

CGMProcessor

Bases: BaseTypeProcessor

Processes CGM data with validation and cleaning.

Source code in src/processors/cgm.py
@DataProcessor.register_processor(DataType.CGM)
class CGMProcessor(BaseTypeProcessor):
    """Processes CGM data with validation and cleaning."""

    def process_type(
        self,
        columns: List[ColumnData],
        interpolation_limit: int = 4,
    ) -> ProcessedTypeData:
        """Process all CGM data from various sources.

        Args:
            columns: List of ColumnData containing all CGM data columns
            interpolation_limit: Maximum number of consecutive missing values to interpolate
                               (default: 4, equivalent to 20 minutes at 5-min intervals)

        Returns:
            ProcessedTypeData containing combined and cleaned CGM data
        """
        processing_notes = []

        try:
            # Validate we have at least one primary column
            if not any(col.is_primary for col in columns):
                raise ProcessingError("No primary CGM column found")

            # Sort columns to ensure primary is first
            sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

            # Create initial dataframe
            combined_df = pd.DataFrame(index=pd.DatetimeIndex([]))
            column_units = {}

            # First pass - combine all data and convert units
            for idx, col_data in enumerate(sorted_columns):
                # Convert to mg/dL if needed
                df = col_data.dataframe.copy()
                if col_data.unit == Unit.MMOL:
                    df["value"] = df["value"] * 18.0182
                    processing_notes.append(
                        f"Converted CGM column {idx + 1} from {Unit.MMOL.value} to {Unit.MGDL.value}"
                    )

                # Generate column name
                new_name = self._generate_column_name(
                    DataType.CGM, col_data.is_primary, idx
                )
                df.columns = [new_name]

                # Merge with existing data
                if combined_df.empty:
                    combined_df = df
                else:
                    combined_df = combined_df.join(df, how="outer")

                column_units[new_name] = col_data.unit

            # Round all timestamps to nearest 5 minute interval
            combined_df.index = combined_df.index.round("5min")

            # Handle duplicate times by taking mean
            combined_df = combined_df.groupby(level=0).mean()

            # Create complete 5-minute interval index
            full_index = pd.date_range(
                start=combined_df.index.min(), end=combined_df.index.max(), freq="5min"
            )

            # Reindex to include all intervals
            combined_df = combined_df.reindex(full_index)

            # Get primary column name
            primary_col = next(col for col in combined_df.columns if "primary" in col)

            # Create missing flags for each column
            missing_flags = pd.DataFrame(index=combined_df.index)
            missing_flags["missing"] = combined_df[primary_col].isna()

            # Handle interpolation for each column
            for col in combined_df.columns:
                # Create groups of consecutive missing values
                gap_groups = (~combined_df[col].isna()).cumsum()

                # Within each False group (where missing=True), count the group size
                gap_size = (
                    combined_df[combined_df[col].isna()].groupby(gap_groups).size()
                )

                # Identify gap groups that are larger than interpolation_limit
                large_gaps = gap_size[gap_size > interpolation_limit].index

                # Interpolate all gaps initially
                combined_df[col] = combined_df[col].interpolate(
                    method="linear",
                    limit=interpolation_limit,
                    limit_direction="forward",
                )

                # Reset interpolated values back to NaN for large gaps
                for gap_group in large_gaps:
                    mask = (gap_groups == gap_group) & combined_df[col].isna()
                    combined_df.loc[mask, col] = np.nan

                # Clip values to valid range
                combined_df[col] = combined_df[col].clip(lower=39.64, upper=360.36)

            # Create mmol/L columns for each mg/dL column
            for col in combined_df.columns.copy():
                mmol_col = f"{col}_mmol"
                combined_df[mmol_col] = combined_df[col] * 0.0555
                column_units[mmol_col] = Unit.MMOL
                column_units[col] = Unit.MGDL

            # Add the missing flags
            combined_df["missing"] = missing_flags["missing"]

            # Track stats about the processed data
            total_readings = len(combined_df)
            missing_primary = combined_df["missing"].sum()
            total_na = combined_df["cgm_primary"].isna().sum()
            initial_completeness_percent = (
                (total_readings - missing_primary) / total_readings
            ) * 100
            remaining_completeness_percent = (
                (total_readings - total_na) / total_readings
            ) * 100
            processing_notes.extend(
                [
                    f"Processed {total_readings} total CGM readings",
                    f"Found {missing_primary} missing or interpolated values in primary data",
                    f"Found {total_na} missing values after interpolation",
                    f"Initial CGM dataset completeness: {initial_completeness_percent:.2f}%",
                    f"CGM completeness after interpolation: {remaining_completeness_percent:.2f}%",
                ]
            )

            return ProcessedTypeData(
                dataframe=combined_df,
                source_units=column_units,
                processing_notes=processing_notes,
            )

        except Exception as e:
            raise ProcessingError(f"Error processing CGM data: {str(e)}") from e

process_type(columns: List[ColumnData], interpolation_limit: int = 4) -> ProcessedTypeData

Process all CGM data from various sources.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| columns | List[ColumnData] | List of ColumnData containing all CGM data columns | required |
| interpolation_limit | int | Maximum number of consecutive missing values to interpolate (default: 4, equivalent to 20 minutes at 5-min intervals) | 4 |

Returns:

| Type | Description |
| --- | --- |
| ProcessedTypeData | ProcessedTypeData containing combined and cleaned CGM data |

Source code in src/processors/cgm.py
def process_type(
    self,
    columns: List[ColumnData],
    interpolation_limit: int = 4,
) -> ProcessedTypeData:
    """Process all CGM data from various sources.

    Args:
        columns: List of ColumnData containing all CGM data columns
        interpolation_limit: Maximum number of consecutive missing values to interpolate
                           (default: 4, equivalent to 20 minutes at 5-min intervals)

    Returns:
        ProcessedTypeData containing combined and cleaned CGM data
    """
    processing_notes = []

    try:
        # Validate we have at least one primary column
        if not any(col.is_primary for col in columns):
            raise ProcessingError("No primary CGM column found")

        # Sort columns to ensure primary is first
        sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

        # Create initial dataframe
        combined_df = pd.DataFrame(index=pd.DatetimeIndex([]))
        column_units = {}

        # First pass - combine all data and convert units
        for idx, col_data in enumerate(sorted_columns):
            # Convert to mg/dL if needed
            df = col_data.dataframe.copy()
            if col_data.unit == Unit.MMOL:
                df["value"] = df["value"] * 18.0182
                processing_notes.append(
                    f"Converted CGM column {idx + 1} from {Unit.MMOL.value} to {Unit.MGDL.value}"
                )

            # Generate column name
            new_name = self._generate_column_name(
                DataType.CGM, col_data.is_primary, idx
            )
            df.columns = [new_name]

            # Merge with existing data
            if combined_df.empty:
                combined_df = df
            else:
                combined_df = combined_df.join(df, how="outer")

            column_units[new_name] = col_data.unit

        # Round all timestamps to nearest 5 minute interval
        combined_df.index = combined_df.index.round("5min")

        # Handle duplicate times by taking mean
        combined_df = combined_df.groupby(level=0).mean()

        # Create complete 5-minute interval index
        full_index = pd.date_range(
            start=combined_df.index.min(), end=combined_df.index.max(), freq="5min"
        )

        # Reindex to include all intervals
        combined_df = combined_df.reindex(full_index)

        # Get primary column name
        primary_col = next(col for col in combined_df.columns if "primary" in col)

        # Create missing flags for each column
        missing_flags = pd.DataFrame(index=combined_df.index)
        missing_flags["missing"] = combined_df[primary_col].isna()

        # Handle interpolation for each column
        for col in combined_df.columns:
            # Create groups of consecutive missing values
            gap_groups = (~combined_df[col].isna()).cumsum()

            # Within each False group (where missing=True), count the group size
            gap_size = (
                combined_df[combined_df[col].isna()].groupby(gap_groups).size()
            )

            # Identify gap groups that are larger than interpolation_limit
            large_gaps = gap_size[gap_size > interpolation_limit].index

            # Interpolate all gaps initially
            combined_df[col] = combined_df[col].interpolate(
                method="linear",
                limit=interpolation_limit,
                limit_direction="forward",
            )

            # Reset interpolated values back to NaN for large gaps
            for gap_group in large_gaps:
                mask = (gap_groups == gap_group) & combined_df[col].isna()
                combined_df.loc[mask, col] = np.nan

            # Clip values to valid range
            combined_df[col] = combined_df[col].clip(lower=39.64, upper=360.36)

        # Create mmol/L columns for each mg/dL column
        for col in combined_df.columns.copy():
            mmol_col = f"{col}_mmol"
            combined_df[mmol_col] = combined_df[col] * 0.0555
            column_units[mmol_col] = Unit.MMOL
            column_units[col] = Unit.MGDL

        # Add the missing flags
        combined_df["missing"] = missing_flags["missing"]

        # Track stats about the processed data
        total_readings = len(combined_df)
        missing_primary = combined_df["missing"].sum()
        total_na = combined_df["cgm_primary"].isna().sum()
        initial_completeness_percent = (
            (total_readings - missing_primary) / total_readings
        ) * 100
        remaining_completeness_percent = (
            (total_readings - total_na) / total_readings
        ) * 100
        processing_notes.extend(
            [
                f"Processed {total_readings} total CGM readings",
                f"Found {missing_primary} missing or interpolated values in primary data",
                f"Found {total_na} missing values after interpolation",
                f"Initial CGM dataset completeness: {initial_completeness_percent:.2f}%",
                f"CGM completeness after interpolation: {remaining_completeness_percent:.2f}%",
            ]
        )

        return ProcessedTypeData(
            dataframe=combined_df,
            source_units=column_units,
            processing_notes=processing_notes,
        )

    except Exception as e:
        raise ProcessingError(f"Error processing CGM data: {str(e)}") from e
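
The least obvious step above is how gap lengths are measured: cumulative-summing the non-null mask labels each run of consecutive NaNs with the count of valid readings seen so far, so run lengths fall out of a groupby. A self-contained toy example (values invented for illustration):

import numpy as np
import pandas as pd

s = pd.Series([100.0, np.nan, np.nan, 120.0, np.nan, np.nan, np.nan, np.nan, np.nan, 160.0])

gap_groups = (~s.isna()).cumsum()                   # [1, 1, 1, 2, 2, 2, 2, 2, 2, 3]
gap_size = s[s.isna()].groupby(gap_groups).size()   # {1: 2, 2: 5} -> a 2-sample and a 5-sample gap
large_gaps = gap_size[gap_size > 4].index           # only the 5-sample gap exceeds interpolation_limit=4
print(large_gaps.tolist())                          # [2]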

InsulinProcessor

Bases: BaseTypeProcessor

Processes insulin dose data with classification from metadata if available.

Source code in src/processors/insulin.py
@DataProcessor.register_processor(DataType.INSULIN)
class InsulinProcessor(BaseTypeProcessor):
    """Processes insulin dose data with classification from meta data if available."""

    def _extract_meta_info(self, meta_value: str) -> Tuple[bool, bool, Optional[str]]:
        """Extract insulin type information from meta JSON.

        Returns:
            Tuple of (is_bolus, is_basal, insulin_type)
        """
        try:
            meta_data = json.loads(meta_value)
            if meta_data and isinstance(meta_data, list):
                insulin = meta_data[0].get("insulin", "").lower()
                if "novorapid" in insulin:
                    return True, False, "novorapid"
                if "levemir" in insulin:
                    return False, True, "levemir"
        except (json.JSONDecodeError, IndexError, KeyError, AttributeError):
            pass

        return False, False, None

    def process_type(
        self,
        columns: List[ColumnData],
        bolus_limit: float = 8.0,
        max_limit: float = 15.0,
    ) -> ProcessedTypeData:
        """Process insulin data and classify doses.

        Args:
            columns: List of ColumnData containing insulin data and metadata
            bolus_limit: Maximum insulin units to classify as bolus
            max_limit: Maximum valid insulin dose
        """
        processing_notes = []

        try:
            # Find insulin dose and meta columns
            dose_cols = [col for col in columns if col.data_type == DataType.INSULIN]
            meta_cols = [
                col for col in columns if col.data_type == DataType.INSULIN_META
            ]

            if not any(col.is_primary for col in dose_cols):
                raise ProcessingError("No primary insulin dose column found")

            # Initialize result DataFrame with dose data
            result_df = pd.DataFrame()
            result_units = {}

            # Process dose data first
            for col in dose_cols:
                df = col.dataframe.copy()

                # Keep only positive doses
                valid_mask = df["value"] > 0.0
                df = df[valid_mask]

                if len(df) > 0:
                    processing_notes.append(f"Found {len(df)} positive doses")

                    # Add dose column
                    result_df["dose"] = df["value"]
                    result_units["dose"] = col.unit

                    # Initial classification based on dose
                    result_df["is_bolus"] = df["value"] <= bolus_limit
                    result_df["is_basal"] = (df["value"] > bolus_limit) & (
                        df["value"] <= max_limit
                    )
                    result_df["type"] = ""  # Will be filled by metadata if available

                    # Track classification stats
                    processing_notes.extend(
                        [
                            "Initial dose-based classification:",
                            f"- {result_df['is_bolus'].sum()} doses classified as bolus (≤{bolus_limit}U)",
                            f"- {result_df['is_basal'].sum()} doses classified as basal (>{bolus_limit}U)",
                            f"- Dropped {(df['value'] > max_limit).sum()} doses exceeding {max_limit}U",
                        ]
                    )

            # Update classification with metadata if available
            if meta_cols and not result_df.empty:
                meta_updates = 0
                for col in meta_cols:
                    for idx, meta_value in col.dataframe["value"].items():
                        if idx in result_df.index:
                            is_bolus, is_basal, insulin_type = self._extract_meta_info(
                                meta_value
                            )
                            if insulin_type:
                                result_df.loc[idx, "is_bolus"] = is_bolus
                                result_df.loc[idx, "is_basal"] = is_basal
                                result_df.loc[idx, "type"] = insulin_type
                                meta_updates += 1

                if meta_updates > 0:
                    processing_notes.append(
                        f"Updated {meta_updates} classifications using metadata"
                    )

            # Final stats
            processing_notes.append(
                f"Final dataset contains {len(result_df)} insulin records"
            )

            return ProcessedTypeData(
                dataframe=result_df,
                source_units=result_units,
                processing_notes=processing_notes,
            )

        except Exception as e:
            raise ProcessingError(f"Error processing insulin data: {str(e)}") from e

_extract_meta_info(meta_value: str) -> Tuple[bool, bool, Optional[str]]

Extract insulin type information from meta JSON.

Returns:

| Type | Description |
| --- | --- |
| Tuple[bool, bool, Optional[str]] | Tuple of (is_bolus, is_basal, insulin_type) |

Source code in src/processors/insulin.py
def _extract_meta_info(self, meta_value: str) -> Tuple[bool, bool, Optional[str]]:
    """Extract insulin type information from meta JSON.

    Returns:
        Tuple of (is_bolus, is_basal, insulin_type)
    """
    try:
        meta_data = json.loads(meta_value)
        if meta_data and isinstance(meta_data, list):
            insulin = meta_data[0].get("insulin", "").lower()
            if "novorapid" in insulin:
                return True, False, "novorapid"
            if "levemir" in insulin:
                return False, True, "levemir"
    except (json.JSONDecodeError, IndexError, KeyError, AttributeError):
        pass

    return False, False, None
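
The metadata value is expected to be a JSON-encoded list whose first element carries an "insulin" field; only the substrings "novorapid" and "levemir" are currently recognised. A sketch of the expected shape (the product string is illustrative):

import json

meta_value = json.dumps([{"insulin": "NovoRapid FlexPen"}])
# _extract_meta_info(meta_value) -> (True, False, "novorapid"), i.e. a bolus,
# overriding the dose-threshold classification for that timestamp.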

process_type(columns: List[ColumnData], bolus_limit: float = 8.0, max_limit: float = 15.0) -> ProcessedTypeData

Process insulin data and classify doses.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| columns | List[ColumnData] | List of ColumnData containing insulin data and metadata | required |
| bolus_limit | float | Maximum insulin units to classify as bolus | 8.0 |
| max_limit | float | Maximum valid insulin dose | 15.0 |

Source code in src/processors/insulin.py
def process_type(
    self,
    columns: List[ColumnData],
    bolus_limit: float = 8.0,
    max_limit: float = 15.0,
) -> ProcessedTypeData:
    """Process insulin data and classify doses.

    Args:
        columns: List of ColumnData containing insulin data and metadata
        bolus_limit: Maximum insulin units to classify as bolus
        max_limit: Maximum valid insulin dose
    """
    processing_notes = []

    try:
        # Find insulin dose and meta columns
        dose_cols = [col for col in columns if col.data_type == DataType.INSULIN]
        meta_cols = [
            col for col in columns if col.data_type == DataType.INSULIN_META
        ]

        if not any(col.is_primary for col in dose_cols):
            raise ProcessingError("No primary insulin dose column found")

        # Initialize result DataFrame with dose data
        result_df = pd.DataFrame()
        result_units = {}

        # Process dose data first
        for col in dose_cols:
            df = col.dataframe.copy()

            # Keep only positive doses
            valid_mask = df["value"] > 0.0
            df = df[valid_mask]

            if len(df) > 0:
                processing_notes.append(f"Found {len(df)} positive doses")

                # Add dose column
                result_df["dose"] = df["value"]
                result_units["dose"] = col.unit

                # Initial classification based on dose
                result_df["is_bolus"] = df["value"] <= bolus_limit
                result_df["is_basal"] = (df["value"] > bolus_limit) & (
                    df["value"] <= max_limit
                )
                result_df["type"] = ""  # Will be filled by metadata if available

                # Track classification stats
                processing_notes.extend(
                    [
                        "Initial dose-based classification:",
                        f"- {result_df['is_bolus'].sum()} doses classified as bolus (≤{bolus_limit}U)",
                        f"- {result_df['is_basal'].sum()} doses classified as basal (>{bolus_limit}U)",
                        f"- Dropped {(df['value'] > max_limit).sum()} doses exceeding {max_limit}U",
                    ]
                )

        # Update classification with metadata if available
        if meta_cols and not result_df.empty:
            meta_updates = 0
            for col in meta_cols:
                for idx, meta_value in col.dataframe["value"].items():
                    if idx in result_df.index:
                        is_bolus, is_basal, insulin_type = self._extract_meta_info(
                            meta_value
                        )
                        if insulin_type:
                            result_df.loc[idx, "is_bolus"] = is_bolus
                            result_df.loc[idx, "is_basal"] = is_basal
                            result_df.loc[idx, "type"] = insulin_type
                            meta_updates += 1

            if meta_updates > 0:
                processing_notes.append(
                    f"Updated {meta_updates} classifications using metadata"
                )

        # Final stats
        processing_notes.append(
            f"Final dataset contains {len(result_df)} insulin records"
        )

        return ProcessedTypeData(
            dataframe=result_df,
            source_units=result_units,
            processing_notes=processing_notes,
        )

    except Exception as e:
        raise ProcessingError(f"Error processing insulin data: {str(e)}") from e

NotesProcessor

Bases: BaseTypeProcessor

Processes text notes/comments data.

Source code in src/processors/notes.py
@DataProcessor.register_processor(DataType.NOTES)
class NotesProcessor(BaseTypeProcessor):
    """Processes text notes/comments data."""

    def process_type(
        self,
        columns: List[ColumnData],
    ) -> ProcessedTypeData:
        """Process notes data ensuring safe string storage.

        Args:
            columns: List of ColumnData containing notes columns

        Returns:
            ProcessedTypeData containing processed notes
        """
        processing_notes = []

        try:
            # Validate we have at least one primary column
            if not any(col.is_primary for col in columns):
                raise ProcessingError("No primary notes column found")

            # Sort columns to ensure primary first
            sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

            # Initialize result dataframe
            result_df = pd.DataFrame(index=pd.DatetimeIndex([]))

            # Process each column
            for idx, col_data in enumerate(sorted_columns):
                # Generate column name
                col_name = self._generate_column_name(
                    DataType.NOTES, col_data.is_primary, idx
                )

                # Get the notes series
                notes_series = col_data.dataframe["value"]

                # Replace None with pd.NA for better handling
                notes_series = notes_series.replace([None], pd.NA)

                # Convert non-NA values to string and strip whitespace
                notes_series = notes_series.apply(
                    lambda x: x.strip() if pd.notna(x) else pd.NA
                )

                # Remove empty strings
                notes_series = notes_series.replace({"": pd.NA})

                # Add to result DataFrame only if we have any valid notes
                if not notes_series.isna().all():
                    result_df[col_name] = notes_series

                    # Track stats
                    valid_notes = notes_series.notna()
                    processing_notes.append(
                        f"Column {col_name}: found {valid_notes.sum()} valid notes"
                    )

            # Drop rows where all values are NA
            if not result_df.empty:
                result_df = result_df.dropna(how="all")

            processing_notes.append(
                f"Final dataset contains {len(result_df)} notes entries"
            )

            return ProcessedTypeData(
                dataframe=result_df,
                source_units={},  # No units for text data
                processing_notes=processing_notes,
            )

        except Exception as e:
            raise ProcessingError(f"Error processing notes data: {str(e)}") from e

process_type(columns: List[ColumnData]) -> ProcessedTypeData

Process notes data ensuring safe string storage.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| columns | List[ColumnData] | List of ColumnData containing notes columns | required |

Returns:

| Type | Description |
| --- | --- |
| ProcessedTypeData | ProcessedTypeData containing processed notes |

Source code in src/processors/notes.py
def process_type(
    self,
    columns: List[ColumnData],
) -> ProcessedTypeData:
    """Process notes data ensuring safe string storage.

    Args:
        columns: List of ColumnData containing notes columns

    Returns:
        ProcessedTypeData containing processed notes
    """
    processing_notes = []

    try:
        # Validate we have at least one primary column
        if not any(col.is_primary for col in columns):
            raise ProcessingError("No primary notes column found")

        # Sort columns to ensure primary first
        sorted_columns = sorted(columns, key=lambda x: (not x.is_primary))

        # Initialize result dataframe
        result_df = pd.DataFrame(index=pd.DatetimeIndex([]))

        # Process each column
        for idx, col_data in enumerate(sorted_columns):
            # Generate column name
            col_name = self._generate_column_name(
                DataType.NOTES, col_data.is_primary, idx
            )

            # Get the notes series
            notes_series = col_data.dataframe["value"]

            # Replace None with pd.NA for better handling
            notes_series = notes_series.replace([None], pd.NA)

            # Convert non-NA values to string and strip whitespace
            notes_series = notes_series.apply(
                lambda x: x.strip() if pd.notna(x) else pd.NA
            )

            # Remove empty strings
            notes_series = notes_series.replace({"": pd.NA})

            # Add to result DataFrame only if we have any valid notes
            if not notes_series.isna().all():
                result_df[col_name] = notes_series

                # Track stats
                valid_notes = notes_series.notna()
                processing_notes.append(
                    f"Column {col_name}: found {valid_notes.sum()} valid notes"
                )

        # Drop rows where all values are NA
        if not result_df.empty:
            result_df = result_df.dropna(how="all")

        processing_notes.append(
            f"Final dataset contains {len(result_df)} notes entries"
        )

        return ProcessedTypeData(
            dataframe=result_df,
            source_units={},  # No units for text data
            processing_notes=processing_notes,
        )

    except Exception as e:
        raise ProcessingError(f"Error processing notes data: {str(e)}") from e
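
A toy illustration of the cleaning rules (None, empty and whitespace-only entries end up as NA and are dropped; the example strings are invented):

import pandas as pd

notes = pd.Series(["  pre-breakfast  ", "", None, "changed infusion set"])
cleaned = (
    notes.replace([None], pd.NA)
    .apply(lambda x: x.strip() if pd.notna(x) else pd.NA)
    .replace({"": pd.NA})
    .dropna()
)
print(cleaned.tolist())  # ['pre-breakfast', 'changed infusion set']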