# Source code for core_db.etls.database_based

# -*- coding: utf-8 -*-

"""
Database-Based ETL Module
==========================

This module provides abstract base classes for building ETL (Extract, Transform, Load)
processes that extract data from database sources. It extends the core_etl framework
to provide specialized database connectivity and query execution capabilities.
"""

from __future__ import annotations

from abc import ABC
from abc import abstractmethod
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional

from core_etl.record_based import IBaseEtlFromRecord

from core_db.interfaces.base import IDatabaseClient


class IBaseEtlFromDatabase(IBaseEtlFromRecord, ABC):
    """
    Abstract base class for ETL processes that extract data from databases.

    This class extends IBaseEtlFromRecord to provide specialized functionality
    for retrieving data from database sources. It manages the database
    connection, executes a query, and hands the resulting records over in
    configurable batches.

    Lifecycle handled by this class:
      - Connection establishment in ``pre_processing``.
      - Query execution and data retrieval in ``retrieve_records``.
      - Batch-based record processing via ``process_records``.
      - Connection cleanup in ``clean_resources``.

    Attributes:
        database_type (str): The class name of the database client to use.
        connection_parameters (Dict): Parameters for establishing database connection.
        db_client (Optional[IDatabaseClient]): Active database client instance.
        base_query (Optional[str]): Base SQL query template for data retrieval.
        batch_size (int): Number of records per yielded batch.

    Usage:
    ------

    .. code-block:: python

        class MyETL(IBaseEtlFromDatabase):
            def _execute_query(self, query):
                self.db_client.execute(query)

            def _fetch_records(self):
                return self.db_client.fetch_records()

            def process_records(self, records, **kwargs):
                # Transform and load records
                for record in records:
                    self.transform_and_load(record)

        # Execute the ETL
        etl = MyETL(
            database_type="PostgresClient",
            connection_parameters={"conninfo": "postgresql://..."},
            base_query="SELECT * FROM source_table",
            batch_size=1000
        )

        etl.run()
    ..
    """

    def __init__(
        self,
        database_type: str,
        connection_parameters: Dict,
        base_query: Optional[str] = None,
        batch_size: int = 100,
        **kwargs,
    ) -> None:
        """
        Store the connection settings; no connection is opened here.

        :param database_type: The name of the class that defines the database connection.
        :param connection_parameters: The parameters to create the database connection.
        :param base_query: Query base to use when retrieving data.
        :param batch_size: Size of the batch per database operation.
        :param kwargs: Forwarded unchanged to the parent constructor.
        """

        super().__init__(**kwargs)

        self.database_type = database_type
        self.connection_parameters = connection_parameters
        # Populated by pre_processing(); stays None until then.
        self.db_client: Optional[IDatabaseClient] = None
        self.base_query = base_query
        self.batch_size = batch_size

    def pre_processing(self) -> None:
        """
        Initialize the database connection before ETL processing begins.

        Steps:
          1. Run the parent ``pre_processing``.
          2. Look up the client class with ``IDatabaseClient.get_class``.
          3. Instantiate it with ``connection_parameters``.
          4. Call ``connect()`` on the new client.

        If the lookup returns a falsy value no client is created and
        ``db_client`` is left as ``None``; ``retrieve_records`` then yields
        nothing.

        Exceptions raised by the lookup, the client constructor or
        ``connect()`` are not caught here and propagate to the caller
        (the original documentation names ``DatabaseClientException``).
        """

        super().pre_processing()

        database_cls = IDatabaseClient.get_class(self.database_type)
        self.db_client = database_cls(**self.connection_parameters) if database_cls else None

        if self.db_client:
            self.db_client.connect()

    def get_query(self, *args, **kwargs) -> Optional[str]:
        """
        Generate or return the SQL query for data retrieval.

        This base implementation ignores its arguments and returns
        ``base_query``. Override it to build the query dynamically from
        date ranges, last processed values, or other criteria.

        :param args: Positional arguments for query generation.
        :param kwargs: Keyword arguments (last_processed, start, end, etc.).
        :return: SQL query string, or None if no query is needed.
        """

        return self.base_query

    def retrieve_records(
        self,
        last_processed: Any = None,
        start: Any = None,
        end: Any = None,
        **kwargs,
    ) -> Iterator[List[Dict]]:
        """
        Retrieve records from the database in batches.

        The method:
          1. Builds the query using ``get_query()``.
          2. Executes it via ``_execute_query()``.
          3. Iterates the records returned by ``_fetch_records()``.
          4. Groups them into lists of ``batch_size`` records.
          5. Yields each full batch, then any shorter final batch.

        Nothing is executed and nothing is yielded when ``db_client`` is
        not set. A ``batch_size`` smaller than 1 is never matched by the
        batch length, so all records are then delivered in one final batch.

        :param last_processed: Identifier of the last processed record for incremental loads.
        :param start: Start boundary for data extraction (timestamp, ID, etc.).
        :param end: End boundary for data extraction (timestamp, ID, etc.).
        :param kwargs: Additional query parameters, passed on to ``get_query()``.
        :yield: Batches of records as lists of dictionaries.
        """

        if not self.db_client:
            return

        self._execute_query(
            self.get_query(
                last_processed=last_processed,
                start=start,
                end=end,
                **kwargs,
            )
        )

        batch: List[Dict] = []
        for record in self._fetch_records():
            batch.append(record)
            if len(batch) == self.batch_size:
                yield batch
                # Start a new list: the consumer may still hold the old one.
                batch = []

        # Yielding any remaining records that
        # didn't reach batch_size...
        if batch:
            yield batch

    @abstractmethod
    def _execute_query(self, query: Any):
        """
        Execute the SQL query on the database connection.

        Concrete implementations must define this method to handle
        database-specific query execution, typically by calling the
        appropriate method on ``self.db_client``.

        :param query: Value returned by ``get_query()`` (may be None).
        """

    @abstractmethod
    def _fetch_records(self) -> Iterator[Dict]:
        """
        Fetch records from the executed query result.

        Concrete implementations must define this method to retrieve the
        records produced by the preceding ``_execute_query()`` call.

        :return: Iterator yielding dictionaries with column names as keys.
        """

    def process_records(self, records: List[Dict], **kwargs):
        """
        Process a batch of transformed records.

        This base implementation does nothing. Concrete implementations
        are expected to override it with the load operations to perform.
        Common actions include:

          - Loading to target database.
          - Archiving to S3 or cloud storage.
          - Sending to message queues (SQS, Kinesis, Kafka).
          - Writing to files or SFTP servers.
          - Sending to APIs or webhooks.

        :param records: List of dictionaries representing processed records.
        :param kwargs: Additional parameters for processing.
        """

    def clean_resources(self) -> None:
        """
        Clean up the database connection after ETL completion.

        Calls ``close()`` on the client when one exists and exposes a
        callable ``close`` attribute, then clears ``db_client`` so that a
        closed client is never reused by a later ``retrieve_records`` call.
        The reference is cleared even if ``close()`` raises; the exception
        itself still propagates.
        """

        client = self.db_client
        if not client:
            return

        close = getattr(client, "close", None)
        try:
            if callable(close):
                close()
        finally:
            self.db_client = None