Interfaces#

This module provides abstract base classes and exceptions for implementing database clients with a standardized interface across different database engines.

class core_db.interfaces.base.IDatabaseClient(**kwargs)[source]#

Bases: IFactory, ABC

Abstract base class for all database clients.

This class provides a standardized interface for connecting to and interacting with various database engines. It implements the context manager protocol for automatic resource management and integrates with the factory pattern for dynamic client instantiation.
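
The combination of factory registration and context-manager cleanup described above can be pictured with a small standalone sketch. Everything here is hypothetical (the `SketchClient` and `SqliteSketchClient` names, the `cxn` attribute, registration through `__init_subclass__`, and connecting on `__enter__`); it uses the standard-library `sqlite3` driver only so that it runs anywhere, and it is not the core_db implementation.

```python
import sqlite3
from abc import ABC
from typing import Any, Callable, Dict, Optional, Type


class SketchClient(ABC):
    """Hypothetical stand-in for a client base class; not the core_db code."""

    _impls: Dict[str, Type["SketchClient"]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Assumption: every subclass is registered under its class name.
        SketchClient._impls[cls.__name__] = cls

    def __init__(self, **kwargs: Any) -> None:
        self.connection_parameters = kwargs
        self.connect_fcn: Optional[Callable[..., Any]] = None
        self.cxn: Any = None

    def connect(self) -> None:
        if self.connect_fcn is None:
            raise RuntimeError("connect_fcn was not set")
        self.cxn = self.connect_fcn(**self.connection_parameters)

    def close(self) -> None:
        # Safe to call even when no connection was opened.
        if self.cxn is not None:
            self.cxn.close()
            self.cxn = None

    def __enter__(self) -> "SketchClient":
        self.connect()
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()


class SqliteSketchClient(SketchClient):
    def __init__(self, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self.connect_fcn = sqlite3.connect


# Look the class up by name, then let the with-block manage the connection.
client_cls = SketchClient._impls["SqliteSketchClient"]
with client_cls(database=":memory:") as client:
    row = client.cxn.execute("SELECT 1").fetchone()
```

Looking the class up by name is what lets calling code choose an engine from configuration rather than from an import.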

TYPE_MAPPER: Dict[Any, str] = {}#
__init__(**kwargs) None[source]#

Initialize the database client with connection parameters.

Parameters:

kwargs – Arbitrary keyword arguments representing database-specific connection parameters (e.g., host, port, database, user, password).

connect_fcn: Callable[[...], Any] | None#
classmethod registration_key() str[source]#

Get the registration key for the factory pattern.

Returns:

The class name used as the registration key.

connect() None[source]#

Establish a connection to the database. Uses the connection function and parameters provided during initialization to create a connection to the database engine.

Raises:

DatabaseClientException – If the connection fails for any reason.

abstractmethod test_connection(query: Any)[source]#

Test the database connection by executing a simple query.

Parameters:

query – A simple test query that verifies the connection is working (e.g., “SELECT 1” for most databases).

close() None[source]#

Close the database connection and release resources. This method safely closes the connection if one exists, releasing any database resources held by the client.

_impls: Dict[str, Type[Self]] = {'Db2Client': <class 'core_db.engines.db2.Db2Client'>, 'MongoClient': <class 'core_db.engines.mongo.MongoClient'>, 'MySQLClient': <class 'core_db.engines.mysql.MySQLClient'>, 'OracleClient': <class 'core_db.engines.oracle.OracleClient'>, 'PostgresClient': <class 'core_db.engines.postgres.PostgresClient'>, 'SnowflakeClient': <class 'core_db.engines.snowflake_.SnowflakeClient'>}#
exception core_db.interfaces.base.DatabaseClientException[source]#

Bases: Exception

Custom exception for database client operations. Raised when a database operation fails, including connection errors, query execution errors, or other database-related issues.

class core_db.interfaces.sql_based.ISqlDatabaseClient(**kwargs)[source]#

Bases: IDatabaseClient, ABC

Abstract base class for SQL-based database clients.

This class extends IDatabaseClient to provide SQL-specific functionality including parameterized query execution, CRUD operations, and batch data manipulation. It implements security measures to prevent SQL injection and provides standardized methods for common database operations.

Key Features:#

  • Parameterized queries: All DML methods use placeholders to prevent SQL injection.

  • Batch operations: Efficient batch inserts with configurable chunk sizes.

  • Type mapping: Python-to-SQL type conversion for DDL generation.

  • Column validation: Automatic validation of column names against injection patterns.

  • Context manager support: Automatic commit and connection cleanup.

Usage:#

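from core_db.interfaces.sql_based import ISqlDatabaseClient

import my_driver  # Placeholder: substitute your database driver module


class MyDatabaseClient(ISqlDatabaseClient):
    PLACEHOLDER = "?"  # Override for database-specific placeholder

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.connect_fcn = my_driver.connect

    @classmethod
    def get_merge_dml(cls, table_fqn, pk_ids, columns, records):
        # Implement database-specific MERGE/UPSERT logic and return
        # a (statement, parameters) tuple.
        raise NotImplementedError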

# Use the client
with MyDatabaseClient(host="localhost", database="mydb") as client:
    # Insert records
    client.insert_records(
        table_fqn="users",
        columns=["name", "email"],
        records=[{"name": "Alice", "email": "alice@example.com"}]
    )
    # Query data
    client.select("users", columns=["name", "email"])
    for record in client.fetch_records():
        print(record)

See Also:#

  • core_db.engines: Concrete implementations for specific databases

TYPE_MAPPER: Dict[Any, str] = {<class 'bool'>: 'BOOLEAN', <class 'dict'>: 'JSON', <class 'float'>: 'DOUBLE', <class 'int'>: 'INTEGER', <class 'list'>: 'JSON', <class 'str'>: 'TEXT'}#
VALID_IDENTIFIER = re.compile('^[a-zA-Z_][a-zA-Z0-9_]*$')#
PLACEHOLDER = '%s'#
__init__(**kwargs)[source]#

Initialize SQL database client with connection parameters.

Parameters:

kwargs – Database-specific connection parameters (e.g., host, port, database, user, password).

test_connection(query: Any = None)[source]#

Test the database connection by executing a version query.

Parameters:

query – Optional custom query to test connection. Defaults to version query.

Returns:

Query execution result.

Raises:

DatabaseClientException – If connection test fails.

execute(query: Any, **kwargs)[source]#

Execute a SQL query.

Parameters:
  • query – SQL query string to execute.

  • kwargs – Additional keyword arguments.

Returns:

Cursor execution result.

Raises:

DatabaseClientException – If there is no active connection or execution fails.

_execute(query: Any, **kwargs)[source]#

Internal method for executing queries with database-specific parameter handling. Override this method in subclasses if the database driver requires specific parameter passing conventions (e.g., positional vs keyword arguments).

Parameters:
  • query – SQL query string to execute.

  • kwargs – Additional keyword arguments including ‘params’ for parameter binding.

Returns:

Cursor execution result.

commit() None[source]#

Commit the current transaction to persist changes.

Raises:

DatabaseClientException – If no active connection exists.

select(table_fqn: str, columns: List[str] | None = None)[source]#

Execute a SELECT query on the specified table.

Parameters:
  • table_fqn – Table’s fully qualified name.

  • columns – List of column names to select. If None, selects all columns (*).

Returns:

Cursor execution result.

Raises:

ValueError – If column names contain invalid characters.

classmethod validate_identifier(identifiers: Iterable[str]) None[source]#

Validate table or column names to prevent SQL injection attacks. Checks that all identifiers match the pattern for valid SQL identifiers:

  • Must start with letter or underscore

  • Can contain only alphanumeric characters and underscores

  • Fully qualified names can contain a dot.

Parameters:

identifiers – Iterable of identifier strings (for example, table or column names) to validate.

Raises:

ValueError – If any identifier contains invalid characters.
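
A minimal sketch of this kind of check, reusing the VALID_IDENTIFIER pattern listed for the class. How the real method treats the dot in fully qualified names is not shown in this reference, so splitting on `.` and validating each part separately is an assumption, as is the error message.

```python
import re
from typing import Iterable

VALID_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def validate_identifier(identifiers: Iterable[str]) -> None:
    """Raise ValueError if any identifier is not a plain SQL name."""
    for identifier in identifiers:
        # Assumption: each dot-separated part is checked on its own.
        for part in identifier.split("."):
            if not VALID_IDENTIFIER.match(part):
                raise ValueError(f"Invalid identifier: {identifier!r}")


# These pass silently; a value such as "users; DROP TABLE users" would raise.
validate_identifier(["users", "public.users", "_tmp1"])
```

Identifiers cannot be bound as query parameters, which is why they are checked against a strict pattern instead.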

static _escape_string_value(value: str) str[source]#

Escape string values to prevent SQL injection. Escapes single quotes by doubling them (SQL standard).

Parameters:

value – The string value to escape.

Returns:

Escaped string value.
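
Doubling single quotes can be sketched in one line. The function name below is illustrative, and the real method may handle more cases than this.

```python
def escape_string_value(value: str) -> str:
    """Double every single quote so the value is safe inside a SQL string literal."""
    return value.replace("'", "''")


escaped = escape_string_value("O'Brien")
literal = f"'{escaped}'"  # the value wrapped as a SQL string literal
```

Parameter binding remains the safer route for values; escaping like this is a fallback for places where a literal has to be embedded in the statement text.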

classmethod get_select_ddl(table_fqn: str, columns: List[str] | None = None) str[source]#

Generate the SELECT statement for a table.

Parameters:
  • table_fqn – Table’s fully qualified name.

  • columns – List of column names to select. If None, selects all columns (*).

Returns:

SELECT SQL statement.

Raises:

ValueError – If column names contain invalid characters (potential SQL injection).
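
As an illustration only, a statement builder with this behavior might look like the following. The `build_select` name and the exact text of the generated statement are assumptions; the real method's output may differ in quoting or spacing.

```python
import re
from typing import List, Optional

VALID_IDENTIFIER = re.compile(r"^[a-zA-Z_][a-zA-Z0-9_]*$")


def build_select(table_fqn: str, columns: Optional[List[str]] = None) -> str:
    """Return a SELECT statement after validating every identifier."""
    # Assumption: the table name may be dotted, column names may not.
    for name in [*table_fqn.split("."), *(columns or [])]:
        if not VALID_IDENTIFIER.match(name):
            raise ValueError(f"Invalid identifier: {name!r}")
    column_list = ", ".join(columns) if columns else "*"
    return f"SELECT {column_list} FROM {table_fqn}"


all_columns = build_select("users")
some_columns = build_select("public.users", ["name", "email"])
```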

columns()[source]#

Get column names from the current cursor.

Returns:

List of column names, or empty list if cursor is None.

fetch_record() Dict[str, Any][source]#

Fetch a single record as a dictionary with column names as keys.

Returns:

Dictionary mapping column names to row values, or None if there is no record.

fetch_one() Tuple[source]#

Fetch a single record as a tuple.

Returns:

Tuple containing row values.

fetch_records() Iterator[Dict[str, Any]][source]#

Fetch all records as an iterator of dictionaries. Converts fetchall tuples into dictionaries with column names as keys.

Returns:

Iterator yielding dictionaries with column names as keys.
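
The tuple-to-dictionary conversion can be sketched against any DB-API cursor. The example below uses the standard-library `sqlite3` module and a free function rather than the client method; reading column names from `cursor.description` is standard DB-API behavior, while the rest is a simplification.

```python
import sqlite3
from typing import Any, Dict, Iterator


def fetch_records(cursor: sqlite3.Cursor) -> Iterator[Dict[str, Any]]:
    """Yield each row of the cursor as a dict keyed by column name."""
    # In the DB-API, the first item of each description entry is the column name.
    names = [entry[0] for entry in cursor.description or []]
    for row in cursor.fetchall():
        yield dict(zip(names, row))


cxn = sqlite3.connect(":memory:")
cursor = cxn.execute("SELECT 1 AS id, 'Alice' AS name")
records = list(fetch_records(cursor))  # consume before closing the connection
cxn.close()
```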

fetch_all() Iterator[Tuple][source]#

Fetch all records as an iterator of tuples.

Returns:

Iterator yielding tuples containing row values.

classmethod get_create_table_ddl(table_fqn: str, columns: List[Tuple[str, Any]], temporal: bool = False, primary_keys: List[str] | None = None, unique_columns: List[str] | None = None, not_null_columns: List[str] | None = None) str[source]#

Generate the SQL CREATE TABLE statement.

Parameters:
  • table_fqn – Table’s fully qualified name.

  • columns – List of tuples defining the column name and data type.

  • temporal – Whether to create a temporary table. Defaults to False.

  • primary_keys – Column names to include in the PRIMARY KEY constraint.

  • unique_columns – Column names to include in the UNIQUE constraint.

  • not_null_columns – Column names that should have a NOT NULL constraint.

Returns:

The CREATE TABLE SQL statement.
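
The sketch below shows one way such a statement could be assembled from the documented TYPE_MAPPER. It assumes that column types are given as Python types, that constraints are emitted as table-level clauses, and that the temporary-table keyword is `TEMPORARY`; the real method's output format is not shown in this reference and may differ.

```python
from typing import Any, List, Optional, Tuple

TYPE_MAPPER = {
    bool: "BOOLEAN", dict: "JSON", float: "DOUBLE",
    int: "INTEGER", list: "JSON", str: "TEXT",
}


def build_create_table(
    table_fqn: str,
    columns: List[Tuple[str, Any]],
    temporal: bool = False,
    primary_keys: Optional[List[str]] = None,
    unique_columns: Optional[List[str]] = None,
    not_null_columns: Optional[List[str]] = None,
) -> str:
    """Assemble a CREATE TABLE statement from (name, Python type) pairs."""
    not_null = set(not_null_columns or [])
    definitions = []
    for name, py_type in columns:
        definition = f"{name} {TYPE_MAPPER[py_type]}"
        if name in not_null:
            definition += " NOT NULL"
        definitions.append(definition)
    if primary_keys:
        definitions.append(f"PRIMARY KEY ({', '.join(primary_keys)})")
    if unique_columns:
        definitions.append(f"UNIQUE ({', '.join(unique_columns)})")
    prefix = "CREATE TEMPORARY TABLE" if temporal else "CREATE TABLE"
    return f"{prefix} {table_fqn} ({', '.join(definitions)})"


ddl = build_create_table(
    "users",
    [("id", int), ("name", str)],
    primary_keys=["id"],
    not_null_columns=["name"],
)
```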

insert_records(table_fqn: str, columns: List[str], records: List[Dict], records_per_request: int = 500) int[source]#

Insert a batch of records into a table using parameterized queries. Automatically manages batching to avoid memory issues with large datasets.

Parameters:
  • table_fqn – Table’s fully qualified name (FQN).

  • columns – List of column names to insert into.

  • records – List of dictionaries representing records to insert.

  • records_per_request – Number of records to insert per batch. Defaults to 500.

Returns:

Total number of inserted records.

Raises:

DatabaseClientException – If insertion fails.
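
The batching behavior can be illustrated with a standalone function over `sqlite3`. This sketch sends each chunk with `executemany` and `?` placeholders; the real method may instead build a multi-row statement through get_insert_dml, and the `chunked` helper and the `cxn` argument are illustrative names.

```python
import sqlite3
from typing import Dict, Iterator, List


def chunked(records: List[Dict], size: int) -> Iterator[List[Dict]]:
    """Yield consecutive slices of at most `size` records."""
    for start in range(0, len(records), size):
        yield records[start:start + size]


def insert_records(cxn, table: str, columns: List[str], records: List[Dict],
                   records_per_request: int = 500) -> int:
    """Insert records chunk by chunk and return how many were sent."""
    placeholders = ", ".join("?" for _ in columns)
    query = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    total = 0
    for batch in chunked(records, records_per_request):
        # Values travel as bound parameters, never as part of the SQL text.
        rows = [tuple(record.get(column) for column in columns) for record in batch]
        cxn.executemany(query, rows)
        total += len(rows)
    return total


cxn = sqlite3.connect(":memory:")
cxn.execute("CREATE TABLE users (name TEXT, email TEXT)")
records = [{"name": f"user{i}", "email": f"user{i}@example.com"} for i in range(1200)]
inserted = insert_records(cxn, "users", ["name", "email"], records)
stored = cxn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
cxn.close()
```

With the default chunk size, the 1200 records above are sent in three requests of 500, 500, and 200 rows.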

classmethod get_insert_dml(table_fqn: str, columns: List[str], records: List[Dict]) Tuple[str, tuple][source]#

Generate a parameterized INSERT statement with multi-row VALUES. Uses parameter binding to prevent SQL injection attacks.

Parameters:
  • table_fqn – Table’s fully qualified name (FQN).

  • columns – List of column names to insert into.

  • records – List of dictionaries representing records to insert.

Returns:

Tuple of (query string with placeholders, flattened parameter tuple).

Raises:

ValueError – If column names contain invalid characters.
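
A sketch of how a multi-row parameterized INSERT and its flattened parameters fit together. The `build_insert` name is illustrative, identifier validation is left out for brevity, and filling missing keys with None is an assumption.

```python
from typing import Dict, List, Tuple

PLACEHOLDER = "%s"  # default placeholder documented for the class


def build_insert(table_fqn: str, columns: List[str],
                 records: List[Dict]) -> Tuple[str, tuple]:
    """Return (statement, flattened parameters) for a multi-row INSERT."""
    row = "(" + ", ".join([PLACEHOLDER] * len(columns)) + ")"
    values = ", ".join([row] * len(records))
    query = f"INSERT INTO {table_fqn} ({', '.join(columns)}) VALUES {values}"
    # Parameters follow the placeholders: row by row, column by column.
    params = tuple(record.get(column) for record in records for column in columns)
    return query, params


query, params = build_insert(
    "users",
    ["name", "email"],
    [{"name": "Alice", "email": "alice@example.com"}, {"name": "Bob"}],
)
```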

classmethod get_delete_dml(table_fqn: str, *, pk_id: str | None = None, ids: List | None = None) Tuple[str, Tuple][source]#
classmethod get_delete_dml(table_fqn: str, *, pk_id: str | None = None, conditionals: List[Dict] | None = None) Tuple[str, Tuple]

Generate a parameterized DELETE statement with placeholders. Uses parameter binding to prevent SQL injection attacks.

Parameters:
  • table_fqn – Table’s fully qualified name.

  • pk_id – Primary key column name for IN clause deletion.

  • ids – List of ID values to delete (used with pk_id).

  • conditionals – List of dictionaries with conditional criteria for WHERE clause.

Returns:

Tuple of (query string with placeholders, tuple of parameter values).
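
For the pk_id/ids form, the statement reduces to an IN clause with one placeholder per value. The function below is a hypothetical sketch of that case only; rejecting an empty ids list is an assumption.

```python
from typing import List, Tuple

PLACEHOLDER = "%s"  # default placeholder documented for the class


def build_delete_by_ids(table_fqn: str, pk_id: str, ids: List) -> Tuple[str, Tuple]:
    """Return (statement, parameters) for deleting rows by primary key."""
    if not ids:
        # Assumption: an empty list is rejected rather than producing "IN ()".
        raise ValueError("ids must not be empty")
    placeholders = ", ".join([PLACEHOLDER] * len(ids))
    query = f"DELETE FROM {table_fqn} WHERE {pk_id} IN ({placeholders})"
    return query, tuple(ids)


query, params = build_delete_by_ids("users", "id", [1, 2, 3])
```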

classmethod _get_conditional_statements(conditionals: List[Dict] | None = None) Tuple[List, List][source]#

Generate parameterized WHERE clause components from conditional dictionaries. Each dictionary in conditionals becomes one group: its keys are column names, its values are the comparison values, and its key-value pairs are combined with AND. The groups are then combined with OR.

Parameters:

conditionals – List of dictionaries with column:value pairs.

Returns:

Tuple of (list of condition strings, list of parameter values).

Example:#

>>> conditionals = [{"name": "Alice", "age": 30}, {"status": "active"}]
>>> # Generates: WHERE (name = ? AND age = ?) OR (status = ?)
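
The example can be reproduced with a short standalone function. The `build_conditions` name and the `placeholder` argument are illustrative; the real method is a classmethod that presumably reads the class-level PLACEHOLDER.

```python
from typing import Dict, List, Optional, Tuple


def build_conditions(conditionals: Optional[List[Dict]] = None,
                     placeholder: str = "?") -> Tuple[List, List]:
    """Return (condition strings, parameter values) for a WHERE clause."""
    conditions: List[str] = []
    params: List = []
    for conditional in conditionals or []:
        # Keys inside one dictionary are joined with AND.
        parts = [f"{column} = {placeholder}" for column in conditional]
        conditions.append("(" + " AND ".join(parts) + ")")
        params.extend(conditional.values())
    return conditions, params


conditions, params = build_conditions(
    [{"name": "Alice", "age": 30}, {"status": "active"}]
)
# The caller joins the groups with OR.
where_clause = "WHERE " + " OR ".join(conditions)
```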
_impls: Dict[str, Type[Self]] = {'Db2Client': <class 'core_db.engines.db2.Db2Client'>, 'MySQLClient': <class 'core_db.engines.mysql.MySQLClient'>, 'OracleClient': <class 'core_db.engines.oracle.OracleClient'>, 'PostgresClient': <class 'core_db.engines.postgres.PostgresClient'>, 'SnowflakeClient': <class 'core_db.engines.snowflake_.SnowflakeClient'>}#
abstractmethod classmethod get_merge_dml(*args, **kwargs) Tuple[str, Tuple][source]#

Generate the MERGE/UPSERT statement. This is an abstract method that must be implemented by concrete classes, as each database engine may require specific syntax for merge operations.

Parameters:
  • args – Positional arguments specific to the implementation.

  • kwargs – Keyword arguments specific to the implementation.

Returns:

Tuple of (MERGE/UPSERT statement string, tuple of parameters).
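
As one possible shape for an implementation, the sketch below builds an `INSERT ... ON CONFLICT ... DO UPDATE` upsert, a syntax accepted by PostgreSQL and by SQLite 3.24 or later. The argument list follows the usage example for this class; the function name, the `?` placeholder, and the choice of upsert syntax are assumptions, and other engines (for instance those with a MERGE statement) need different SQL. It also assumes at least one non-key column.

```python
import sqlite3
from typing import Dict, List, Tuple


def build_upsert(table_fqn: str, pk_ids: List[str], columns: List[str],
                 records: List[Dict], placeholder: str = "?") -> Tuple[str, Tuple]:
    """Return (statement, parameters) for an insert-or-update by key."""
    row = "(" + ", ".join([placeholder] * len(columns)) + ")"
    values = ", ".join([row] * len(records))
    # Non-key columns are overwritten with the incoming ("excluded") values.
    updates = ", ".join(f"{c} = excluded.{c}" for c in columns if c not in pk_ids)
    query = (
        f"INSERT INTO {table_fqn} ({', '.join(columns)}) VALUES {values} "
        f"ON CONFLICT ({', '.join(pk_ids)}) DO UPDATE SET {updates}"
    )
    params = tuple(record.get(c) for record in records for c in columns)
    return query, params


cxn = sqlite3.connect(":memory:")
cxn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
cxn.execute("INSERT INTO users VALUES (1, 'Alice')")
query, params = build_upsert(
    "users", ["id"], ["id", "name"],
    [{"id": 1, "name": "Alicia"}, {"id": 2, "name": "Bob"}],
)
cxn.execute(query, params)
rows = cxn.execute("SELECT id, name FROM users ORDER BY id").fetchall()
cxn.close()
```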

close() None[source]#

Close the database connection after committing pending changes. This method automatically commits any pending transactions before closing the connection to ensure data persistence.

Database-Based ETL Module#

This module provides abstract base classes for building ETL (Extract, Transform, Load) processes that extract data from database sources. It extends the core_etl framework to provide specialized database connectivity and query execution capabilities.

class core_db.etls.database_based.IBaseEtlFromDatabase(database_type: str, connection_parameters: Dict, base_query: str | None = None, batch_size: int = 100, **kwargs)[source]#

Bases: IBaseEtlFromRecord, ABC

Abstract base class for ETL processes that extract data from databases. This class extends IBaseEtlFromRecord to provide specialized functionality for retrieving data from database sources. It manages database connections, executes queries, and processes records in configurable batches.

The class handles the complete lifecycle of database-based ETL operations:
  • Connection establishment in pre_processing.

  • Query execution and data retrieval.

  • Batch-based record processing.

  • Connection cleanup in clean_resources.

database_type#

The class name of the database client to use.

Type:

str

connection_parameters#

Parameters for establishing database connection.

Type:

Dict

db_client#

Active database client instance.

Type:

Optional[IDatabaseClient]

base_query#

Base SQL query template for data retrieval.

Type:

Optional[str]

Usage:#

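from core_db.etls.database_based import IBaseEtlFromDatabase


class MyETL(IBaseEtlFromDatabase):
    def _execute_query(self, query):
        # Run the query on the client created in pre_processing
        self.db_client.execute(query)

    def _fetch_records(self):
        return self.db_client.fetch_records()

    def process_records(self, records, **kwargs):
        # Transform and load records; transform_and_load is a
        # placeholder for your own logic
        for record in records:
            self.transform_and_load(record)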

# Execute the ETL
etl = MyETL(
    database_type="PostgresClient",
    connection_parameters={"conninfo": "postgresql://..."},
    base_query="SELECT * FROM source_table",
    batch_size=1000
)
etl.run()

__init__(database_type: str, connection_parameters: Dict, base_query: str | None = None, batch_size: int = 100, **kwargs) None[source]#

Parameters:
  • database_type – The name of the class that defines the database connection.

  • connection_parameters – The parameters to create the database connection.

  • base_query – Base query to use when retrieving data.

  • batch_size – Size of the batch per database operation.

pre_processing() None[source]#

Initialize database connection before ETL processing begins. This method is called automatically before the ETL process starts. It:

  1. Retrieves the appropriate database client class using the factory pattern

  2. Instantiates the client with provided connection parameters

  3. Establishes the database connection

Parameters:

kwargs – Additional keyword arguments passed to parent pre_processing.

Raises:

DatabaseClientException – If connection fails or database_type is invalid.

get_query(*args, **kwargs) str | None[source]#

Generate or return the SQL query for data retrieval. Override this method to implement dynamic query generation based on ETL parameters like date ranges, last processed values, or other criteria.

Parameters:
  • args – Positional arguments for query generation.

  • kwargs – Keyword arguments (last_processed, start, end, etc.).

Returns:

SQL query string, or None if no query is needed.

retrieve_records(last_processed: Any = None, start: Any = None, end: Any = None, **kwargs) Iterator[List[Dict]][source]#

Retrieve records from the database in batches. This method orchestrates the data retrieval process by:

  1. Generating the appropriate query using get_query().

  2. Executing the query via _execute_query().

  3. Fetching records via _fetch_records().

  4. Batching records according to batch_size.

  5. Yielding batches for processing.

Parameters:
  • last_processed – Identifier of the last processed record for incremental loads.

  • start – Start boundary for data extraction (timestamp, ID, etc.).

  • end – End boundary for data extraction (timestamp, ID, etc.).

  • kwargs – Additional query parameters.

Yields:

Batches of records as lists of dictionaries.
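
The batching step can be sketched as a generator that groups any record iterator into lists of at most batch_size items. The `batched` helper is an illustrative name, not part of the documented API.

```python
from itertools import islice
from typing import Dict, Iterable, Iterator, List


def batched(records: Iterable[Dict], batch_size: int) -> Iterator[List[Dict]]:
    """Group an iterable of records into lists of at most batch_size items."""
    iterator = iter(records)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# 250 records with a batch size of 100 arrive as batches of 100, 100, and 50.
batches = list(batched(({"id": i} for i in range(250)), 100))
sizes = [len(batch) for batch in batches]
```

Because the source is consumed lazily, only one batch is held in memory at a time, provided the underlying fetch is itself lazy.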

abstractmethod _execute_query(query: Any)[source]#

Execute the SQL query on the database connection. Concrete implementations must define this method to handle database-specific query execution. This typically involves calling the appropriate method on self.db_client.

Parameters:

query – SQL query string to execute.

abstractmethod _fetch_records() Iterator[Dict][source]#

Fetch records from the executed query result. Concrete implementations must define this method to retrieve records from the database cursor or result set. This typically returns an iterator from the database client.

Returns:

Iterator yielding dictionaries with column names as keys.

process_records(records: List[Dict], **kwargs)[source]#

Process a batch of transformed records. Concrete implementations must define the load operations to perform with the transformed records. Common actions include:

  • Loading to target database.

  • Archiving to S3 or cloud storage.

  • Sending to message queues (SQS, Kinesis, Kafka).

  • Writing to files or SFTP servers.

  • Sending to APIs or webhooks.

Parameters:
  • records – List of dictionaries representing processed records.

  • kwargs – Additional parameters for processing.

clean_resources() None[source]#

Clean up database connection and resources after ETL completion. This method is called automatically at the end of the ETL process to ensure proper cleanup. It safely closes the database connection if one exists.

_impls: Dict[str, Type[Self]] = {}#