Engines#
IBM DB2#
IBM DB2 Database Client Module#
This module provides the Db2Client class for connecting to and interacting with IBM DB2 databases using the ibm_db library.
- class core_db.engines.db2.Db2Client(dsn: str, user: str = '', password: str = '', **kwargs)[source]#
Bases: ISqlDatabaseClient
Client for IBM DB2 database connection…
How to use#
from core_db.engines.db2 import Db2Client

dsn_hostname, dsn_port, dsn_database = "localhost", "50000", "sample"
dsn_uid, dsn_pwd = "db2inst1", "SomePassword"

dsn = (
    f"DATABASE={dsn_database};"
    f"HOSTNAME={dsn_hostname};"
    f"PORT={dsn_port};"
    f"PROTOCOL=TCPIP;"
    f"UID={dsn_uid};"
    f"PWD={dsn_pwd};")

with Db2Client(dsn=dsn, user="", password="") as client:
    client.execute("select * from department FETCH FIRST 2 ROWS ONLY;")
    print(client.fetch_one())
    print(client.fetch_record())
- PLACEHOLDER = '?'#
- __init__(dsn: str, user: str = '', password: str = '', **kwargs) None[source]#
Initialize SQL database client with connection parameters.
- Parameters:
kwargs – Database-specific connection parameters (e.g., host, port, database, user, password).
- property cursor#
IBM DB2 uses ‘statement’ instead of ‘cursor’. This property provides compatibility with the base class. Returns a wrapper object that mimics cursor behavior.
- connect() None[source]#
Establish connection to IBM DB2 database. Uses the DSN (Data Source Name) string along with user credentials to create a database connection using ibm_db.connect().
- Raises:
DatabaseClientException – If connection fails.
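The DSN passed to connect() is a semicolon-delimited string of KEY=value pairs, as in the usage example above. A minimal sketch of assembling one follows; build_db2_dsn is a hypothetical helper for illustration and is not part of core_db.

```python
def build_db2_dsn(database: str, hostname: str, port: int, uid: str, pwd: str) -> str:
    """Assemble a DB2 DSN string (hypothetical helper, not part of core_db)."""
    parts = {
        "DATABASE": database,
        "HOSTNAME": hostname,
        "PORT": str(port),
        "PROTOCOL": "TCPIP",
        "UID": uid,
        "PWD": pwd,
    }
    # Each pair is rendered as KEY=value and terminated with a semicolon.
    return "".join(f"{key}={value};" for key, value in parts.items())


dsn = build_db2_dsn("sample", "localhost", 50000, "db2inst1", "SomePassword")
print(dsn)
```

The resulting string could then be passed as the dsn argument of Db2Client.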
- test_connection(query: str | None = None) Any[source]#
Test the database connection by executing a simple query.
- Parameters:
query – Optional custom query to test. Defaults to querying DB2 system information.
- Returns:
Result of the query execution.
- execute(query: str, **kwargs) Any[source]#
Execute SQL query with optional parameter binding. Uses ibm_db.prepare and ibm_db.execute for parameterized queries with parameter binding. For queries without parameters, uses ibm_db.exec_immediate for direct execution.
- Parameters:
query – SQL query string to execute.
kwargs – Optional keyword arguments. Supports ‘params’ for parameter binding.
- Raises:
DatabaseClientException – If no connection exists or query execution fails.
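To illustrate why binding values to the '?' marker (see PLACEHOLDER) is safer than building SQL by string concatenation, here is a small sketch. It uses Python's built-in sqlite3 module purely as a stand-in, because it accepts the same '?' marker; it does not exercise ibm_db or Db2Client, and the table contents are made up.

```python
import sqlite3

# sqlite3 is only a stand-in here: it accepts the same "?" marker as PLACEHOLDER.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE department (deptno TEXT, deptname TEXT)")
conn.executemany(
    "INSERT INTO department VALUES (?, ?)",
    [("A00", "SPIFFY"), ("B01", "PLANNING")],
)

query = "SELECT deptname FROM department WHERE deptno = ?"

# A normal value is bound to the marker and matches one row.
matched = conn.execute(query, ("A00",)).fetchall()

# A hostile value is treated as data, not as SQL, so it matches nothing.
hostile = conn.execute(query, ("A00' OR '1'='1",)).fetchall()

print(matched, hostile)
conn.close()
```

With Db2Client, the equivalent call would presumably pass the values through the 'params' keyword described above, for example client.execute(query, params=("A00",)); the exact container type expected is an assumption.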
- commit() None[source]#
Commit the current transaction.
- Raises:
DatabaseClientException – If no active connection exists.
- fetch_record() Dict[str, Any][source]#
Fetch the next row as a dictionary with column names as keys.
- Returns:
Dictionary representing a single row, or empty dict if no more rows.
- fetch_records() Iterator[Dict[str, Any]][source]#
Fetch all remaining rows as an iterator of dictionaries.
- Returns:
Iterator yielding dictionaries with column names as keys.
- fetch_all() Iterator[Tuple][source]#
Fetch all remaining rows as an iterator of tuples.
- Returns:
Iterator yielding tuples representing rows.
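The difference between the tuple-based and dictionary-based fetch methods can be sketched in plain Python. The helper rows_as_dicts below is hypothetical and only shows the general idea of pairing column names with row values; it is not the library's implementation.

```python
from typing import Any, Dict, Iterable, Iterator, Sequence, Tuple


def rows_as_dicts(
    columns: Sequence[str], rows: Iterable[Tuple[Any, ...]]
) -> Iterator[Dict[str, Any]]:
    """Lazily turn tuple rows into dictionaries keyed by column name."""
    for row in rows:
        # zip pairs each column name with the value at the same position.
        yield dict(zip(columns, row))


columns = ["deptno", "deptname"]
rows = [("A00", "SPIFFY"), ("B01", "PLANNING")]
records = list(rows_as_dicts(columns, rows))
print(records)
```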
- classmethod get_merge_dml(table_fqn: str, pk_ids: List[str], columns: List[str], records: List[Dict]) Tuple[str, Tuple][source]#
Generate parameterized MERGE statement for IBM DB2. Uses parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name.
pk_ids – List of primary key column names.
columns – List of column names.
records – List of dictionaries representing records.
- Returns:
Tuple of (query string with placeholders, tuple of parameter values).
- Raises:
ValueError – If column names contain invalid characters.
- _abc_impl = <_abc._abc_data object>#
MongoDB#
MongoDB Database Client Module#
This module provides the MongoClient class for connecting to and interacting with MongoDB databases using the pymongo library.
- class core_db.engines.mongo.MongoClient(**kwargs)[source]#
Bases: IDatabaseClient
Client for MongoDB connection…
How to use#
from core_db.engines.mongo import MongoClient

client = MongoClient(host="host", database="db")
client.connect()
print(client.test_connection())
- __init__(**kwargs)[source]#
Initialize MongoDB client.
- Parameters:
kwargs – Connection parameters. Must include ‘database’ parameter. Optional: host, port, username, password, and other MongoDB connection options.
- connect() None[source]#
Establish connection to MongoDB database. Creates a connection using PyMongoClient and selects the specified database.
- Raises:
DatabaseClientException – If connection fails or database name is not provided.
- test_connection(query: str | None = None, session: ClientSession | None = None)[source]#
Test the database connection by retrieving server information. Executes MongoDB’s server_info() command to verify connectivity and retrieve server metadata including version and configuration.
- Parameters:
query – Unused parameter (kept for interface compatibility).
session – Optional ClientSession for the operation.
- Returns:
Dictionary containing MongoDB server information.
- Raises:
DatabaseClientException – If no active connection exists.
- find_one(collection_name: str, filters: Dict | None = None, projection: Dict | None = None, *args, **kwargs) Dict[source]#
Get a single document from the database. All arguments to find() are also valid arguments for find_one(), although any limit argument will be ignored. Returns a single document, or None if no matching document is found.
- Parameters:
collection_name – The name of the collection to query.
filters – A dictionary specifying the query to be performed OR any other type to be used as the value for a query for “_id”.
projection – A list of field names that should be returned in the result set or a dict specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a dict to exclude fields from the result (e.g. projection={'_id': False}).
args – Any additional positional arguments are the same as the arguments to find().
kwargs – Any additional keyword arguments are the same as the arguments to find().
- find(collection_name: str, filters: Dict | None = None, projection: Dict | None = None, *args, **kwargs) Iterator[source]#
Query the database. The filters argument is a query document that all results must match…
- Parameters:
collection_name – The name of the collection to query.
filters – A dictionary specifying the query to be performed OR any other type to be used as the value for a query for “_id”.
projection – A list of field names that should be returned in the result set or a dict specifying the fields to include or exclude. If projection is a list “_id” will always be returned. Use a dict to exclude fields from the result (e.g. projection={'_id': False}).
For args and kwargs, see the pymongo documentation for find().
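The projection rules described above (a list always keeps "_id", while a dict can include or exclude fields) can be emulated in memory. The function apply_projection below is a hypothetical illustration that covers only simple top-level fields; it does not call MongoDB or pymongo.

```python
from typing import Any, Dict, List, Union

Projection = Union[List[str], Dict[str, bool]]


def apply_projection(document: Dict[str, Any], projection: Projection) -> Dict[str, Any]:
    """Emulate how a simple projection shapes a single document."""
    if isinstance(projection, list):
        # A list means "include these fields"; "_id" is always kept.
        wanted = set(projection) | {"_id"}
        return {k: v for k, v in document.items() if k in wanted}

    included = {k for k, flag in projection.items() if flag}
    excluded = {k for k, flag in projection.items() if not flag}
    if included:
        # Inclusion mode: keep listed fields, plus "_id" unless it is excluded.
        wanted = included | ({"_id"} - excluded)
        return {k: v for k, v in document.items() if k in wanted}
    # Exclusion mode: drop only the listed fields.
    return {k: v for k, v in document.items() if k not in excluded}


doc = {"_id": 1, "name": "Ada", "age": 36}
print(apply_projection(doc, ["name"]))
print(apply_projection(doc, {"_id": False}))
```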
- insert_one(collection_name: str, document: Dict, bypass_document_validation: bool = False, session: ClientSession | None = None, comment: str | None = None) InsertOneResult[source]#
Insert a single document into a collection. If the collection does not exist, it is created and the document is inserted into it. The document is a dictionary mapping each field name to the value to store.
- Parameters:
collection_name – The collection to which you want to add documents.
document – The document to insert. Must be a mutable mapping type. If the document does not have an _id field one will be added automatically.
bypass_document_validation – If “True”, allows the write to opt out of document-level validation. Default is “False”.
session – An instance of pymongo.client_session.ClientSession.
comment – A user-provided comment to attach to this command.
- insert_many(collection_name: str, documents: List[Dict], ordered: bool = True, bypass_document_validation: bool = False, session: ClientSession | None = None) InsertManyResult[source]#
Insert multiple documents into a collection. The documents argument is a list of dictionaries, one per document to insert.
Returns an instance of pymongo.results.InsertManyResult, whose inserted_ids attribute holds the “_id” values of the inserted documents. For any document that does not specify an “_id” field, a unique ObjectId is generated and added as “_id” before insertion.
- Parameters:
collection_name – The collection to which you want to add documents.
documents – An iterable of documents to insert.
ordered – If “True” (the default) documents will be inserted on the server serially, in the order provided. If an error occurs all remaining inserts are aborted. If “False”, documents will be inserted on the server in arbitrary order, possibly in parallel, and all document inserts will be attempted.
bypass_document_validation – If “True”, allows the write to opt out of document-level validation. Default is “False”.
session – An instance of pymongo.client_session.ClientSession.
- delete_one(collection_name: str, filter_query: Dict, collation: Collation | None = None, hint=None, session: ClientSession | None = None, let: Mapping[str, Any] | None = None, comment: Any | None = None) DeleteResult[source]#
To remove one document from the collection…
- Parameters:
collection_name – The name of the collection to delete the document from.
filter_query – A query that matches the document to delete.
collation – An instance of pymongo.collation.Collation. This option is only supported on MongoDB 3.4 and above.
hint – An index to use to support the query predicate. This option is only supported on MongoDB 4.4 and above (the hint parameter was added in PyMongo 3.11).
session – An instance of pymongo.client_session.ClientSession.
let – Map of parameter names and values. Values must be constant or closed expressions that do not reference document fields. Parameters can then be accessed as variables in an aggregate expression context (e.g. “$$var”).
comment – A user-provided comment to attach to this command.
- delete_many(collection_name: str, filter_query: Dict, collation: Collation | None = None, hint=None, session: ClientSession | None = None, let: Mapping[str, Any] | None = None, comment: Any | None = None) DeleteResult[source]#
Delete every document that matches the filter. The filter_query argument describes which documents to delete.
- Parameters:
collection_name – The name of the collection to delete documents from.
filter_query – A query that matches the documents to delete.
collation – An instance of pymongo.collation.Collation. This option is only supported on MongoDB 3.4 and above.
hint – An index to use to support the query predicate. This option is only supported on MongoDB 4.4 and above (the hint parameter was added in PyMongo 3.11).
session – An instance of pymongo.client_session.ClientSession.
let – Map of parameter names and values. Values must be constant or closed expressions that do not reference document fields. Parameters can then be accessed as variables in an aggregate expression context (e.g. “$$var”).
comment – A user-provided comment to attach to this command.
- _abc_impl = <_abc._abc_data object>#
MsSQL#
MySQL#
MySQL Database Client Module#
This module provides the MySQLClient class for connecting to and interacting with MySQL databases using the pymysql library.
- class core_db.engines.mysql.MySQLClient(**kwargs)[source]#
Bases: ISqlDatabaseClient
MySQL database client with parameterized query support. This client provides secure database operations using parameterized queries to prevent SQL injection attacks. It supports standard CRUD operations, batch inserts, upserts (MERGE), and secure SELECT/DELETE operations.
Usage Examples#
Basic Connection and Query:#
from core_db.engines.mysql import MySQLClient

config = {
    "host": "localhost",
    "database": "test_database",
    "user": "root",
    "password": "password"
}

with MySQLClient(**config) as client:
    # Test connection
    client.execute("SELECT VERSION() AS version;")
    print(client.fetch_one()[0])
Insert Records (Batch Insert):#
columns = ["first_name", "last_name", "age", "email", "birthdate"]

records = [
    {
        "first_name": "John",
        "last_name": "Doe",
        "age": 30,
        "email": "john.doe@example.com",
        "birthdate": "1994-05-15"
    },
    {
        "first_name": "Jane",
        "last_name": "Smith",
        "age": 25,
        "email": "jane.smith@example.com",
        "birthdate": "2000-05-15"
    }
]

with MySQLClient(**config) as client:
    count = client.insert_records(
        table_fqn="people",
        columns=columns,
        records=records)
    client.commit()
    print(f"Inserted {count} records")
Select Records:#
with MySQLClient(**config) as client:
    client.select("people", columns=["first_name", "last_name", "age"])
    for record in client.fetch_records():
        print(record)
Delete Records (Conditional):#
with MySQLClient(**config) as client:
    query, params = client.get_delete_dml(
        "people",
        conditionals=[{"first_name": "Jane"}])
    client.execute(query, params=params)
    client.commit()
Upsert/Merge Records (INSERT … ON DUPLICATE KEY UPDATE):#
with MySQLClient(**config) as client:
    query, params = client.get_merge_dml(
        table_fqn="people",
        columns=columns,
        records=[
            {
                "first_name": "John",
                "last_name": "Doe",
                "age": 35,  # Updated age
                "email": "john.doe@example.com",
                "birthdate": "1994-05-15"
            }
        ])
    client.execute(query, params=params)
    client.commit()
Security Features:#
All DML methods use parameterized queries with placeholders (%s)
Column names are validated against SQL injection patterns
Uses pymysql’s built-in parameter binding
Multi-row INSERT for better performance
- _execute(query: Any, **kwargs)[source]#
Execute query with parameter binding support for MySQL. Handles pymysql’s specific parameter passing requirement where parameters are passed via the ‘args’ keyword argument.
- Parameters:
query – SQL query string to execute.
kwargs – Optional keyword arguments. Supports ‘params’ for parameter binding.
- Returns:
Cursor object after execution.
- classmethod get_merge_dml(table_fqn: str, columns: List[str], records: List[Dict], epoch_column: str | None = None) Tuple[str, Tuple][source]#
Generate parameterized MERGE/UPSERT statement for MySQL using INSERT … ON DUPLICATE KEY UPDATE. Uses parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name (schema.table or just table).
columns – List of column names.
records – List of dictionaries representing records.
epoch_column – If specified, only update if this timestamp column is newer.
- Returns:
Tuple of (query string with placeholders, flattened parameter tuple).
- Raises:
ValueError – If column names contain invalid characters.
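To make the shape of the returned tuple concrete, here is a minimal sketch of how an INSERT … ON DUPLICATE KEY UPDATE statement with %s placeholders and a flattened parameter tuple could be built. The function build_mysql_upsert is hypothetical and is not the library's implementation: it ignores epoch_column and assumes the identifiers have already been validated.

```python
from typing import Any, Dict, List, Tuple


def build_mysql_upsert(
    table_fqn: str, columns: List[str], records: List[Dict[str, Any]]
) -> Tuple[str, Tuple[Any, ...]]:
    """Build a multi-row upsert (sketch; identifiers assumed already validated)."""
    # One "(%s, %s, ...)" group per record.
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row] * len(records))
    # On a key collision, overwrite each column with the incoming value.
    updates = ", ".join(f"{col} = VALUES({col})" for col in columns)
    query = (
        f"INSERT INTO {table_fqn} ({', '.join(columns)}) "
        f"VALUES {values} "
        f"ON DUPLICATE KEY UPDATE {updates}"
    )
    # Flatten row by row, in column order, to line up with the placeholders.
    params = tuple(rec.get(col) for rec in records for col in columns)
    return query, params


query, params = build_mysql_upsert(
    "people", ["email", "age"], [{"email": "john.doe@example.com", "age": 35}]
)
print(query)
print(params)
```

The VALUES(col) form in the UPDATE clause is deprecated in recent MySQL 8.0 releases in favor of a row alias, so a real implementation may generate different SQL.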
- _abc_impl = <_abc._abc_data object>#
Oracle#
Oracle Database Client Module#
This module provides the OracleClient class for connecting to and interacting with Oracle databases using the oracledb library.
- class core_db.engines.oracle.OracleClient(**kwargs)[source]#
Bases: ISqlDatabaseClient
Client for Oracle connection…
How to use#
from core_db.engines.oracle import OracleClient

with OracleClient(user="...", password="...", dsn=f"{host}:{port}/{service_name}") as client:
    res = client.execute("SELECT * FROM ...")
    for x in client.fetch_all():
        print(x)
- test_connection(query: str | None = None)[source]#
Test the database connection by executing a simple query.
- Parameters:
query – Optional custom query to test. Defaults to querying Oracle version.
- Returns:
Result of the query execution.
- static _convert_value(value: Any) Any[source]#
Convert Python values to Oracle-compatible types.
- Parameters:
value – The value to convert.
- Returns:
Converted value.
- _execute(query: Any, **kwargs)[source]#
Override execute to handle Oracle’s parameter format requirements, because Oracle’s oracledb driver expects parameters as a list passed as the second positional argument, not as a keyword argument.
- insert_records(table_fqn: str, columns: List[str], records: List[Dict], records_per_request: int = 500) int[source]#
Insert records using Oracle’s executemany for better performance and proper type handling.
- Parameters:
table_fqn – Table’s fully qualified name (FQN).
columns – List of column names to insert into.
records – List of dictionaries representing records to insert.
records_per_request – Number of records to insert per batch.
- Returns:
Total number of inserted records.
- Raises:
DatabaseClientException – If insertion fails.
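The role of records_per_request can be illustrated with a small batching sketch. The helper iter_batches is hypothetical; it only shows how a list of record dictionaries might be split into batches of row tuples in column order, which is the shape a driver's executemany call typically expects.

```python
from typing import Any, Dict, Iterator, List, Tuple


def iter_batches(
    columns: List[str],
    records: List[Dict[str, Any]],
    records_per_request: int = 500,
) -> Iterator[List[Tuple[Any, ...]]]:
    """Yield batches of row tuples, at most records_per_request rows each."""
    if records_per_request < 1:
        raise ValueError("records_per_request must be at least 1")
    for start in range(0, len(records), records_per_request):
        chunk = records[start:start + records_per_request]
        # Order each row's values by the column list; missing keys become None.
        yield [tuple(rec.get(col) for col in columns) for rec in chunk]


columns = ["id", "name"]
records = [{"id": i, "name": f"user{i}"} for i in range(5)]
batches = list(iter_batches(columns, records, records_per_request=2))
print([len(batch) for batch in batches])
```

Each batch could then be handed to a single executemany call, keeping the number of round trips proportional to the number of batches rather than the number of rows.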
- classmethod _get_conditional_statements(conditionals: List[Dict] | None = None) Tuple[List, List][source]#
Helper that generates the condition clauses and their parameters so that other implementations can reuse them. Override if a specific engine requires it.
- classmethod get_insert_dml(table_fqn: str, columns: List[str], records: List[Dict]) Tuple[str, Tuple][source]#
Generate a parameterized INSERT statement for Oracle. Uses Oracle’s positional bind syntax (:1, :2, etc.) and parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name (FQN).
columns – List of column names to insert into.
records – List of dictionaries representing records to insert.
- Returns:
Tuple of (query string with placeholders, flattened parameter tuple).
- Raises:
ValueError – If column names contain invalid characters.
- classmethod get_merge_dml(table_fqn: str, pk_ids: List[str], columns: List[str], records: List[Dict]) Tuple[str, Tuple][source]#
Generate parameterized MERGE statement for Oracle. Uses parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name.
pk_ids – List of primary key column names.
columns – List of column names.
records – List of dictionaries representing records.
- Returns:
Tuple of (query string with placeholders, list of parameter tuples).
- Raises:
ValueError – If column names contain invalid characters.
- _abc_impl = <_abc._abc_data object>#
Postgres#
PostgreSQL Database Client Module#
This module provides the PostgresClient class for connecting to and interacting with PostgreSQL databases using the psycopg library.
- class core_db.engines.postgres.PostgresClient(**kwargs)[source]#
Bases: ISqlDatabaseClient
PostgreSQL database client with parameterized query support. This client provides secure database operations using parameterized queries to prevent SQL injection attacks. It supports standard CRUD operations, batch inserts, upserts (MERGE), and secure SELECT/DELETE operations.
Usage Examples#
Basic Connection and Query:#
from core_db.engines.postgres import PostgresClient

conninfo = "postgresql://user:password@localhost:5432/database"

with PostgresClient(conninfo=conninfo) as client:
    # Test connection
    client.execute("SELECT version() AS version;")
    print(client.fetch_one()[0])
Insert Records (Batch Insert):#
columns = ["first_name", "last_name", "age", "email", "birthdate"]

records = [
    {
        "first_name": "John",
        "last_name": "Doe",
        "age": 30,
        "email": "john.doe@example.com",
        "birthdate": "1994-05-15"
    },
    {
        "first_name": "Jane",
        "last_name": "Smith",
        "age": 25,
        "email": "jane.smith@example.com",
        "birthdate": "2000-05-15"
    }
]

with PostgresClient(conninfo=conninfo) as client:
    count = client.insert_records(
        table_fqn="people",
        columns=columns,
        records=records)
    client.commit()
    print(f"Inserted {count} records")
Select Records:#
with PostgresClient(conninfo=conninfo) as client:
    client.select("people", columns=["first_name", "last_name", "age"])
    for record in client.fetch_records():
        print(record)
Delete Records (Conditional):#
with PostgresClient(conninfo=conninfo) as client:
    query, params = client.get_delete_dml(
        "people",
        conditionals=[
            {"first_name": "Jane"},
            {"last_name": "Doe"}
        ])
    client.execute(query, params=params)
    client.commit()
Upsert/Merge Records (INSERT … ON CONFLICT):#
with PostgresClient(conninfo=conninfo) as client:
    query, params = client.get_merge_dml(
        table_fqn="people",
        pk_ids=["first_name"],  # Must match a UNIQUE constraint
        columns=columns,
        records=[
            {
                "first_name": "John",
                "last_name": "Doe",
                "age": 35,  # Updated age
                "email": "john.doe@example.com",
                "birthdate": "1994-05-15"
            }
        ])
    client.execute(query, params=params)
    client.commit()
Security Features:#
All DML methods use parameterized queries with placeholders (%s)
Column names are validated against SQL injection patterns
Uses psycopg’s built-in parameter binding
Multi-row INSERT for better performance
- TYPE_MAPPER: Dict[Any, str] = {<class 'bool'>: 'BOOLEAN', <class 'dict'>: 'JSONB', <class 'float'>: 'DOUBLE PRECISION', <class 'int'>: 'INTEGER', <class 'list'>: 'JSONB', <class 'str'>: 'TEXT'}#
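How the client uses TYPE_MAPPER internally is not stated here. As a hedged illustration, the mapping shown above could be used to guess PostgreSQL column types from a sample record; infer_column_types and its TEXT fallback are hypothetical and not part of the library.

```python
from typing import Any, Dict

# Copied from the TYPE_MAPPER attribute shown above.
TYPE_MAPPER: Dict[Any, str] = {
    bool: "BOOLEAN",
    dict: "JSONB",
    float: "DOUBLE PRECISION",
    int: "INTEGER",
    list: "JSONB",
    str: "TEXT",
}


def infer_column_types(record: Dict[str, Any], default: str = "TEXT") -> Dict[str, str]:
    """Map each field to a column type by the exact Python type of its value."""
    # type(value) is used instead of isinstance so that True maps to BOOLEAN,
    # not INTEGER (bool is a subclass of int).
    return {name: TYPE_MAPPER.get(type(value), default) for name, value in record.items()}


sample = {"id": 1, "active": True, "score": 1.5, "tags": ["a"], "meta": {}, "name": "x"}
print(infer_column_types(sample))
```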
- __init__(**kwargs) None[source]#
Initialize PostgreSQL client.
- Parameters:
kwargs – Connection parameters including ‘conninfo’ (connection string). Example: “postgresql://user:password@localhost:5432/database”
- classmethod get_merge_dml(table_fqn: str, pk_ids: List[str], columns: List[str], records: List[Dict]) Tuple[str, Tuple][source]#
Generate parameterized MERGE/UPSERT statement for PostgreSQL using INSERT … ON CONFLICT. Uses parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name.
pk_ids – List of primary key column names.
columns – List of column names.
records – List of dictionaries representing records.
- Returns:
Tuple of (query string with placeholders, list of parameter tuples).
- Raises:
ValueError – If column names contain invalid characters.
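As a rough illustration of the INSERT … ON CONFLICT form, the sketch below builds such a statement with %s placeholders. The function build_postgres_upsert is hypothetical and is not the library's implementation: it returns a flattened parameter tuple, which is an assumption, and it assumes the identifiers have already been validated.

```python
from typing import Any, Dict, List, Tuple


def build_postgres_upsert(
    table_fqn: str,
    pk_ids: List[str],
    columns: List[str],
    records: List[Dict[str, Any]],
) -> Tuple[str, Tuple[Any, ...]]:
    """Build a multi-row upsert (sketch; identifiers assumed already validated)."""
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    values = ", ".join([row] * len(records))
    # Only non-key columns are updated when the conflict target matches.
    non_key = [col for col in columns if col not in pk_ids]
    if non_key:
        action = "DO UPDATE SET " + ", ".join(f"{col} = EXCLUDED.{col}" for col in non_key)
    else:
        action = "DO NOTHING"
    query = (
        f"INSERT INTO {table_fqn} ({', '.join(columns)}) "
        f"VALUES {values} "
        f"ON CONFLICT ({', '.join(pk_ids)}) {action}"
    )
    params = tuple(rec.get(col) for rec in records for col in columns)
    return query, params


query, params = build_postgres_upsert(
    "people", ["first_name"], ["first_name", "age"], [{"first_name": "John", "age": 35}]
)
print(query)
print(params)
```

The conflict target must correspond to a unique constraint or index on the table, which is why the usage example notes that pk_ids must match a UNIQUE constraint.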
- _abc_impl = <_abc._abc_data object>#
Snowflake#
Snowflake Data Warehouse Client Module#
This module provides the SnowflakeClient class for connecting to and interacting with Snowflake Data Warehouse using the snowflake-connector-python library.
- class core_db.engines.snowflake_.SnowflakeClient(**kwargs)[source]#
Bases: ISqlDatabaseClient
Client for Snowflake Data Warehouse connection. This client provides secure database operations using parameterized queries and input validation to prevent SQL injection attacks. It supports standard CRUD operations, batch inserts, and upserts (MERGE).
Usage Examples#
Basic Connection and Query:#
from core_db.engines.snowflake_ import SnowflakeClient

config = {
    "user": "username",
    "password": "password",
    "account": "account_name",
    "warehouse": "warehouse_name",
    "database": "database_name",
    "schema": "schema_name"
}

with SnowflakeClient(**config) as client:
    client.execute("SELECT CURRENT_VERSION();")
    print(client.fetch_one())
Security Features:#
Validates all SQL identifiers (table/column names) against injection patterns
Escapes string values to prevent SQL injection
Supports parameterized queries where applicable
Validates fully qualified names (schema.table)
- TYPE_MAPPER: Dict[Any, str] = {<class 'bool'>: 'BOOLEAN', <class 'dict'>: 'OBJECT', <class 'float'>: 'DOUBLE', <class 'int'>: 'INTEGER', <class 'list'>: 'VARIANT', <class 'str'>: 'VARCHAR'}#
- VALID_IDENTIFIER = re.compile('^[a-zA-Z_][a-zA-Z0-9_]*(\\.[a-zA-Z_][a-zA-Z0-9_]*)*$')#
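The VALID_IDENTIFIER pattern above accepts dot-separated names made of letters, digits, and underscores, where each part starts with a letter or underscore. A short sketch of checking names against it follows; is_valid_identifier is a hypothetical wrapper, and how the client applies the pattern internally is not shown here.

```python
import re

# Same pattern as the VALID_IDENTIFIER attribute shown above.
VALID_IDENTIFIER = re.compile('^[a-zA-Z_][a-zA-Z0-9_]*(\\.[a-zA-Z_][a-zA-Z0-9_]*)*$')


def is_valid_identifier(name: str) -> bool:
    """Return True when the name matches the identifier pattern."""
    return VALID_IDENTIFIER.match(name) is not None


for candidate in ["people", "analytics.people", "people; DROP TABLE x", "1abc", "a..b"]:
    print(candidate, is_valid_identifier(candidate))
```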
- __init__(**kwargs) None[source]#
- Parameters:
kwargs –
user: Username.
host: Hostname.
account: Account name.
password: Password.
warehouse: Warehouse.
database: Database.
schema: Schema.
role: Role.
To connect using OAuth, the connection parameters must include the authenticator parameter set to oauth and the token parameter set to the oauth_access_token. https://docs.snowflake.com/en/user-guide/python-connector-example.html#connecting-with-oauth
authenticator: Set to “oauth”.
token: The OAuth access token.
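A minimal sketch of an OAuth connection configuration follows, based on the authenticator and token parameters described above. The helper build_oauth_config is hypothetical, and the account, token, warehouse, and database values are placeholders.

```python
from typing import Any, Dict


def build_oauth_config(account: str, token: str, **extra: Any) -> Dict[str, Any]:
    """Build connection kwargs for OAuth (hypothetical helper, placeholder values)."""
    config: Dict[str, Any] = dict(extra)
    # Set the OAuth keys last so extra kwargs cannot override them.
    config.update({"account": account, "authenticator": "oauth", "token": token})
    return config


config = build_oauth_config(
    "account_name",
    "oauth_access_token",
    warehouse="warehouse_name",
    database="database_name",
)
print(sorted(config))
```

Assuming the client forwards its keyword arguments to the connector, the result could be passed as SnowflakeClient(**config).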
- test_connection(query: str | None = None)[source]#
Test the database connection by executing a simple query.
- Parameters:
query – Optional custom query to test. Defaults to querying Snowflake version.
- Returns:
Result of the query execution.
- classmethod get_insert_dml(table_fqn: str, columns: List, records: List[Dict])[source]#
Generate a parameterized INSERT statement with multi-row VALUES. Uses parameter binding to prevent SQL injection attacks.
- Parameters:
table_fqn – Table’s fully qualified name (FQN).
columns – List of column names to insert into.
records – List of dictionaries representing records to insert.
- Returns:
Tuple of (query string with placeholders, flattened parameter tuple).
- Raises:
ValueError – If column names contain invalid characters.
- _abc_impl = <_abc._abc_data object>#
- classmethod get_merge_dml(target: str, columns: List[str], pk_ids: List[str], records: List[Dict], source: str | None = None, epoch_column: str | None = None) Tuple[str, Tuple][source]#
- classmethod get_merge_dml(target: str, columns: List[str], pk_ids: List[str], records: List[Dict] | None = None, source: str | None = None, epoch_column: str | None = None) Tuple[str, Tuple]
Generate the MERGE/UPSERT statement. This is an abstract method that must be implemented by concrete classes, as each database engine may require specific syntax for merge operations.
- Parameters:
args – Positional arguments specific to the implementation.
kwargs – Keyword arguments specific to the implementation.
- Returns:
Tuple of (MERGE/UPSERT statement string, tuple of parameters).