Readers API

src.readers

Reader initialization and registration.

__all__ = ['BaseReader', 'SQLiteReader', 'CSVReader', 'XMLReader'] module-attribute

BaseReader

Bases: ABC

Abstract base class for all file format readers.

This class provides core functionality for reading diabetes device data files and automatic reader selection based on file types. It handles timestamp processing, data validation, and resource management.
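
For orientation, a minimal usage sketch follows. It assumes a DeviceFormat instance (fmt) has already been produced by format detection elsewhere in the project; only get_reader_for_format, the context-manager protocol, and read_all_tables come from this page.

from pathlib import Path

from src.readers import BaseReader

# 'fmt' is assumed to be a DeviceFormat from format detection;
# it is not constructed on this page.
path = Path("export.sqlite")
with BaseReader.get_reader_for_format(fmt, path) as reader:
    tables = reader.read_all_tables()
    for name, table in tables.items():
        # table.dataframe is indexed by UTC timestamps after processing
        print(name, len(table.dataframe))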

Source code in src/readers/base.py
class BaseReader(ABC):
    """Abstract base class for all file format readers.

    This class provides core functionality for reading diabetes device data files
    and automatic reader selection based on file types. It handles timestamp processing,
    data validation, and resource management.
    """

    _readers: Dict[FileType, Type["BaseReader"]] = {}

    @classmethod
    def register(cls, file_type: FileType) -> Callable[[Type[T]], Type[T]]:
        """Register a reader class for a specific file type.

        Args:
            file_type: FileType enum value to associate with the reader

        Returns:
            Callable: Decorator function that registers the reader class
        """

        def wrapper(reader_cls: Type[T]) -> Type[T]:
            cls._readers[file_type] = reader_cls
            return reader_cls

        return wrapper

    @classmethod
    def get_reader_for_format(cls, fmt: DeviceFormat, file_path: Path) -> "BaseReader":
        """Get appropriate reader instance for the detected format.

        Args:
            fmt: Detected device format specification
            file_path: Path to the data file

        Returns:
            Instance of appropriate reader class

        Raises:
            ReaderError: If no reader is registered for the file type
        """
        for file_config in fmt.files:
            if Path(file_path).match(file_config.name_pattern):
                reader_cls = cls._readers.get(file_config.file_type)
                if reader_cls is None:
                    raise ReaderError(
                        f"No reader registered for file type: {file_config.file_type.value}"
                    )
                return reader_cls(file_path, file_config)

        raise ReaderError(f"No matching file configuration found for {file_path}")

    def __init__(self, path: Path, file_config: FileConfig):
        """Initialize reader with file path and configuration.

        Args:
            path: Path to the data file
            file_config: Configuration for the file format

        Raises:
            ValueError: If file does not exist
        """
        if not path.exists():
            raise ValueError(f"File not found: {path}")

        self.file_path = path
        self.file_config = file_config

    def __enter__(self):
        """Context manager entry."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Cleanup resources if needed."""
        self._cleanup()

    def _cleanup(self):
        """Override this method in derived classes if cleanup is needed."""

    @abstractmethod
    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        This method must be implemented by each specific reader.
        """

    def read_all_tables(self) -> Dict[str, TableData]:
        """Read and process all tables defined in the file configuration."""
        results = {}
        for table_config in self.file_config.tables:
            table_data = self.read_table(table_config)
            if table_data is not None:
                if table_data.missing_required_columns:
                    logger.warning(
                        "Table %s missing required data in columns: %s",
                        table_data.name,
                        table_data.missing_required_columns,
                    )
                results[table_data.name] = table_data
            else:
                logger.error("Failed to process table: %s", table_config.name)

        return results

    def detect_timestamp_format(self, series: pd.Series) -> TimestampType:
        """Detect the format of timestamp data, assuming chronological order."""
        try:
            # Sample timestamps without sorting
            sample = series.dropna().head(10)
            if sample.empty:
                logger.warning("No non-null timestamps found in sample")
                return TimestampType.UNKNOWN

            # Check if values are monotonically increasing
            if not sample.is_monotonic_increasing:
                logger.warning("Timestamps are not in chronological order")
                return TimestampType.UNKNOWN

            # pylint: disable=R1705
            # Suppress no-else-return; keep the elif chain for readability
            # Check for UNIX epoch formats; non-numeric values fall through
            # to the ISO 8601 check below
            try:
                as_float = sample.astype(float)
                if all(as_float < 1e10):  # Seconds
                    logger.debug("Detected timestamp type: UNIX_SECONDS")
                    return TimestampType.UNIX_SECONDS
                elif all(as_float < 1e13):  # Milliseconds
                    logger.debug("Detected timestamp type: UNIX_MILLISECONDS")
                    return TimestampType.UNIX_MILLISECONDS
                elif all(as_float < 1e16):  # Microseconds
                    logger.debug("Detected timestamp type: UNIX_MICROSECONDS")
                    return TimestampType.UNIX_MICROSECONDS
            except (ValueError, TypeError):
                pass  # Not numeric; try string-based parsing

            # Try ISO 8601 for string timestamps
            try:
                pd.to_datetime(sample, utc=True)
                logger.debug("Detected timestamp type: ISO_8601")
                return TimestampType.ISO_8601
            except (ValueError, TypeError):
                pass

            logger.warning("Could not determine timestamp format")
            return TimestampType.UNKNOWN

        except TimestampProcessingError as e:
            logger.error("Error during timestamp detection: %s", e)
            return TimestampType.UNKNOWN

    def _convert_timestamp_to_utc(
        self, df: pd.DataFrame, timestamp_column: str
    ) -> Tuple[pd.DataFrame, TimestampType]:
        """Convert timestamp column to UTC datetime and set as index."""
        fmt = self.detect_timestamp_format(df[timestamp_column])

        if fmt == TimestampType.UNKNOWN:
            raise TimestampProcessingError(
                f"Could not detect timestamp format for column {timestamp_column}"
            )

        try:
            if fmt == TimestampType.UNIX_SECONDS:
                df[timestamp_column] = pd.to_datetime(
                    df[timestamp_column], unit="s", utc=True
                )
            elif fmt == TimestampType.UNIX_MILLISECONDS:
                df[timestamp_column] = pd.to_datetime(
                    df[timestamp_column], unit="ms", utc=True
                )
            elif fmt == TimestampType.UNIX_MICROSECONDS:
                df[timestamp_column] = pd.to_datetime(
                    df[timestamp_column], unit="us", utc=True
                )
            elif fmt == TimestampType.ISO_8601:
                df[timestamp_column] = pd.to_datetime(df[timestamp_column], utc=True)

            return df.set_index(timestamp_column).sort_index(), fmt

        except (ValueError, TypeError) as e:
            logger.error("Error converting timestamps: %s", e)
            raise DataProcessingError(
                f"Could not convert timestamps in column {timestamp_column}"
            ) from e

    def _validate_required_data(
        self, df: pd.DataFrame, columns: List[ColumnMapping]
    ) -> List[str]:
        """Check for missing data in required columns."""
        missing_required = []
        for col in columns:
            if (
                col.requirement == ColumnRequirement.REQUIRED_WITH_DATA
                and col.source_name in df.columns
                and df[col.source_name].isna().all()
            ):
                missing_required.append(col.source_name)
        return missing_required

    @staticmethod
    def _validate_identifier(identifier: str) -> bool:
        """Validate that an identifier only contains safe characters."""
        return all(c.isalnum() or c in ["_", "."] for c in identifier)

_readers: Dict[FileType, Type[BaseReader]] = {} class-attribute instance-attribute

file_path = path instance-attribute

file_config = file_config instance-attribute

register(file_type: FileType) -> Callable[[Type[T]], Type[T]] classmethod

Register a reader class for a specific file type.

Parameters:

    file_type (FileType, required): FileType enum value to associate with the reader

Returns:

    Callable (Callable[[Type[T]], Type[T]]): Decorator function that registers the reader class

Source code in src/readers/base.py
@classmethod
def register(cls, file_type: FileType) -> Callable[[Type[T]], Type[T]]:
    """Register a reader class for a specific file type.

    Args:
        file_type: FileType enum value to associate with the reader

    Returns:
        Callable: Decorator function that registers the reader class
    """

    def wrapper(reader_cls: Type[T]) -> Type[T]:
        cls._readers[file_type] = reader_cls
        return reader_cls

    return wrapper
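
As a hedged sketch of how registration is meant to be used, the decorator below adds a reader for a hypothetical FileType.JSON member; the enum value and the JSONReader class are illustrative, not part of the package.

from src.readers import BaseReader
# FileType's import location is project-specific; adjust as needed.
# FileType.JSON and JSONReader are hypothetical, for illustration only.

@BaseReader.register(FileType.JSON)
class JSONReader(BaseReader):
    """Illustrative only: parse JSON exports into TableData."""

    def read_table(self, table_structure):
        ...  # build and return a TableData, or None on failure

# BaseReader._readers now maps FileType.JSON to JSONReader, so
# get_reader_for_format can return it for matching file configs.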

get_reader_for_format(fmt: DeviceFormat, file_path: Path) -> BaseReader classmethod

Get appropriate reader instance for the detected format.

Parameters:

    fmt (DeviceFormat, required): Detected device format specification
    file_path (Path, required): Path to the data file

Returns:

    BaseReader: Instance of appropriate reader class

Raises:

    ReaderError: If no reader is registered for the file type

Source code in src/readers/base.py
@classmethod
def get_reader_for_format(cls, fmt: DeviceFormat, file_path: Path) -> "BaseReader":
    """Get appropriate reader instance for the detected format.

    Args:
        fmt: Detected device format specification
        file_path: Path to the data file

    Returns:
        Instance of appropriate reader class

    Raises:
        ReaderError: If no reader is registered for the file type
    """
    for file_config in fmt.files:
        if Path(file_path).match(file_config.name_pattern):
            reader_cls = cls._readers.get(file_config.file_type)
            if reader_cls is None:
                raise ReaderError(
                    f"No reader registered for file type: {file_config.file_type.value}"
                )
            return reader_cls(file_path, file_config)

    raise ReaderError(f"No matching file configuration found for {file_path}")

__init__(path: Path, file_config: FileConfig)

Initialize reader with file path and configuration.

Parameters:

    path (Path, required): Path to the data file
    file_config (FileConfig, required): Configuration for the file format

Raises:

    ValueError: If file does not exist

Source code in src/readers/base.py
def __init__(self, path: Path, file_config: FileConfig):
    """Initialize reader with file path and configuration.

    Args:
        path: Path to the data file
        file_config: Configuration for the file format

    Raises:
        ValueError: If file does not exist
    """
    if not path.exists():
        raise ValueError(f"File not found: {path}")

    self.file_path = path
    self.file_config = file_config

__enter__()

Context manager entry.

Source code in src/readers/base.py
def __enter__(self):
    """Context manager entry."""
    return self

__exit__(exc_type, exc_val, exc_tb)

Cleanup resources if needed.

Source code in src/readers/base.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Cleanup resources if needed."""
    self._cleanup()

_cleanup()

Override this method in derived classes if cleanup is needed.

Source code in src/readers/base.py
def _cleanup(self):
    """Override this method in derived classes if cleanup is needed."""

read_table(table_structure: TableStructure) -> Optional[TableData] abstractmethod

Read and process a single table according to its structure.

This method must be implemented by each specific reader.

Source code in src/readers/base.py
@abstractmethod
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    This method must be implemented by each specific reader.
    """

read_all_tables() -> Dict[str, TableData]

Read and process all tables defined in the file configuration.

Source code in src/readers/base.py
def read_all_tables(self) -> Dict[str, TableData]:
    """Read and process all tables defined in the file configuration."""
    results = {}
    for table_config in self.file_config.tables:
        table_data = self.read_table(table_config)
        if table_data is not None:
            if table_data.missing_required_columns:
                logger.warning(
                    "Table %s missing required data in columns: %s",
                    table_data.name,
                    table_data.missing_required_columns,
                )
            results[table_data.name] = table_data
        else:
            logger.error("Failed to process table: %s", table_config.name)

    return results

detect_timestamp_format(series: pd.Series) -> TimestampType

Detect the format of timestamp data, assuming chronological order.

Source code in src/readers/base.py
def detect_timestamp_format(self, series: pd.Series) -> TimestampType:
    """Detect the format of timestamp data, assuming chronological order."""
    try:
        # Sample timestamps without sorting
        sample = series.dropna().head(10)
        if sample.empty:
            logger.warning("No non-null timestamps found in sample")
            return TimestampType.UNKNOWN

        # Check if values are monotonically increasing
        if not sample.is_monotonic_increasing:
            logger.warning("Timestamps are not in chronological order")
            return TimestampType.UNKNOWN

        # pylint: disable=R1705
        # Suppress no-else-return; keep the elif chain for readability
        # Check for UNIX epoch formats; non-numeric values fall through
        # to the ISO 8601 check below
        try:
            as_float = sample.astype(float)
            if all(as_float < 1e10):  # Seconds
                logger.debug("Detected timestamp type: UNIX_SECONDS")
                return TimestampType.UNIX_SECONDS
            elif all(as_float < 1e13):  # Milliseconds
                logger.debug("Detected timestamp type: UNIX_MILLISECONDS")
                return TimestampType.UNIX_MILLISECONDS
            elif all(as_float < 1e16):  # Microseconds
                logger.debug("Detected timestamp type: UNIX_MICROSECONDS")
                return TimestampType.UNIX_MICROSECONDS
        except (ValueError, TypeError):
            pass  # Not numeric; try string-based parsing

        # Try ISO 8601 for string timestamps
        try:
            pd.to_datetime(sample, utc=True)
            logger.debug("Detected timestamp type: ISO_8601")
            return TimestampType.ISO_8601
        except (ValueError, TypeError):
            pass

        logger.warning("Could not determine timestamp format")
        return TimestampType.UNKNOWN

    except TimestampProcessingError as e:
        logger.error("Error during timestamp detection: %s", e)
        return TimestampType.UNKNOWN
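
The magnitude thresholds above can be sanity-checked directly with pandas; a small sketch (the epoch value is chosen arbitrarily):

import pandas as pd

# 1_700_000_000 is below 1e10, so it falls in the seconds bucket;
# appending zeros moves it into the milli/microsecond buckets.
print(pd.to_datetime(1_700_000_000, unit="s", utc=True))
# 2023-11-14 22:13:20+00:00
print(pd.to_datetime(1_700_000_000_000, unit="ms", utc=True))
# 2023-11-14 22:13:20+00:00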

_convert_timestamp_to_utc(df: pd.DataFrame, timestamp_column: str) -> Tuple[pd.DataFrame, TimestampType]

Convert timestamp column to UTC datetime and set as index.

Source code in src/readers/base.py
def _convert_timestamp_to_utc(
    self, df: pd.DataFrame, timestamp_column: str
) -> Tuple[pd.DataFrame, TimestampType]:
    """Convert timestamp column to UTC datetime and set as index."""
    fmt = self.detect_timestamp_format(df[timestamp_column])

    if fmt == TimestampType.UNKNOWN:
        raise TimestampProcessingError(
            f"Could not detect timestamp format for column {timestamp_column}"
        )

    try:
        if fmt == TimestampType.UNIX_SECONDS:
            df[timestamp_column] = pd.to_datetime(
                df[timestamp_column], unit="s", utc=True
            )
        elif fmt == TimestampType.UNIX_MILLISECONDS:
            df[timestamp_column] = pd.to_datetime(
                df[timestamp_column], unit="ms", utc=True
            )
        elif fmt == TimestampType.UNIX_MICROSECONDS:
            df[timestamp_column] = pd.to_datetime(
                df[timestamp_column], unit="us", utc=True
            )
        elif fmt == TimestampType.ISO_8601:
            df[timestamp_column] = pd.to_datetime(df[timestamp_column], utc=True)

        return df.set_index(timestamp_column).sort_index(), fmt

    except (ValueError, TypeError) as e:
        logger.error("Error converting timestamps: %s", e)
        raise DataProcessingError(
            f"Could not convert timestamps in column {timestamp_column}"
        ) from e

_validate_required_data(df: pd.DataFrame, columns: List[ColumnMapping]) -> List[str]

Check for missing data in required columns.

Source code in src/readers/base.py
def _validate_required_data(
    self, df: pd.DataFrame, columns: List[ColumnMapping]
) -> List[str]:
    """Check for missing data in required columns."""
    missing_required = []
    for col in columns:
        if (
            col.requirement == ColumnRequirement.REQUIRED_WITH_DATA
            and col.source_name in df.columns
            and df[col.source_name].isna().all()
        ):
            missing_required.append(col.source_name)
    return missing_required

_validate_identifier(identifier: str) -> bool staticmethod

Validate that an identifier only contains safe characters.

Source code in src/readers/base.py
@staticmethod
def _validate_identifier(identifier: str) -> bool:
    """Validate that an identifier only contains safe characters."""
    return all(c.isalnum() or c in ["_", "."] for c in identifier)
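
This check is what lets SQLiteReader.read_table interpolate identifiers into SQL safely; a quick illustration (calling the private helper directly, for demonstration only):

BaseReader._validate_identifier("glucose_readings")         # True
BaseReader._validate_identifier("readings.v2")              # True (dots allowed)
BaseReader._validate_identifier('x"; DROP TABLE users;--')  # False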

CSVReader

Bases: BaseReader

Reads and processes CSV files according to the provided format configuration.

Source code in src/readers/csv.py
@BaseReader.register(FileType.CSV)
class CSVReader(BaseReader):
    """Reads and processes CSV files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._data = None

    def _cleanup(self):
        """Cleanup any held resources."""
        self._data = None

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        For CSV files, we treat each file as a single table, reading all data at once
        and caching it for subsequent operations if needed.
        """
        try:
            # Read data if not already cached
            if self._data is None:
                try:
                    self._data = pd.read_csv(
                        self.file_path,
                        encoding="utf-8",
                        low_memory=False,  # Prevent mixed type inference warnings
                    )
                except Exception as e:
                    raise FileAccessError(f"Failed to read CSV file: {e}") from e

            if self._data.empty:
                raise DataExistsError(f"No data found in CSV file {self.file_path}")

            # Get required columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Check for missing columns
            missing_columns = [
                col for col in columns_to_read if col not in self._data.columns
            ]
            if missing_columns:
                logger.error(
                    "Required columns missing from CSV: %s", ", ".join(missing_columns)
                )
                return None

            # Select only needed columns and make a copy
            df = self._data[columns_to_read].copy()

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            logger.error("Validation error: %s", e)
            return None
        except DataExistsError as e:
            logger.error("No data error: %s", e)
            return None
        except DataProcessingError as e:
            logger.error("Processing error: %s", e)
            return None
        except ProcessingError as e:
            logger.error("Unexpected error processing CSV: %s", e)
            return None

_data = None instance-attribute

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/csv.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._data = None

_cleanup()

Cleanup any held resources.

Source code in src/readers/csv.py
def _cleanup(self):
    """Cleanup any held resources."""
    self._data = None

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

For CSV files, we treat each file as a single table, reading all data at once and caching it for subsequent operations if needed.

Source code in src/readers/csv.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    For CSV files, we treat each file as a single table, reading all data at once
    and caching it for subsequent operations if needed.
    """
    try:
        # Read data if not already cached
        if self._data is None:
            try:
                self._data = pd.read_csv(
                    self.file_path,
                    encoding="utf-8",
                    low_memory=False,  # Prevent mixed type inference warnings
                )
            except Exception as e:
                raise FileAccessError(f"Failed to read CSV file: {e}") from e

        if self._data.empty:
            raise DataExistsError(f"No data found in CSV file {self.file_path}")

        # Get required columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Check for missing columns
        missing_columns = [
            col for col in columns_to_read if col not in self._data.columns
        ]
        if missing_columns:
            logger.error(
                "Required columns missing from CSV: %s", ", ".join(missing_columns)
            )
            return None

        # Select only needed columns and make a copy
        df = self._data[columns_to_read].copy()

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        logger.error("Validation error: %s", e)
        return None
    except DataExistsError as e:
        logger.error("No data error: %s", e)
        return None
    except DataProcessingError as e:
        logger.error("Processing error: %s", e)
        return None
    except ProcessingError as e:
        logger.error("Unexpected error processing CSV: %s", e)
        return None
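
A hedged end-to-end sketch: the file_config and table_structure values are assumed to come from the project's device-format definitions (their constructors are not documented on this page), so they appear only as opaque names.

from pathlib import Path

from src.readers import CSVReader

# file_config and table_structure are assumed to be loaded from a
# device-format definition; they are not constructed in this sketch.
with CSVReader(Path("clarity_export.csv"), file_config) as reader:
    table = reader.read_table(table_structure)
    if table is not None:
        print(table.timestamp_type, table.dataframe.head())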

SQLiteReader

Bases: BaseReader

Reads and processes SQLite files according to the provided format configuration.

Source code in src/readers/sqlite.py
@BaseReader.register(FileType.SQLITE)
class SQLiteReader(BaseReader):
    """Reads and processes SQLite files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._engine = None

    @property
    def engine(self):
        """Lazy initialization of database engine."""
        if self._engine is None:
            self._engine = create_engine(f"sqlite:///{self.file_path}")
        return self._engine

    def _cleanup(self):
        """Cleanup database connections."""
        if self._engine is not None:
            self._engine.dispose()
            self._engine = None

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure."""
        try:
            # Validate identifiers
            if not self._validate_identifier(table_structure.name):
                raise DataValidationError(f"Invalid table name: {table_structure.name}")

            # Read only needed columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Validate column names
            for col in columns_to_read:
                if not self._validate_identifier(col):
                    raise DataValidationError(f"Invalid column name: {col}")

            # Create query with quoted identifiers for SQLite
            quoted_columns = [f'"{col}"' for col in columns_to_read]
            query = text(
                f"""
                SELECT {', '.join(quoted_columns)}
                FROM "{table_structure.name}"
                ORDER BY "{table_structure.timestamp_column}"
            """
            )

            # Execute query within connection context
            with self.engine.connect() as conn:
                df = pd.read_sql_query(query, conn)

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            # Handle invalid table or column names
            logger.error("Validation error: %s", e)
            return None
        except SQLAlchemyError as e:
            # Handle any database-related errors (e.g., connection, query execution)
            logger.error(
                "SQLAlchemyError processing table %s: %s", table_structure.name, e
            )
            return None
        except DataExistsError as e:
            # Handle an empty result set
            logger.error(
                "No data in table %s: %s", table_structure.name, e
            )
            return None
        except ReaderError as e:
            # Handle any remaining reader errors
            logger.error(
                "Unexpected error processing table %s: %s", table_structure.name, e
            )
            return None

_engine = None instance-attribute

engine property

Lazy initialization of database engine.

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/sqlite.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._engine = None

_cleanup()

Cleanup database connections.

Source code in src/readers/sqlite.py
def _cleanup(self):
    """Cleanup database connections."""
    if self._engine is not None:
        self._engine.dispose()
        self._engine = None

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

Source code in src/readers/sqlite.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure."""
    try:
        # Validate identifiers
        if not self._validate_identifier(table_structure.name):
            raise DataValidationError(f"Invalid table name: {table_structure.name}")

        # Read only needed columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Validate column names
        for col in columns_to_read:
            if not self._validate_identifier(col):
                raise DataValidationError(f"Invalid column name: {col}")

        # Create query with quoted identifiers for SQLite
        quoted_columns = [f'"{col}"' for col in columns_to_read]
        query = text(
            f"""
            SELECT {', '.join(quoted_columns)}
            FROM "{table_structure.name}"
            ORDER BY "{table_structure.timestamp_column}"
        """
        )

        # Execute query within connection context
        with self.engine.connect() as conn:
            df = pd.read_sql_query(query, conn)

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        # Handle invalid table or column names
        logger.error("Validation error: %s", e)
        return None
    except SQLAlchemyError as e:
        # Handle any database-related errors (e.g., connection, query execution)
        logger.error(
            "SQLAlchemyError processing table %s: %s", table_structure.name, e
        )
        return None
    except DataExistsError as e:
        # Handle an empty result set
        logger.error(
            "No data in table %s: %s", table_structure.name, e
        )
        return None
    except ReaderError as e:
        # Handle any remaining reader errors
        logger.error(
            "Unexpected error processing table %s: %s", table_structure.name, e
        )
        return None
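
Identifiers cannot be bound as SQL parameters, which is why the method validates and then double-quotes them instead of interpolating raw strings. A minimal sketch of the resulting query shape, assuming a hypothetical example.db with a readings table:

import pandas as pd
from sqlalchemy import create_engine, text

# Hypothetical database, table, and column names, for illustration only.
engine = create_engine("sqlite:///example.db")
query = text('SELECT "value", "timestamp" FROM "readings" ORDER BY "timestamp"')
with engine.connect() as conn:
    df = pd.read_sql_query(query, conn)
engine.dispose()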

XMLReader

Bases: BaseReader

Reads and processes XML files according to the provided format configuration.

Source code in src/readers/xml.py
@BaseReader.register(FileType.XML)
class XMLReader(BaseReader):
    """Reads and processes XML files according to the provided format configuration."""

    def __init__(self, path: Path, file_config: FileConfig):
        super().__init__(path, file_config)
        self._tree = None
        self._root = None

    def _cleanup(self):
        """Cleanup any held resources."""
        self._tree = None
        self._root = None

    def _init_xml(self):
        """Initialize XML parsing if not already done."""
        if self._root is None:
            try:
                self._tree = ET.parse(self.file_path)
                self._root = self._tree.getroot()
            except ET.ParseError as e:
                raise DataExistsError(f"Failed to parse XML file: {e}") from e
            except Exception as e:
                raise DataExistsError(f"Error reading XML file: {e}") from e

    @staticmethod
    def _extract_value(element: ET.Element, column: str) -> str:
        """Extract value from XML element, checking both attributes and text.

        Args:
            element: XML element to extract from
            column: Column name to look for

        Returns:
            Value from attribute or element text
        """
        # Check attributes first
        if column in element.attrib:
            return element.attrib[column]

        # Then check child elements
        child = element.find(column)
        if child is not None:
            return child.text if child.text else ""

        return ""

    def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
        """Read and process a single table according to its structure.

        For XML files, each table is expected to be contained within elements
        matching the table name or a configured xpath.
        """
        try:
            self._init_xml()

            # Get required columns
            columns_to_read = [
                col.source_name
                for col in table_structure.columns
                if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
            ]
            columns_to_read.append(table_structure.timestamp_column)

            # Find all elements for this table
            table_elements = self._root.findall(f".//{table_structure.name}")
            if not table_elements:
                logger.error("No elements found for table: %s", table_structure.name)
                return None

            # Extract data for each column
            data: Dict[str, List[str]] = {col: [] for col in columns_to_read}

            for element in table_elements:
                for column in columns_to_read:
                    value = self._extract_value(element, column)
                    data[column].append(value)

            # Convert to DataFrame
            df = pd.DataFrame(data)

            if df.empty:
                raise DataExistsError(f"No data found in table {table_structure.name}")

            # Process timestamps
            df, fmt = self._convert_timestamp_to_utc(
                df, table_structure.timestamp_column
            )

            # Validate required data
            missing_required = self._validate_required_data(df, table_structure.columns)

            return TableData(
                name=table_structure.name,
                dataframe=df,
                missing_required_columns=missing_required,
                timestamp_type=fmt,
            )

        except DataValidationError as e:
            logger.error("Validation error: %s", e)
            return None
        except DataExistsError as e:
            logger.error("No data error: %s", e)
            return None
        except DataProcessingError as e:
            logger.error("Processing error: %s", e)
            return None
        except ReaderError as e:
            logger.error("Unexpected error processing XML: %s", e)
            return None

_tree = None instance-attribute

_root = None instance-attribute

__init__(path: Path, file_config: FileConfig)

Source code in src/readers/xml.py
def __init__(self, path: Path, file_config: FileConfig):
    super().__init__(path, file_config)
    self._tree = None
    self._root = None

_cleanup()

Cleanup any held resources.

Source code in src/readers/xml.py
def _cleanup(self):
    """Cleanup any held resources."""
    self._tree = None
    self._root = None

_init_xml()

Initialize XML parsing if not already done.

Source code in src/readers/xml.py
def _init_xml(self):
    """Initialize XML parsing if not already done."""
    if self._root is None:
        try:
            self._tree = ET.parse(self.file_path)
            self._root = self._tree.getroot()
        except ET.ParseError as e:
            raise DataExistsError(f"Failed to parse XML file: {e}") from e
        except Exception as e:
            raise DataExistsError(f"Error reading XML file: {e}") from e

_extract_value(element: ET.Element, column: str) -> str staticmethod

Extract value from XML element, checking both attributes and text.

Parameters:

    element (Element, required): XML element to extract from
    column (str, required): Column name to look for

Returns:

    str: Value from attribute or element text

Source code in src/readers/xml.py
@staticmethod
def _extract_value(element: ET.Element, column: str) -> str:
    """Extract value from XML element, checking both attributes and text.

    Args:
        element: XML element to extract from
        column: Column name to look for

    Returns:
        Value from attribute or element text
    """
    # Check attributes first
    if column in element.attrib:
        return element.attrib[column]

    # Then check child elements
    child = element.find(column)
    if child is not None:
        return child.text if child.text else ""

    return ""

read_table(table_structure: TableStructure) -> Optional[TableData]

Read and process a single table according to its structure.

For XML files, each table is expected to be contained within elements matching the table name or a configured xpath.

Source code in src/readers/xml.py
def read_table(self, table_structure: TableStructure) -> Optional[TableData]:
    """Read and process a single table according to its structure.

    For XML files, each table is expected to be contained within elements
    matching the table name or a configured xpath.
    """
    try:
        self._init_xml()

        # Get required columns
        columns_to_read = [
            col.source_name
            for col in table_structure.columns
            if col.requirement != ColumnRequirement.CONFIRMATION_ONLY
        ]
        columns_to_read.append(table_structure.timestamp_column)

        # Find all elements for this table
        table_elements = self._root.findall(f".//{table_structure.name}")
        if not table_elements:
            logger.error("No elements found for table: %s", table_structure.name)
            return None

        # Extract data for each column
        data: Dict[str, List[str]] = {col: [] for col in columns_to_read}

        for element in table_elements:
            for column in columns_to_read:
                value = self._extract_value(element, column)
                data[column].append(value)

        # Convert to DataFrame
        df = pd.DataFrame(data)

        if df.empty:
            raise DataExistsError(f"No data found in table {table_structure.name}")

        # Process timestamps
        df, fmt = self._convert_timestamp_to_utc(
            df, table_structure.timestamp_column
        )

        # Validate required data
        missing_required = self._validate_required_data(df, table_structure.columns)

        return TableData(
            name=table_structure.name,
            dataframe=df,
            missing_required_columns=missing_required,
            timestamp_type=fmt,
        )

    except DataValidationError as e:
        logger.error("Validation error: %s", e)
        return None
    except DataExistsError as e:
        logger.error("No data error: %s", e)
        return None
    except DataProcessingError as e:
        logger.error("Processing error: %s", e)
        return None
    except ReaderError as e:
        logger.error("Unexpected error processing XML: %s", e)
        return None