# -*- coding: utf-8 -*-
"""
MongoDB Database Client Module
================================
This module provides the MongoClient class for connecting to and interacting
with MongoDB databases using the pymongo library.
"""
from typing import Any
from typing import Dict
from typing import Iterator
from typing import List
from typing import Mapping
from typing import Optional
from pymongo import MongoClient as PyMongoClient
from pymongo.client_session import ClientSession
from pymongo.collation import Collation
from pymongo.results import DeleteResult
from pymongo.results import InsertManyResult
from pymongo.results import InsertOneResult
from core_db.interfaces.base import DatabaseClientException
from core_db.interfaces.base import IDatabaseClient
[docs]
class MongoClient(IDatabaseClient):
    """
    Client for MongoDB connection...

    ===================================================
    How to use
    ===================================================

    .. code-block:: python

        from core_db.engines.mongo import MongoClient

        client = MongoClient(host="host", database="db")
        client.connect()
        print(client.test_connection())
    """

    def __init__(self, **kwargs):
        """
        Initialize MongoDB client.

        :param kwargs:
            Connection parameters. Must include the 'database' parameter
            (``connect()`` fails without it). Optional: host, port, username,
            password, and other MongoDB connection options. Everything except
            'database' is forwarded to the base class initializer.

            More information:
              - https://pymongo.readthedocs.io/en/stable/
              - https://www.mongodb.com/docs/manual/reference/connection-string/
        """
        # 'database' is consumed here so it is not forwarded with the
        # remaining connection options.
        self.db_name = kwargs.pop("database", None)
        super().__init__(**kwargs)
        self.connect_fcn = PyMongoClient
        # Database handle; stays None until connect() succeeds.
        self.db: Any = None

    def connect(self) -> None:
        """
        Establish the connection and select the configured database.

        The connection itself is opened by the base class ``connect()``
        (presumably through ``connect_fcn``, i.e. ``PyMongoClient`` -- the
        base implementation is not visible here).

        :raises DatabaseClientException:
            If the database name is not provided or the connection could
            not be established.
        """
        # Validate before connecting so a missing database name does not
        # leave an open, unusable connection behind.
        if not self.db_name:
            raise DatabaseClientException("Database name is required")

        super().connect()
        if not self.cxn:
            raise DatabaseClientException("Failed to establish connection")

        self.db = self.cxn[self.db_name]

    def test_connection(
        self,
        query: Optional[str] = None,
        session: Optional[ClientSession] = None,
    ) -> Dict[str, Any]:
        """
        Test the database connection by retrieving server information.

        Calls ``server_info()`` on the underlying connection.

        :param query: Unused parameter (kept for interface compatibility).
        :param session: Optional ClientSession for the operation.
        :return: Dictionary containing MongoDB server information.
        :raises DatabaseClientException: If no active connection exists.
        """
        if not self.cxn:
            raise DatabaseClientException("No active connection")

        return self.cxn.server_info(session=session)

    def _get_collection(self, collection_name: str) -> Any:
        """
        Return the collection called ``collection_name`` from the selected database.

        :param collection_name: Name of the collection to access.
        :raises DatabaseClientException: If ``connect()`` has not selected a database yet.
        """
        # pymongo Database objects do not support truth-value testing,
        # so the check must be an explicit comparison with None.
        if self.db is None:
            raise DatabaseClientException("No active connection")

        # Item access works for every collection name; attribute access
        # fails for names starting with "_" and for names that collide
        # with Database attributes/methods (e.g. "name", "command").
        return self.db[collection_name]

    def find_one(
        self,
        collection_name: str,
        filters: Optional[Dict] = None,
        projection: Optional[Dict] = None,
        *args,
        **kwargs,
    ) -> Optional[Dict]:
        """
        Get a single document from the database.

        All arguments to find() are also valid arguments for find_one(),
        although any limit argument will be ignored.

        :param collection_name: The collection to query.
        :param filters:
            A dictionary specifying the query to be performed OR any other
            type to be used as the value for a query for "_id". ``None``
            (the default) matches any document.
        :param projection:
            A list of field names that should be returned in the result set or
            a dict specifying the fields to include or exclude. If projection is
            a list "_id" will always be returned. Use a dict to exclude fields
            from the result (e.g. projection={'_id': False}). When omitted or
            empty, no projection is applied.
        :param args: Additional positional arguments, same as for find().
        :param kwargs: Additional keyword arguments, same as for find().
        :return: A single document, or ``None`` if no matching document is found.
        :raises DatabaseClientException: If there is no active connection.
        """
        # Only a missing filter becomes "match anything"; falsy values such
        # as 0 or "" are legitimate "_id" shorthand and must reach the driver.
        if filters is None:
            filters = {}

        # Hand the driver its own default (None) instead of an empty
        # projection, which older PyMongo releases treated as {"_id": 1}.
        return self._get_collection(collection_name).find_one(
            filters,
            projection or None,
            *args,
            **kwargs,
        )

    def find(
        self,
        collection_name: str,
        filters: Optional[Dict] = None,
        projection: Optional[Dict] = None,
        *args,
        **kwargs,
    ) -> Iterator:
        """
        Query the database and yield the matching documents one by one.

        The filters argument is a query document that all results must match...
        https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find

        This method is a generator: nothing is executed (and no error is
        raised) until the result is iterated.

        :param collection_name: The collection to query.
        :param filters:
            A dictionary specifying the query to be performed. When omitted
            or empty, all documents match.
        :param projection:
            A list of field names that should be returned in the result set or
            a dict specifying the fields to include or exclude. If projection is
            a list "_id" will always be returned. Use a dict to exclude fields
            from the result (e.g. projection={'_id': False}). When omitted or
            empty, no projection is applied.
        :param args: Additional positional arguments for pymongo's find().
        :param kwargs: Additional keyword arguments for pymongo's find().
        :return: Iterator over the matching documents.
        :raises DatabaseClientException: If there is no active connection.
        """
        if not filters:
            filters = {}

        # Hand the driver its own default (None) instead of an empty
        # projection, which older PyMongo releases treated as {"_id": 1}.
        cursor = self._get_collection(collection_name).find(
            filters,
            projection or None,
            *args,
            **kwargs,
        )
        yield from cursor

    def insert_one(
        self,
        collection_name: str,
        document: Dict,
        bypass_document_validation: bool = False,
        session: Optional[ClientSession] = None,
        comment: Optional[str] = None,
    ) -> InsertOneResult:
        """
        Insert a single document into a collection.

        If the collection does not exist MongoDB creates it and inserts the
        data into it.

        :param collection_name: The collection to which you want to add the document.
        :param document:
            The document to insert. Must be a mutable mapping type. If the
            document does not have an _id field one will be added automatically.
        :param bypass_document_validation:
            If "True", allows the write to opt-out of document level
            validation. Default is "False".
        :param session: A class '~pymongo.client_session.ClientSession'.
        :param comment: A user-provided comment to attach to this command.
        :return: The '~pymongo.results.InsertOneResult' produced by the driver.
        :raises DatabaseClientException: If there is no active connection.
        """
        return self._get_collection(collection_name).insert_one(
            document,
            session=session,
            bypass_document_validation=bypass_document_validation,
            comment=comment,
        )

    def insert_many(
        self,
        collection_name: str,
        documents: List[Dict],
        ordered: bool = True,
        bypass_document_validation: bool = False,
        session: Optional[ClientSession] = None,
    ) -> InsertManyResult:
        """
        Insert multiple documents into a collection.

        If a document does not specify an "_id" field, one is added
        automatically before inserting.

        :param collection_name: The collection to which you want to add documents.
        :param documents: An iterable of documents to insert.
        :param ordered:
            If "True" (the default) documents will be inserted on
            the server serially, in the order provided. If an error
            occurs all remaining inserts are aborted. If "False", documents
            will be inserted on the server in arbitrary order, possibly
            in parallel, and all document inserts will be attempted.
        :param bypass_document_validation:
            If "True", allows the write to opt-out of document level
            validation. Default is "False".
        :param session: A class '~pymongo.client_session.ClientSession'.
        :return:
            The '~pymongo.results.InsertManyResult' produced by the driver;
            its ``inserted_ids`` attribute holds the ids of the inserted
            documents.
        :raises DatabaseClientException: If there is no active connection.
        """
        return self._get_collection(collection_name).insert_many(
            documents,
            ordered=ordered,
            bypass_document_validation=bypass_document_validation,
            session=session,
        )

    def delete_one(
        self,
        collection_name: str,
        filter_query: Dict,
        collation: Optional[Collation] = None,
        hint: Optional[Any] = None,
        session: Optional[ClientSession] = None,
        let: Optional[Mapping[str, Any]] = None,
        comment: Optional[Any] = None,
    ) -> DeleteResult:
        """
        Remove a single document matching the filter from the collection.

        :param collection_name: The collection from which to delete the document.
        :param filter_query: A query that matches the document to delete.
        :param collation:
            An instance of class: '~pymongo.collation.Collation'. This option
            is only supported on MongoDB 3.4 and above.
        :param hint:
            An index to use to support the query predicate. This option
            is only supported on MongoDB 4.4 and above (the parameter was
            added in PyMongo 3.11).
        :param session: A class '~pymongo.client_session.ClientSession'.
        :param let:
            Map of parameter names and values. Values must be constant or closed
            expressions that do not reference document fields. Parameters can then be
            accessed as variables in an aggregate expression context (e.g. "$$var").
        :param comment: A user-provided comment to attach to this command.
        :return: The '~pymongo.results.DeleteResult' produced by the driver.
        :raises DatabaseClientException: If there is no active connection.
        """
        return self._get_collection(collection_name).delete_one(
            filter_query,
            collation=collation,
            hint=hint,
            session=session,
            let=let,
            comment=comment,
        )

    def delete_many(
        self,
        collection_name: str,
        filter_query: Dict,
        collation: Optional[Collation] = None,
        hint: Optional[Any] = None,
        session: Optional[ClientSession] = None,
        let: Optional[Mapping[str, Any]] = None,
        comment: Optional[Any] = None,
    ) -> DeleteResult:
        """
        Remove every document matching the filter from the collection.

        :param collection_name: The collection from which to delete documents.
        :param filter_query: A query that matches the documents to delete.
        :param collation:
            An instance of class: '~pymongo.collation.Collation'. This option
            is only supported on MongoDB 3.4 and above.
        :param hint:
            An index to use to support the query predicate. This option
            is only supported on MongoDB 4.4 and above (the parameter was
            added in PyMongo 3.11).
        :param session: A class '~pymongo.client_session.ClientSession'.
        :param let:
            Map of parameter names and values. Values must be constant or closed
            expressions that do not reference document fields. Parameters can then be
            accessed as variables in an aggregate expression context (e.g. "$$var").
        :param comment: A user-provided comment to attach to this command.
        :return: The '~pymongo.results.DeleteResult' produced by the driver.
        :raises DatabaseClientException: If there is no active connection.
        """
        return self._get_collection(collection_name).delete_many(
            filter_query,
            collation=collation,
            hint=hint,
            session=session,
            let=let,
            comment=comment,
        )