class Aligner:
    """Aligns diabetes data to a reference timeline.

    Each ``_align_*`` method rounds the timestamps of one data type to the
    requested frequency, aggregates the rows that fall into the same window,
    and returns a frame indexed by the reference timeline. ``align`` validates
    the reference timeline, runs the per-type aligners and concatenates the
    results column-wise.
    """

    @staticmethod
    def _require_datetime_frame(df: pd.DataFrame) -> None:
        """Raise AlignmentError unless ``df`` is non-empty with a DatetimeIndex."""
        if df.empty or not isinstance(df.index, pd.DatetimeIndex):
            raise AlignmentError("Input DataFrame is empty or index is not datetime")

    def _validate_timeline(self, reference_data: pd.DataFrame, freq: str) -> None:
        """Validate that reference data provides a valid timeline.

        Args:
            reference_data: DataFrame whose index is the candidate timeline
            freq: Expected spacing between consecutive timestamps

        Raises:
            AlignmentError: If the data is empty, the index is not a
                DatetimeIndex, the index is not monotonically increasing,
                there are too few timestamps to determine a spacing, or the
                most common spacing differs from ``freq``
        """
        if reference_data.empty:
            raise AlignmentError("Reference data is empty")
        if not isinstance(reference_data.index, pd.DatetimeIndex):
            raise AlignmentError("Reference data must have DatetimeIndex")
        if not reference_data.index.is_monotonic_increasing:
            raise AlignmentError(
                "Reference data index must be monotonically increasing"
            )

        # The first diff is always NaT; with a single timestamp nothing is
        # left, and indexing the empty mode() would raise a bare KeyError.
        time_diffs = reference_data.index.to_series().diff().dropna()
        if time_diffs.empty:
            raise AlignmentError(
                "Reference data must contain at least two timestamps "
                "to determine its frequency"
            )

        # Only the most common spacing has to match, so occasional gaps in
        # the reference data are tolerated.
        modal_diff = time_diffs.mode().iloc[0]
        expected_diff = pd.Timedelta(freq)
        if modal_diff != expected_diff:
            raise AlignmentError(
                f"Reference data frequency {modal_diff} does not match expected {freq}"
            )

    def _collect_processing_notes(
        self, processed_data: Dict[DataType, ProcessedTypeData]
    ) -> List[str]:
        """Collect processing notes from all data types.

        Returns:
            A flat list: for every data type a header line followed by that
            type's notes, each prefixed with a space.
        """
        all_notes = []
        for data_type, data in processed_data.items():
            all_notes.append(f"{data_type.name} Processing Notes:")
            all_notes.extend(f" {note}" for note in data.processing_notes)
        return all_notes

    def _align_bgm(
        self, df: pd.DataFrame, reference_index: pd.DatetimeIndex, freq: str
    ) -> pd.DataFrame:
        """Align blood glucose meter data.

        Every column whose name does not end in ``_clipped`` or ``_mmol`` is
        treated as a value column and must be accompanied by
        ``<col>_clipped`` and ``<col>_mmol`` columns.

        Args:
            df: DataFrame containing BGM data
            reference_index: Reference timeline to align to
            freq: Frequency for alignment

        Returns:
            DataFrame aligned to reference timeline with averaged values; the
            clipped flag of a window is the logical "any" of its rows.

        Raises:
            AlignmentError: If DataFrame is empty, index is not datetime, or a
                companion column is missing
        """
        self._require_datetime_frame(df)

        # Work on a copy so the caller's index is left untouched.
        df = df.copy()
        df.index = df.index.round(freq)

        value_cols = [
            col
            for col in df.columns
            if not col.endswith(("_clipped", "_mmol"))
        ]
        result = pd.DataFrame(index=reference_index)
        if not value_cols:
            return result  # nothing to align

        for value_col in value_cols:
            clipped_col = f"{value_col}_clipped"
            mmol_col = f"{value_col}_mmol"
            if clipped_col not in df.columns or mmol_col not in df.columns:
                raise AlignmentError(f"Missing expected column(s) for {value_col}")

            # Column assignment aligns on the index, so windows outside the
            # reference timeline are dropped and missing ones become NaN.
            result[value_col] = df[value_col].resample(freq).mean()
            result[clipped_col] = (
                df[clipped_col].resample(freq).apply(lambda x: x.any())
            )
            result[mmol_col] = df[mmol_col].resample(freq).mean()
        return result

    def _align_insulin(
        self, df: pd.DataFrame, reference_index: pd.DatetimeIndex, freq: str
    ) -> pd.DataFrame:
        """Align insulin data.

        Args:
            df: DataFrame containing insulin data with columns: dose, is_basal, is_bolus
            reference_index: Reference timeline to align to
            freq: Frequency for alignment

        Returns:
            DataFrame aligned to reference timeline with summed basal and bolus
            doses (columns ``basal_dose`` and ``bolus_dose``); windows without
            data are 0.

        Raises:
            AlignmentError: If DataFrame is empty, index is not datetime, or required columns are missing
        """
        self._require_datetime_frame(df)

        required_columns = ["dose", "is_basal", "is_bolus"]
        missing_columns = [col for col in required_columns if col not in df.columns]
        if missing_columns:
            raise AlignmentError(
                f"Missing required column(s): {', '.join(missing_columns)}"
            )

        # Split the dose into one column per insulin kind; rows of the other
        # kind contribute 0 to the sum.
        doses = pd.DataFrame(
            {
                "basal_dose": df["dose"].where(df["is_basal"], 0),
                "bolus_dose": df["dose"].where(df["is_bolus"], 0),
            }
        )
        doses.index = df.index.round(freq)

        summed = doses.resample(freq).sum()
        return summed.reindex(reference_index).fillna(0)

    def _align_carbs(
        self, df: pd.DataFrame, reference_index: pd.DatetimeIndex, freq: str
    ) -> pd.DataFrame:
        """Align carbohydrate data.

        Args:
            df: DataFrame containing carbohydrate data with column: carbs_primary
            reference_index: Reference timeline to align to
            freq: Frequency for alignment

        Returns:
            DataFrame aligned to reference timeline with summed carbohydrate
            values; windows without data are 0.

        Raises:
            AlignmentError: If DataFrame is empty, index is not datetime, or required column is missing
        """
        self._require_datetime_frame(df)
        if "carbs_primary" not in df.columns:
            raise AlignmentError("Missing required column: carbs_primary")

        # Copy the single column so re-indexing does not touch the caller's frame.
        carbs = df["carbs_primary"].copy()
        carbs.index = df.index.round(freq)

        summed = carbs.resample(freq).sum()
        return (
            pd.DataFrame({"carbs_primary": summed}).reindex(reference_index).fillna(0)
        )

    def _align_notes(
        self, df: pd.DataFrame, reference_index: pd.DatetimeIndex, freq: str
    ) -> pd.DataFrame:
        """Align notes data.

        Args:
            df: DataFrame containing notes data with column: notes_primary
            reference_index: Reference timeline to align to
            freq: Frequency for alignment

        Returns:
            DataFrame aligned to reference timeline with last note in each window.
            Empty windows will have NaN (not filled with any default value).

        Raises:
            AlignmentError: If DataFrame is empty, index is not datetime, or required column is missing
        """
        self._require_datetime_frame(df)
        if "notes_primary" not in df.columns:
            raise AlignmentError("Missing required column: notes_primary")

        # Copy the single column so re-indexing does not touch the caller's frame.
        notes = df["notes_primary"].copy()
        notes.index = df.index.round(freq)

        last_notes = notes.resample(freq).last()
        # Reindex to reference timeline (NaN for empty windows)
        return pd.DataFrame({"notes_primary": last_notes}).reindex(reference_index)

    def align(
        self,
        processed_data: Dict[DataType, ProcessedTypeData],
        reference_df: pd.DataFrame = None,
        freq: str = "5min",
    ) -> AlignmentResult:
        """Align all data to a reference timeline.

        Args:
            processed_data: Dictionary of processed data by DataType
            reference_df: DataFrame to use as reference timeline. If None, uses CGM data.
            freq: Expected frequency of data

        Returns:
            AlignmentResult containing aligned data and metadata

        Raises:
            AlignmentError: If no reference is given and no CGM data is
                available, or if the reference timeline fails validation.
                Failures of individual data types are logged and recorded in
                the processing notes instead of being raised.
        """
        # Get reference DataFrame (default to CGM if not specified)
        if reference_df is None:
            cgm_data = processed_data.get(DataType.CGM)
            if not cgm_data or cgm_data.dataframe.empty:
                raise AlignmentError("No CGM data available for alignment")
            reference_df = cgm_data.dataframe

        self._validate_timeline(reference_df, freq)
        reference_index = reference_df.index

        # Keep the caller's object for the identity check below, and rename
        # into a new frame so the caller's DataFrame is never mutated.
        reference_source = reference_df
        reference_df = reference_df.rename(columns={"missing": "missing_cgm"})

        # Always include reference data first
        aligned_dfs = [reference_df]
        processing_notes = ["Reference timeline established"]

        type_methods = {
            DataType.BGM: self._align_bgm,
            DataType.INSULIN: self._align_insulin,
            DataType.CARBS: self._align_carbs,
            DataType.NOTES: self._align_notes,
        }

        for data_type, processed in processed_data.items():
            if processed.dataframe is reference_source:
                continue  # the reference data is already included
            align_method = type_methods.get(data_type)
            if align_method is None:
                continue  # no aligner registered for this data type

            try:
                aligned_df = align_method(processed.dataframe, reference_index, freq)
            except AlignmentError as e:
                # Best effort: one failing data type must not abort the rest.
                logger.error("Error aligning %s: %s", data_type.name, e)
                processing_notes.append(f"Failed to align {data_type.name}: {e}")
            else:
                aligned_dfs.append(aligned_df)
                processing_notes.append(
                    f"Successfully aligned {data_type.name} data"
                )

        combined_df = pd.concat(aligned_dfs, axis=1)
        return AlignmentResult(
            dataframe=combined_df,
            start_time=reference_index[0],
            end_time=reference_index[-1],
            frequency=freq,
            processing_notes=[
                *self._collect_processing_notes(processed_data),
                *processing_notes,
            ],
            source_units={
                col: unit
                for data in processed_data.values()
                for col, unit in data.source_units.items()
            },
        )