Readers API

src.readers

Reader initialization and registration.

__all__ = ['BaseReader', 'SQLiteReader', 'CSVReader', 'XMLReader'] module-attribute

BaseReader

Bases: ABC

Abstract base class for all file format readers.

This class provides core functionality for reading diabetes device data files and automatic reader selection based on file types. It handles timestamp processing, data validation, and resource management.
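
A minimal usage sketch (assuming fmt is a DeviceFormat produced by format detection elsewhere in the pipeline; the path is illustrative):

from pathlib import Path

path = Path("export/device_data.csv")
with BaseReader.get_reader_for_format(fmt, path) as reader:
    tables = reader.read_all_tables()  # {table_name: TableData}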

Source code in src/readers/base.py
class BaseReader(ABC):
    """Abstract base class for all file format readers.

    This class provides core functionality for reading diabetes device data files
    and automatic reader selection based on file types. It handles timestamp processing,
    data validation, and resource management.
    """

    _readers: Dict[FileType, Type["BaseReader"]] = {}

    @classmethod
    def register(cls, file_type: FileType) -> Callable[[Type[T]], Type[T]]:
        """Register a reader class for a specific file type.

        Args:
            file_type: FileType enum value to associate with the reader

        Returns:
            Callable: Decorator function that registers the reader class
        """

        def wrapper(reader_cls: Type[T]) -> Type[T]:
            cls._readers[file_type] = reader_cls
            return reader_cls

        return wrapper

    @classmethod
    def get_reader_for_format(cls, fmt: DeviceFormat, file_path: Path) -> "BaseReader":
        """Get appropriate reader instance for the detected format.

        Args:
            fmt: Detected device format specification
            file_path: Path to the data file

        Returns:
            Instance of appropriate reader class

        Raises:
            ReaderError: If no reader is registered for the file type
        """
        for file_config in fmt.files:
            if Path(file_path).match(file_config.name_pattern):
                reader_cls = cls._readers.get(file_config.file_type)
                if reader_cls is None:
                    raise ReaderError(
                        f"No reader registered for file type: {file_config.file_type.value}"
                    )
                return reader_cls(file_path, file_config)

        raise ReaderError(f"No matching file configuration found for {file_path}")

    def __init__(self, path: Path, file_config: FileConfig):
        """initialise reader with file path and configuration.

        Args:
            path: Path to the data file
            file_config: Configuration for the file format

        Raises:
            ValueError: If file does not exist
        """
        if not path.exists():
            raise ValueError(f"File not found: {path}")

        self.file_path = path
        self.file_config = file_config

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup resources if needed."""
        self._cleanup()

    def _cleanup(self):
        """Override this method in derived classes if cleanup is needed."""

    @abstractmethod
    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        This method must be implemented by each specific reader.
        """

    def read_all_tables(self) -> Dict[str, TableData]:
        """Read and process all tables defined in the file configuration."""
        results = {}
        for table_config in self.file_config.tables:
            table_data = self.read_table(table_config)
            if table_data is not None:
                if table_data.missing_required_columns:
                    logger.debug(
                        "Table %s missing required data in columns: %s",
                        table_data.name,
                        table_data.missing_required_columns,
                    )
                results[table_data.name] = table_data
            else:
                logger.error("Failed to process table: %s", table_config.name)

        return results

    def detect_timestamp_format(self, series: pd.Series) -> Tuple[TimestampType, dict]:
        """Detect timestamp format using a small deterministic heuristic.

        Strategy (simple & fast):

        - sample up to 50 non-null values
        - detect numeric epochs first
        - look for obvious ISO-like markers (T, Z, timezone offsets)
        - for hyphen/slash date starts, try explicit day-first formats if a day > 12 is found
        - attempt a compact list of explicit formats with a high-acceptance threshold
        - fall back to pandas inference (utc=True) if explicit formats fail

        Returns (TimestampType, parse_kwargs), where parse_kwargs is suitable for
        passing to pd.to_datetime (e.g., {'format': ..., 'utc': True} or {'dayfirst': True, 'utc': True}).
        """
        try:
            sample = series.dropna().astype(str).str.strip().head(50)
            if sample.empty:
                logger.warning("No non-null timestamps found in sample")
                return TimestampType.UNKNOWN, {}

            # numeric epochs
            numeric = pd.to_numeric(sample, errors="coerce")
            if numeric.notna().any():
                nums = numeric.dropna().astype(float)
                if (nums > 1e8).all() and (nums < 1e12).all():
                    return TimestampType.UNIX_SECONDS, {"unit": "s", "utc": True}
                if (nums > 1e11).all() and (nums < 1e15).all():
                    return TimestampType.UNIX_MILLISECONDS, {"unit": "ms", "utc": True}

            # quick ISO-like heuristic
            joined = " ".join(sample.head(10).tolist()).upper()
            if "T" in joined and ("Z" in joined or "+" in joined):
                return TimestampType.ISO_8601, {"utc": True}

            # Check for hyphen/slash date leading pattern (d/m/Y or m/d/Y)
            dayfirst_candidate = False
            sep_match = sample.str.match(r"^\s*\d{1,2}[-/]\d{1,2}[-/]\d{2,4}")
            if sep_match.any():
                # If any day > 12 then dayfirst is almost certainly correct
                for v in sample:
                    m = re.match(r"^(\d{1,2})[-/](\d{1,2})[-/](\d{2,4})", v)
                    if m:
                        day = int(m.group(1))
                        if day > 12:
                            dayfirst_candidate = True
                            break

            # Small explicit formats list (keep compact)
            explicit = [
                "%d-%m-%Y %H:%M",
                "%d/%m/%Y %H:%M",
                "%Y-%m-%d %H:%M:%S",
                "%Y-%m-%d %H:%M",
                "%d-%m-%Y %H:%M:%S",
                "%m/%d/%Y %H:%M",
                "%Y-%m-%dT%H:%M:%S%z",
                "%Y-%m-%dT%H:%M:%SZ",
                "%d-%m-%Y %I:%M %p",
            ]

            sample_norm = sample.str.replace(r"\s+UTC$|\s+GMT$", "", regex=True)
            sample_norm = sample_norm.str.replace(r"\s+", " ", regex=True)

            # If dayfirst is obvious, try day-first explicit formats first
            if dayfirst_candidate:
                for fmt in ["%d-%m-%Y %H:%M", "%d/%m/%Y %H:%M", "%d-%m-%Y %H:%M:%S"]:
                    parsed = pd.to_datetime(
                        sample_norm, format=fmt, errors="coerce", utc=True
                    )
                    if parsed.notna().mean() >= 0.95:
                        return TimestampType.ISO_8601, {"format": fmt, "utc": True}

            # Try compact explicit list
            for fmt in explicit:
                parsed = pd.to_datetime(
                    sample_norm, format=fmt, errors="coerce", utc=True
                )
                if parsed.notna().mean() >= 0.95:
                    return TimestampType.ISO_8601, {"format": fmt, "utc": True}

            # If we saw a hyphen/slash pattern but not decisive, prefer dayfirst inference
            if sep_match.any() and not dayfirst_candidate:
                parsed_dayfirst = pd.to_datetime(
                    sample, dayfirst=True, utc=True, errors="coerce"
                )
                if parsed_dayfirst.notna().mean() >= 0.9:
                    return TimestampType.ISO_8601, {"dayfirst": True, "utc": True}

            # Last resort: pandas inference
            with warnings.catch_warnings():
                warnings.filterwarnings(
                    "ignore",
                    message=(
                        "Could not infer format, so each element will be parsed individually, falling back to `dateutil`"
                    ),
                )
                inferred = pd.to_datetime(sample, utc=True, errors="coerce")
            if inferred.notna().mean() >= 0.9:
                return TimestampType.ISO_8601, {"utc": True}

            return TimestampType.UNKNOWN, {}

        except TimestampProcessingError as e:
            logger.error("Error during timestamp detection: %s", e)
            return TimestampType.UNKNOWN, {}

    def _convert_timestamp_to_utc(
        self, df: pd.DataFrame, timestamp_column: str
    ) -> Tuple[pd.DataFrame, TimestampType]:
        """Convert timestamp column to UTC datetime and set as index."""
        fmt, parse_kwargs = self.detect_timestamp_format(df[timestamp_column])

        if fmt == TimestampType.UNKNOWN:
            raise TimestampProcessingError(
                f"Could not detect timestamp format for column {timestamp_column}"
            )

        try:
            # Epoch handling
            if fmt in (TimestampType.UNIX_SECONDS, TimestampType.UNIX_MILLISECONDS):
                unit = parse_kwargs.get("unit", "s")
                df[timestamp_column] = pd.to_datetime(
                    df[timestamp_column], unit=unit, utc=True
                )
            elif fmt == TimestampType.ISO_8601:
                # If an explicit format was provided, try it first but with coercion
                if "format" in parse_kwargs:
                    parsed = pd.to_datetime(
                        df[timestamp_column].astype(str),
                        format=parse_kwargs["format"],
                        errors="coerce",
                        utc=True,
                    )
                    success = parsed.notna().mean()
                    if success >= 0.9:
                        df[timestamp_column] = parsed
                    else:
                        # fallback to pandas with any provided flags (dayfirst/utc)
                        kwargs = {
                            k: v for k, v in parse_kwargs.items() if k != "format"
                        }
                        df[timestamp_column] = pd.to_datetime(
                            df[timestamp_column].astype(str), errors="coerce", **kwargs
                        )
                else:
                    # use parse kwargs (e.g., dayfirst) or generic inference
                    df[timestamp_column] = pd.to_datetime(
                        df[timestamp_column].astype(str),
                        errors="coerce",
                        **parse_kwargs,
                    )

            # Ensure timezone-awareness and set index
            if df[timestamp_column].dt.tz is None:
                df[timestamp_column] = df[timestamp_column].dt.tz_localize("UTC")

            return df.set_index(timestamp_column).sort_index(), fmt

        except Exception as e:
            logger.error("Error converting timestamps: %s", e)
            raise TimestampProcessingError(
                f"Invalid timestamp column: {timestamp_column}"
            ) from e

    def _validate_required_data(
        self, df: pd.DataFrame, columns: List[ColumnMapping]
    ) -> List[str]:
        """Check for missing data in required columns."""
        missing_required = []
        for col in columns:
            if (
                col.requirement == ColumnRequirement.REQUIRED_WITH_DATA
                and col.source_name in df.columns
                and df[col.source_name].isna().all()
            ):
                missing_required.append(col.source_name)
        return missing_required

    @staticmethod
    def _validate_identifier(identifier: str) -> bool:
        """Validate that an identifier only contains safe characters."""
        return all(c.isalnum() or c in ["_", "."] for c in identifier)

file_path = path instance-attribute

file_config = file_config instance-attribute

register(file_type: FileType) -> Callable[[Type[T]], Type[T]] classmethod

Register a reader class for a specific file type.

Parameters:

    file_type (FileType, required): FileType enum value to associate with the reader

Returns:

    Callable[[Type[T]], Type[T]]: Decorator function that registers the reader class

Source code in src/readers/base.py
@classmethod
def register(cls, file_type: FileType) -> Callable[[Type[T]], Type[T]]:
    """Register a reader class for a specific file type.

    Args:
        file_type: FileType enum value to associate with the reader

    Returns:
        Callable: Decorator function that registers the reader class
    """

    def wrapper(reader_cls: Type[T]) -> Type[T]:
        cls._readers[file_type] = reader_cls
        return reader_cls

    return wrapper
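
As a sketch, registering a new reader for a hypothetical file type would look like this (FileType.JSON and JSONReader are illustrative; only the CSV, SQLite and XML readers documented below ship with this module):

@BaseReader.register(FileType.JSON)  # hypothetical FileType member
class JSONReader(BaseReader):
    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        ...  # parse self.file_path and return a TableData, or None on failure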

get_reader_for_format(fmt: DeviceFormat, file_path: Path) -> BaseReader classmethod

Get appropriate reader instance for the detected format.

Parameters:

    fmt (DeviceFormat, required): Detected device format specification
    file_path (Path, required): Path to the data file

Returns:

    BaseReader: Instance of appropriate reader class

Raises:

    ReaderError: If no reader is registered for the file type

Source code in src/readers/base.py
@classmethod
def get_reader_for_format(cls, fmt: DeviceFormat, file_path: Path) -> "BaseReader":
    """Get appropriate reader instance for the detected format.

    Args:
        fmt: Detected device format specification
        file_path: Path to the data file

    Returns:
        Instance of appropriate reader class

    Raises:
        ReaderError: If no reader is registered for the file type
    """
    for file_config in fmt.files:
        if Path(file_path).match(file_config.name_pattern):
            reader_cls = cls._readers.get(file_config.file_type)
            if reader_cls is None:
                raise ReaderError(
                    f"No reader registered for file type: {file_config.file_type.value}"
                )
            return reader_cls(file_path, file_config)

    raise ReaderError(f"No matching file configuration found for {file_path}")
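
File configurations are tried in order and the first whose name_pattern matches the path wins; both failure modes surface as ReaderError. A usage sketch (fmt is assumed to come from format detection, and the filename is illustrative):

from pathlib import Path

try:
    reader = BaseReader.get_reader_for_format(fmt, Path("export.sqlite"))
except ReaderError:
    ...  # no file configuration matched, or no reader registered for the matched type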

__init__(path: Path, file_config: FileConfig)

Initialise reader with file path and configuration.

Parameters:

    path (Path, required): Path to the data file
    file_config (FileConfig, required): Configuration for the file format

Raises:

    ValueError: If file does not exist

Source code in src/readers/base.py
def __init__(self, path: Path, file_config: FileConfig):
    """initialise reader with file path and configuration.

    Args:
        path: Path to the data file
        file_config: Configuration for the file format

    Raises:
        ValueError: If file does not exist
    """
    if not path.exists():
        raise ValueError(f"File not found: {path}")

    self.file_path = path
    self.file_config = file_config

__enter__()

Context manager entry.

Source code in src/readers/base.py
def __enter__(self):
    """Context manager entry."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Cleanup resources if needed.

Source code in src/readers/base.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Cleanup resources if needed."""
    self._cleanup()

read_table(table_structure: TableStructure) -> Optional[TableData] abstractmethod

Read and process a single table according to its structure.

This method must be implemented by each specific reader.

Source code in src/readers/base.py
@abstractmethod
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    This method must be implemented by each specific reader.
    """

read_all_tables() -> Dict[str, TableData]

Read and process all tables defined in the file configuration.

Source code in src/readers/base.py
def read_all_tables(self) -> Dict[str, TableData]:
    """Read and process all tables defined in the file configuration."""
    results = {}
    for table_config in self.file_config.tables:
        table_data = self.read_table(table_config)
        if table_data is not None:
            if table_data.missing_required_columns:
                logger.debug(
                    "Table %s missing required data in columns: %s",
                    table_data.name,
                    table_data.missing_required_columns,
                )
            results[table_data.name] = table_data
        else:
            logger.error("Failed to process table: %s", table_config.name)

    return results
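
Each returned TableData carries its name, the processed DataFrame (indexed by UTC timestamp), any required columns that turned out to be entirely empty, and the detected timestamp type, so callers can iterate the results directly. A brief sketch:

for name, table in reader.read_all_tables().items():
    print(name, len(table.dataframe), table.timestamp_type)
    if table.missing_required_columns:
        print("  empty required columns:", table.missing_required_columns)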

detect_timestamp_format(series: pd.Series) -> Tuple[TimestampType, dict]

Detect timestamp format using a small deterministic heuristic.

Strategy (simple & fast):

  • sample up to 50 non-null values
  • detect numeric epochs first
  • look for obvious ISO-like markers (T, Z, timezone offsets)
  • for hyphen/slash date starts, try explicit day-first formats if a day > 12 is found
  • attempt a compact list of explicit formats with a high-acceptance threshold
  • fall back to pandas inference (utc=True) if explicit formats fail

Returns (TimestampType, parse_kwargs), where parse_kwargs is suitable for passing to pd.to_datetime (e.g., {'format': ..., 'utc': True} or {'dayfirst': True, 'utc': True}).

Source code in src/readers/base.py
def detect_timestamp_format(self, series: pd.Series) -> Tuple[TimestampType, dict]:
    """Detect timestamp format using a small deterministic heuristic.

    Strategy (simple & fast):

    - sample up to 50 non-null values
    - detect numeric epochs first
    - look for obvious ISO-like markers (T, Z, timezone offsets)
    - for hyphen/slash date starts, try explicit day-first formats if a day > 12 is found
    - attempt a compact list of explicit formats with a high-acceptance threshold
    - fall back to pandas inference (utc=True) if explicit formats fail

    Returns (TimestampType, parse_kwargs), where parse_kwargs is suitable for
    passing to pd.to_datetime (e.g., {'format': ..., 'utc': True} or {'dayfirst': True, 'utc': True}).
    """
    try:
        sample = series.dropna().astype(str).str.strip().head(50)
        if sample.empty:
            logger.warning("No non-null timestamps found in sample")
            return TimestampType.UNKNOWN, {}

        # numeric epochs
        numeric = pd.to_numeric(sample, errors="coerce")
        if numeric.notna().any():
            nums = numeric.dropna().astype(float)
            if (nums > 1e8).all() and (nums < 1e12).all():
                return TimestampType.UNIX_SECONDS, {"unit": "s", "utc": True}
            if (nums > 1e11).all() and (nums < 1e15).all():
                return TimestampType.UNIX_MILLISECONDS, {"unit": "ms", "utc": True}

        # quick ISO-like heuristic
        joined = " ".join(sample.head(10).tolist()).upper()
        if "T" in joined and ("Z" in joined or "+" in joined):
            return TimestampType.ISO_8601, {"utc": True}

        # Check for hyphen/slash date leading pattern (d/m/Y or m/d/Y)
        dayfirst_candidate = False
        sep_match = sample.str.match(r"^\s*\d{1,2}[-/]\d{1,2}[-/]\d{2,4}")
        if sep_match.any():
            # If any day > 12 then dayfirst is almost certainly correct
            for v in sample:
                m = re.match(r"^(\d{1,2})[-/](\d{1,2})[-/](\d{2,4})", v)
                if m:
                    day = int(m.group(1))
                    if day > 12:
                        dayfirst_candidate = True
                        break

        # Small explicit formats list (keep compact)
        explicit = [
            "%d-%m-%Y %H:%M",
            "%d/%m/%Y %H:%M",
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M",
            "%d-%m-%Y %H:%M:%S",
            "%m/%d/%Y %H:%M",
            "%Y-%m-%dT%H:%M:%S%z",
            "%Y-%m-%dT%H:%M:%SZ",
            "%d-%m-%Y %I:%M %p",
        ]

        sample_norm = sample.str.replace(r"\s+UTC$|\s+GMT$", "", regex=True)
        sample_norm = sample_norm.str.replace(r"\s+", " ", regex=True)

        # If dayfirst is obvious, try day-first explicit formats first
        if dayfirst_candidate:
            for fmt in ["%d-%m-%Y %H:%M", "%d/%m/%Y %H:%M", "%d-%m-%Y %H:%M:%S"]:
                parsed = pd.to_datetime(
                    sample_norm, format=fmt, errors="coerce", utc=True
                )
                if parsed.notna().mean() >= 0.95:
                    return TimestampType.ISO_8601, {"format": fmt, "utc": True}

        # Try compact explicit list
        for fmt in explicit:
            parsed = pd.to_datetime(
                sample_norm, format=fmt, errors="coerce", utc=True
            )
            if parsed.notna().mean() >= 0.95:
                return TimestampType.ISO_8601, {"format": fmt, "utc": True}

        # If we saw a hyphen/slash pattern but not decisive, prefer dayfirst inference
        if sep_match.any() and not dayfirst_candidate:
            parsed_dayfirst = pd.to_datetime(
                sample, dayfirst=True, utc=True, errors="coerce"
            )
            if parsed_dayfirst.notna().mean() >= 0.9:
                return TimestampType.ISO_8601, {"dayfirst": True, "utc": True}

        # Last resort: pandas inference
        with warnings.catch_warnings():
            warnings.filterwarnings(
                "ignore",
                message=(
                    "Could not infer format, so each element will be parsed individually, falling back to `dateutil`"
                ),
            )
            inferred = pd.to_datetime(sample, utc=True, errors="coerce")
        if inferred.notna().mean() >= 0.9:
            return TimestampType.ISO_8601, {"utc": True}

        return TimestampType.UNKNOWN, {}

    except TimestampProcessingError as e:
        logger.error("Error during timestamp detection: %s", e)
        return TimestampType.UNKNOWN, {}
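
A worked example of the heuristic, given any reader instance (the method does not depend on reader state). Day 25 exceeds 12, so the day-first branch matches before the compact explicit list is tried:

import pandas as pd

s = pd.Series(["25-12-2023 14:30", "26-12-2023 09:15"])
ts_type, kwargs = reader.detect_timestamp_format(s)
# ts_type == TimestampType.ISO_8601
# kwargs  == {"format": "%d-%m-%Y %H:%M", "utc": True}
parsed = pd.to_datetime(s, errors="coerce", **kwargs)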

CSVReader

Bases: BaseReader

Reads and processes CSV files according to the provided format configuration.

Source code in src/readers/csv.py
@BaseReader.register(FileType.CSV)
class CSVReader(BaseReader):
    """Reads and processes CSV files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._data = None

    def _cleanup(self):
        """Cleanup any held resources."""
        self._data = None

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        For CSV files, we treat each file as a single table, reading all data at once
        and caching it for subsequent operations if needed.
        """
        try:
            # Read data if not already cached
            if self._data is None:
                try:
                    # If the file config/table declares a header_row, use it to skip preamble
                    header_row = None
                    if self.file_config and self.file_config.tables:
                        header_row = getattr(
                            self.file_config.tables[0], "header_row", None
                        )

                    if header_row is None:
                        self._data = pd.read_csv(
                            self.file_path,
                            encoding="utf-8",
                            low_memory=False,  # Prevent mixed type inference warnings
                        )
                    else:
                        # skip the first header_row rows so the row at that index becomes the header
                        self._data = pd.read_csv(
                            self.file_path,
                            encoding="utf-8",
                            low_memory=False,
                            header=0,
                            skiprows=range(header_row),
                        )

                    # Normalize column names: strip whitespace from headers
                    self._data.columns = [
                        c.strip() if isinstance(c, str) else c
                        for c in self._data.columns
                    ]
                except Exception as e:
                    raise FileAccessError(f"Failed to read CSV file: {e}") from e

            if self._data.empty:
                raise DataExistsError(f"No data found in CSV file {self.file_path}")

            # Get required columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Check for missing columns
            missing_columns = [
                col for col in columns_to_read if col not in self._data.columns
            ]
            if missing_columns:
                logger.error(
                    "Required columns missing from CSV: %s", ", ".join(missing_columns)
                )
                return None

            # Select only needed columns and make a copy
            df = self._data[columns_to_read].copy()

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            logger.error("Validation error: %s", e)
            return None
        except DataExistsError as e:
            logger.error("No data error: %s", e)
            return None
        except DataProcessingError as e:
            logger.error("Processing error: %s", e)
            return None
        except ProcessingError as e:
            logger.error("Unexpected error processing CSV: %s", e)
            return None

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/csv.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._data = None

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

For CSV files, we treat each file as a single table, reading all data at once and caching it for subsequent operations if needed.

Source code in src/readers/csv.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    For CSV files, we treat each file as a single table, reading all data at once
    and caching it for subsequent operations if needed.
    """
    try:
        # Read data if not already cached
        if self._data is None:
            try:
                # If the file config/table declares a header_row, use it to skip preamble
                header_row = None
                if self.file_config and self.file_config.tables:
                    header_row = getattr(
                        self.file_config.tables[0], "header_row", None
                    )

                if header_row is None:
                    self._data = pd.read_csv(
                        self.file_path,
                        encoding="utf-8",
                        low_memory=False,  # Prevent mixed type inference warnings
                    )
                else:
                    # skip the first header_row rows so the row at that index becomes the header
                    self._data = pd.read_csv(
                        self.file_path,
                        encoding="utf-8",
                        low_memory=False,
                        header=0,
                        skiprows=range(header_row),
                    )

                # Normalize column names: strip whitespace from headers
                self._data.columns = [
                    c.strip() if isinstance(c, str) else c
                    for c in self._data.columns
                ]
            except Exception as e:
                raise FileAccessError(f"Failed to read CSV file: {e}") from e

        if self._data.empty:
            raise DataExistsError(f"No data found in CSV file {self.file_path}")

        # Get required columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Check for missing columns
        missing_columns = [
            col for col in columns_to_read if col not in self._data.columns
        ]
        if missing_columns:
            logger.error(
                "Required columns missing from CSV: %s", ", ".join(missing_columns)
            )
            return None

        # Select only needed columns and make a copy
        df = self._data[columns_to_read].copy()

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        logger.error("Validation error: %s", e)
        return None
    except DataExistsError as e:
        logger.error("No data error: %s", e)
        return None
    except DataProcessingError as e:
        logger.error("Processing error: %s", e)
        return None
    except ProcessingError as e:
        logger.error("Unexpected error processing CSV: %s", e)
        return None
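
When a table config declares header_row (an optional attribute read via getattr above), rows before it are treated as preamble and skipped. An illustrative file and the equivalent pandas call for header_row = 2 (filename and contents are hypothetical):

# device_export.csv:
#   Device: AcmeCGM            <- row 0, skipped
#   Export date: 2024-01-01    <- row 1, skipped
#   timestamp,glucose          <- row 2 becomes the header
#   2024-01-01 00:05,5.6

import pandas as pd

df = pd.read_csv("device_export.csv", encoding="utf-8", low_memory=False,
                 header=0, skiprows=range(2))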

SQLiteReader

Bases: BaseReader

Reads and processes SQLite files according to the provided format configuration.

Source code in src/readers/sqlite.py
@BaseReader.register(FileType.SQLITE)
class SQLiteReader(BaseReader):
    """Reads and processes SQLite files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._engine = None

    @property
    def engine(self):
        """Lazy initialisation of database engine."""
        if self._engine is None:
            self._engine = create_engine(f"sqlite:///{self.file_path}")
        return self._engine

    def _cleanup(self):
        """Cleanup database connections."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure."""
        try:
            # Validate identifiers
            if not self._validate_identifier(table_structure.name):
                raise DataValidationError(f"Invalid table name: {table_structure.name}")

            # Read only needed columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Validate column names
            for col in columns_to_read:
                if not self._validate_identifier(col):
                    raise DataValidationError(f"Invalid column name: {col}")

            # Create query with quoted identifiers for SQLite
            quoted_columns = [f'"{col}"' for col in columns_to_read]
            query = text(
                f"""
                SELECT {', '.join(quoted_columns)}
                FROM "{table_structure.name}"
                ORDER BY "{table_structure.timestamp_column}"
            """
            )

            # Execute query within connection context
            with self.engine.connect() as conn:
                df = pd.read_sql_query(query, conn)

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            # Invalid table or column names
            logger.error("Validation error: %s", e)
            return None
        except SQLAlchemyError as e:
            # Handle any database-related errors (e.g., connection, query execution)
            logger.error(
                "SQLAlchemyError processing table %s: %s", table_structure.name, e
            )
            return None
        except DataExistsError as e:
            # No data in the result set
            logger.error(
                "No data error processing table %s: %s", table_structure.name, e
            )
            return None
        except ReaderError as e:
            # Any other reader error
            logger.error(
                "Unexpected error processing table %s: %s", table_structure.name, e
            )
            return None

engine property

Lazy initialisation of database engine.

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/sqlite.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._engine = None

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

Source code in src/readers/sqlite.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure."""
    try:
        # Validate identifiers
        if not self._validate_identifier(table_structure.name):
            raise DataValidationError(f"Invalid table name: {table_structure.name}")

        # Read only needed columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Validate column names
        for col in columns_to_read:
            if not self._validate_identifier(col):
                raise DataValidationError(f"Invalid column name: {col}")

        # Create query with quoted identifiers for SQLite
        quoted_columns = [f'"{col}"' for col in columns_to_read]
        query = text(
            f"""
            SELECT {', '.join(quoted_columns)}
            FROM "{table_structure.name}"
            ORDER BY "{table_structure.timestamp_column}"
        """
        )

        # Execute query within connection context
        with self.engine.connect() as conn:
            df = pd.read_sql_query(query, conn)

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        # Invalid table or column names
        logger.error("Validation error: %s", e)
        return None
    except SQLAlchemyError as e:
        # Handle any database-related errors (e.g., connection, query execution)
        logger.error(
            "SQLAlchemyError processing table %s: %s", table_structure.name, e
        )
        return None
    except DataExistsError as e:
        # No data in the result set
        logger.error(
            "No data error processing table %s: %s", table_structure.name, e
        )
        return None
    except ReaderError as e:
        # Any other reader error
        logger.error(
            "Unexpected error processing table %s: %s", table_structure.name, e
        )
        return None
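
Because table and column names are interpolated into the query text, they are first checked with _validate_identifier, which only admits alphanumerics, underscores and dots. For example:

BaseReader._validate_identifier("glucose_mmol")         # True
BaseReader._validate_identifier("readings.cgm")         # True
BaseReader._validate_identifier('x"; DROP TABLE y;--')  # False -> read_table raises DataValidationError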

XMLReader

Bases: BaseReader

Reads and processes XML files according to the provided format configuration.

Source code in src/readers/xml.py
@BaseReader.register(FileType.XML)
class XMLReader(BaseReader):
    """Reads and processes XML files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._tree = None
        self._root = None

    def _cleanup(self):
        """Cleanup any held resources."""
        self._tree = None
        self._root = None

    def _init_xml(self):
        """initialise XML parsing if not already done."""
        if self._root is None:
            try:
                self._tree = ET.parse(self.file_path)
                self._root = self._tree.getroot()
            except ET.ParseError as e:
                raise DataExistsError(f"Failed to parse XML file: {e}") from e
            except Exception as e:
                raise DataExistsError(f"Error reading XML file: {e}") from e

    @staticmethod
    def _extract_value(element: ET.Element, column: str) -> str:
        """Extract value from XML element, checking both attributes and text.

        Args:
            element: XML element to extract from
            column: Column name to look for

        Returns:
            Value from attribute or element text
        """
        # Check attributes first
        if column in element.attrib:
            return element.attrib[column]

        # Then check child elements
        child = element.find(column)
        if child is not None:
            return child.text if child.text else ""

        return ""

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        For XML files, each table is expected to be contained within elements
        matching the table name or a configured xpath.
        """
        try:
            self._init_xml()

            # Get required columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Find all elements for this table
            table_elements = self._root.findall(f".//{table_structure.name}")
            if not table_elements:
                logger.error("No elements found for table: %s", table_structure.name)
                return None

            # Extract data for each column
            data: Dict[str, List[str]] = {col: [] for col in columns_to_read}

            for element in table_elements:
                for column in columns_to_read:
                    value = self._extract_value(element, column)
                    data[column].append(value)

            # Convert to DataFrame
            df = pd.DataFrame(data)

            if df.empty:
                raise DataExistsError(f"No data found in table {table_structure.name}")

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            logger.error("Validation error: %s", e)
            return None
        except DataExistsError as e:
            logger.error("No data error: %s", e)
            return None
        except DataProcessingError as e:
            logger.error("Processing error: %s", e)
            return None
        except ReaderError as e:
            logger.error("Unexpected error processing XML: %s", e)
            return None

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/xml.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._tree = None
    self._root = None

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

For XML files, each table is expected to be contained within elements matching the table name or a configured xpath.

Source code in src/readers/xml.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    For XML files, each table is expected to be contained within elements
    matching the table name or a configured xpath.
    """
    try:
        self._init_xml()

        # Get required columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Find all elements for this table
        table_elements = self._root.findall(f".//{table_structure.name}")
        if not table_elements:
            logger.error("No elements found for table: %s", table_structure.name)
            return None

        # Extract data for each column
        data: Dict[str, List[str]] = {col: [] for col in columns_to_read}

        for element in table_elements:
            for column in columns_to_read:
                value = self._extract_value(element, column)
                data[column].append(value)

        # Convert to DataFrame
        df = pd.DataFrame(data)

        if df.empty:
            raise DataExistsError(f"No data found in table {table_structure.name}")

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        logger.error("Validation error: %s", e)
        return None
    except DataExistsError as e:
        logger.error("No data error: %s", e)
        return None
    except DataProcessingError as e:
        logger.error("Processing error: %s", e)
        return None
    except ReaderError as e:
        logger.error("Unexpected error processing XML: %s", e)
        return None
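
A small sketch of _extract_value's two lookup paths, using an illustrative element:

import xml.etree.ElementTree as ET

elem = ET.fromstring(
    '<Reading glucose="5.6"><timestamp>2024-01-01T00:00:00Z</timestamp></Reading>'
)
XMLReader._extract_value(elem, "glucose")    # "5.6" (found as an attribute)
XMLReader._extract_value(elem, "timestamp")  # "2024-01-01T00:00:00Z" (child element text)
XMLReader._extract_value(elem, "carbs")      # ""    (missing values become empty strings)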