class FormatDetector:
    """Detects device formats by examining file structure.

    Candidate formats are looked up in the registry by file extension and
    validated one at a time against the file's actual structure (tables,
    columns, fields).  A per-format :class:`ValidationResult` is collected
    for every format tried so callers can report exactly why each
    candidate was rejected.
    """

    def __init__(self, format_registry: "FormatRegistry"):
        """Initialise detector with format registry.

        Args:
            format_registry: Registry used to look up candidate formats
                for a given file extension.
        """
        self._registry = format_registry

    def detect_format(
        self, path: Path
    ) -> Tuple[Optional["DeviceFormat"], Optional[str], Dict[str, "ValidationResult"]]:
        """Detect format of provided file.

        Args:
            path: Path to the file to check

        Returns:
            Tuple containing:
            - Matched format (or None)
            - Error message (or None)
            - Dictionary of validation results per format tried

        Example:
            >>> detector = FormatDetector(registry)
            >>> fmt, error, results = detector.detect_format(Path("data.sqlite"))
            >>> if fmt:
            ...     print(f"Matched format: {fmt.name}")
            ... else:
            ...     print(f"No match: {error}")
        """
        logger.debug("Starting format detection for: %s", path)
        val_results: Dict[str, "ValidationResult"] = {}
        try:
            # Validate file exists and is readable.
            if not self._validate_file_exists(path):
                return None, f"File not found or not accessible: {path}", {}

            # Get potential formats based on file extension.
            potential_formats = self._registry.get_formats_for_file(path)
            if not potential_formats:
                return None, f"No formats available for {path.suffix} files", {}

            # Try each format.  The result is recorded *before* checking the
            # outcome so the returned dict really covers every format tried,
            # including the one that finally matched.
            for fmt in potential_formats:
                try:
                    result = ValidationResult()
                    matched = self._validate_format(path, fmt, result)
                    val_results[fmt.name] = result
                    if matched:
                        logger.debug("Successfully matched format: %s", fmt.name)
                        return fmt, None, val_results
                except FormatValidationError as e:
                    logger.debug("Error validating format %s: %s", fmt.name, str(e))
                    continue
            return None, "No matching format found", val_results
        except FileAccessError as e:
            logger.error("Unexpected error during format detection: %s", str(e))
            return None, f"Detection error: {str(e)}", {}

    def _validate_file_exists(self, path: Path) -> bool:
        """Validate file exists and is accessible.

        Raises:
            FileAccessError: If the filesystem check itself fails
                (e.g. a permission error while statting the path).
        """
        try:
            return path.exists() and path.is_file()
        except OSError as e:
            # Path.exists()/is_file() only raise OSError subclasses.
            raise FileAccessError(f"Error occurred: {str(e)}") from e

    def _validate_format(
        self, path: Path, fmt: "DeviceFormat", validation_result: "ValidationResult"
    ) -> bool:
        """Validate if file matches format definition.

        Every file config of the format must validate for the format to
        match; details of any mismatch accumulate in ``validation_result``.
        """
        for config in fmt.files:
            validator = self._get_validator(config.file_type)
            if validator is None:
                logger.warning("No validator available for %s", config.file_type.value)
                return False
            try:
                if not validator(path, config, validation_result):
                    return False
            except FormatValidationError as e:
                logger.debug("Validation failed: %s", str(e))
                return False
        return True

    def _get_validator(self, file_type: "FileType"):
        """Get appropriate validation function for file type.

        Returns:
            A ``(path, config, val_result) -> bool`` callable, or None if
            the file type has no validator.
        """
        validators = {
            FileType.SQLITE: self._validate_sqlite,
            FileType.CSV: self._validate_csv,
            FileType.JSON: self._validate_json,
            FileType.XML: self._validate_xml,
        }
        return validators.get(file_type)

    def _validate_sqlite(
        self, path: Path, config, val_result: "ValidationResult"
    ) -> bool:
        """Validate SQLite file structure.

        Checks that every required table exists and that each required
        (non-OPTIONAL) column is present in its table.

        NOTE(review): SQLAlchemy errors (e.g. a corrupt database raising
        OperationalError) are not FormatValidationError and will propagate
        out of this method — confirm that is intended.
        """
        try:
            engine = create_engine(f"sqlite:///{path}")
            try:
                inspector = inspect(engine)
                # Table names are matched case-sensitively.
                actual_tables = set(inspector.get_table_names())

                for required_table in config.tables:
                    table_name = required_table.name
                    if table_name not in actual_tables:
                        val_result.missing_tables.append(table_name)
                        continue

                    # Check that all required columns exist in the table.
                    column_names = {
                        col["name"] for col in inspector.get_columns(table_name)
                    }
                    required_columns = {
                        col.source_name
                        for col in required_table.columns
                        if col.requirement != ColumnRequirement.OPTIONAL
                    }
                    missing = required_columns - column_names
                    if missing:
                        # Preserve column-definition order in the report.
                        val_result.missing_columns[table_name] = [
                            col.source_name
                            for col in required_table.columns
                            if col.requirement != ColumnRequirement.OPTIONAL
                            and col.source_name in missing
                        ]
                return not val_result.has_errors()
            finally:
                # Release the engine's connection pool deterministically.
                engine.dispose()
        except FormatValidationError as e:
            logger.debug("SQLite validation error: %s", str(e))
            return False

    @staticmethod
    def _read_csv_header(path: Path, header_row: int) -> Optional[set]:
        """Return normalised column names from the 0-based ``header_row``,
        or None when the file has fewer rows than requested."""
        with open(path, encoding="utf-8", newline="") as fh:
            for idx, row in enumerate(csv.reader(fh)):
                if idx == header_row:
                    # Normalise: strip whitespace and lowercase.
                    return {
                        str(col).strip().lower() for col in row if col is not None
                    }
        return None

    def _validate_csv(self, path: Path, config, val_result: "ValidationResult") -> bool:
        """Validate CSV file structure.

        If the format specifies a ``header_row`` it is checked directly;
        otherwise the first 4 rows are scanned for one containing every
        required column.

        NOTE(review): pandas parse errors are not FormatValidationError
        and will propagate — confirm that is intended.
        """
        try:
            # CSV formats define exactly one table.
            csv_table = config.tables[0]
            required_columns = {
                col.source_name.strip().lower()
                for col in csv_table.columns
                if col.requirement != ColumnRequirement.OPTIONAL
            }
            all_required_names = [
                col.source_name
                for col in csv_table.columns
                if col.requirement != ColumnRequirement.OPTIONAL
            ]

            header_row = getattr(csv_table, "header_row", None)
            if header_row is not None:
                row_columns = self._read_csv_header(path, header_row)
                if row_columns is None:
                    # Requested header row does not exist in file.
                    val_result.missing_columns[""] = all_required_names
                    return False
                missing = required_columns - row_columns
                if missing:
                    val_result.missing_columns[csv_table.name] = [
                        col.source_name
                        for col in csv_table.columns
                        if col.requirement != ColumnRequirement.OPTIONAL
                        and col.source_name.strip().lower() in missing
                    ]
                    return False
                return True

            # No header_row specified: scan the first 4 rows for a header.
            df = pd.read_csv(path, nrows=4, header=None)
            for row_idx in range(min(4, len(df))):
                # Normalise string values: strip whitespace and lowercase.
                row_columns = {str(col).strip().lower() for col in df.iloc[row_idx]}
                logger.debug("CSV header row %d: %s", row_idx, row_columns)
                # If this row has all required columns, treat it as the header.
                if not required_columns - row_columns:
                    return True

            # No valid header found in first 4 rows.
            val_result.missing_columns[""] = all_required_names
            return False
        except FormatValidationError as e:
            logger.debug("CSV validation error: %s", str(e))
            return False

    def _validate_json(self, path: Path, config, val_result: "ValidationResult") -> bool:
        """Validate JSON file structure.

        For a top-level list the first element is taken as a sample record;
        otherwise each table name must be a key in the top-level object.
        Field names are compared case-insensitively.

        NOTE(review): json.JSONDecodeError (invalid JSON) is not
        FormatValidationError and will propagate — confirm intended.
        """
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)

            for json_table in config.tables:
                if isinstance(data, list):
                    if not data:
                        val_result.missing_tables.append(json_table.name)
                        continue
                    record = data[0]
                else:
                    if json_table.name not in data:
                        val_result.missing_tables.append(json_table.name)
                        continue
                    record = (
                        data[json_table.name][0]
                        if isinstance(data[json_table.name], list)
                        else data[json_table.name]
                    )

                # Check required fields (case-insensitive).
                fields = {k.lower() for k in record.keys()}
                required_fields = {
                    col.source_name.lower()
                    for col in json_table.columns
                    if col.requirement != ColumnRequirement.OPTIONAL
                }
                missing = required_fields - fields
                if missing:
                    # Store source names (strings) for consistency with
                    # the SQLite/CSV validators.
                    val_result.missing_columns[json_table.name] = [
                        col.source_name
                        for col in json_table.columns
                        if col.requirement != ColumnRequirement.OPTIONAL
                        and col.source_name.lower() in missing
                    ]
            return not val_result.has_errors()
        except FormatValidationError as e:
            logger.debug("JSON validation error: %s", str(e))
            return False

    def _validate_xml(self, path: Path, config, val_result: "ValidationResult") -> bool:
        """Validate XML file structure.

        Each table name must appear as an element anywhere in the tree;
        the first matching element's attributes and direct children are
        treated as its fields (compared case-insensitively).

        NOTE(review): ET.ParseError (malformed XML) is not
        FormatValidationError and will propagate — confirm intended.
        """
        try:
            tree = ET.parse(path)
            root = tree.getroot()

            for xml_table in config.tables:
                elements = root.findall(f".//{xml_table.name}")
                if not elements:
                    val_result.missing_tables.append(xml_table.name)
                    continue

                # Inspect the first matching element only.
                element = elements[0]
                fields = set()
                fields.update(element.attrib.keys())
                fields.update(child.tag for child in element)
                fields = {f.lower() for f in fields}

                required_fields = {
                    col.source_name.lower()
                    for col in xml_table.columns
                    if col.requirement != ColumnRequirement.OPTIONAL
                }
                missing = required_fields - fields
                if missing:
                    # Store source names (strings) for consistency with
                    # the SQLite/CSV validators.
                    val_result.missing_columns[xml_table.name] = [
                        col.source_name
                        for col in xml_table.columns
                        if col.requirement != ColumnRequirement.OPTIONAL
                        and col.source_name.lower() in missing
                    ]
            return not val_result.has_errors()
        except FormatValidationError as e:
            logger.debug("XML validation error: %s", str(e))
            return False