Source code for core_db.engines.mysql

# -*- coding: utf-8 -*-

"""
MySQL Database Client Module
==============================

This module provides the MySQLClient class for connecting to and interacting
with MySQL databases using the pymysql library.
"""

import json
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple

import pymysql  # type: ignore

from core_db.interfaces import DatabaseClientException
from core_db.interfaces.sql_based import ISqlDatabaseClient


[docs] class MySQLClient(ISqlDatabaseClient): """ MySQL database client with parameterized query support. This client provides secure database operations using parameterized queries to prevent SQL injection attacks. It supports standard CRUD operations, batch inserts, upserts (MERGE), and secure SELECT/DELETE operations. =================================================== Usage Examples =================================================== Basic Connection and Query: ---------------------------- .. code-block:: python from core_db.engines.mysql import MySQLClient config = { "host": "localhost", "database": "test_database", "user": "root", "password": "password" } with MySQLClient(**config) as client: # Test connection client.execute("SELECT VERSION() AS version;") print(client.fetch_one()[0]) .. Insert Records (Batch Insert): ------------------------------- .. code-block:: python columns = ["first_name", "last_name", "age", "email", "birthdate"] records = [ { "first_name": "John", "last_name": "Doe", "age": 30, "email": "john.doe@example.com", "birthdate": "1994-05-15" }, { "first_name": "Jane", "last_name": "Smith", "age": 25, "email": "jane.smith@example.com", "birthdate": "2000-05-15" } ] with MySQLClient(**config) as client: count = client.insert_records( table_fqn="people", columns=columns, records=records) client.commit() print(f"Inserted {count} records") .. Select Records: --------------- .. code-block:: python with MySQLClient(**config) as client: client.select("people", columns=["first_name", "last_name", "age"]) for record in client.fetch_records(): print(record) .. Delete Records (Conditional): ----------------------------- .. code-block:: python with MySQLClient(**config) as client: query, params = client.get_delete_dml( "people", conditionals=[{"first_name": "Jane"}]) client.execute(query, params=params) client.commit() .. Upsert/Merge Records (INSERT ... ON DUPLICATE KEY UPDATE): ----------------------------------------------------------- .. code-block:: python with MySQLClient(**config) as client: query, params = client.get_merge_dml( table_fqn="people", columns=columns, records=[ { "first_name": "John", "last_name": "Doe", "age": 35, # Updated age "email": "john.doe@example.com", "birthdate": "1994-05-15" } ]) client.execute(query, params=params) client.commit() .. Security Features: ------------------ - All DML methods use parameterized queries with placeholders (%s) - Column names are validated against SQL injection patterns - Uses pymysql's built-in parameter binding - Multi-row INSERT for better performance """
[docs] def __init__(self, **kwargs) -> None: """ Expected -> host, user, password, database More information: - https://pymysql.readthedocs.io/en/latest/user/index.html# - https://pypi.org/project/PyMySQL/ """ super().__init__(**kwargs) self.epoch_to_timestamp_fcn = "FROM_UNIXTIME" self.connect_fcn = pymysql.connect
[docs] def _execute(self, query: Any, **kwargs): """ Execute query with parameter binding support for MySQL. Handles pymysql's specific parameter passing requirement where parameters are passed via the 'args' keyword argument. :param query: SQL query string to execute. :param kwargs: Optional keyword arguments. Supports 'params' for parameter binding. :return: Cursor object after execution. """ if not self.cursor: raise DatabaseClientException("No active cursor!") args = kwargs params = kwargs.pop("params", None) if params: args["args"] = params return self.cursor.execute(query, **args)
[docs] @classmethod def get_merge_dml( cls, table_fqn: str, columns: List[str], records: List[Dict], epoch_column: Optional[str] = None, ) -> Tuple[str, Tuple]: """ Generate parameterized MERGE/UPSERT statement for MySQL using INSERT ... ON DUPLICATE KEY UPDATE. Uses parameter binding to prevent SQL injection attacks. :param table_fqn: Table's fully qualified name (schema.table or just table). :param columns: List of column names. :param records: List of dictionaries representing records. :param epoch_column: If specified, only update if this timestamp column is newer. :return: Tuple of (query string with placeholders, flattened parameter tuple). :raises ValueError: If column names contain invalid characters. """ if not records: return "", tuple() cls.validate_identifier(columns) if epoch_column and epoch_column not in columns: raise ValueError(f"epoch_column '{epoch_column}' must be in columns list") # Build multi-row VALUES clause with placeholders placeholders = ", ".join(["%s" for _ in columns]) values_rows = ", ".join([f"({placeholders})" for _ in records]) # Building `ON DUPLICATE KEY UPDATE` statement... if epoch_column: # Only update if the new epoch value is greater on_duplicate = [ f"`{col}` = IF(VALUES(`{epoch_column}`) > `{epoch_column}`, VALUES(`{col}`), `{col}`)" for col in columns ] else: on_duplicate = [f"`{col}` = VALUES(`{col}`)" for col in columns] # SQL construction is safe: columns are validated, values use placeholders... query = f""" INSERT INTO {table_fqn} ({', '.join([f'`{col}`' for col in columns])}) VALUES {values_rows} ON DUPLICATE KEY UPDATE {', '.join(on_duplicate)}""" # nosec B608 # Extract and flatten parameters in the correct order params = [] for record in records: params.extend([ json.dumps(record[col]) if type(record[col]) in [dict, list] else record[col] for col in columns ]) return query, tuple(params)