# Source code for save_to_db.core.persister
import pickle
import struct
from .item_cls_manager import item_cls_manager
from .merge_policy import MergePolicy
from .utils.item_collection import ItemCollection
class Persister(object):
    """Persists items to a database and saves/loads them to/from files.

    Every database operation is delegated to `db_adapter`; this class adds
    the optional commit/rollback handling and a length-prefixed pickle
    format for storing items in files.

    :param db_adapter: Instance of a subclass of
        :py:class:`~save_to_db.adapters.utils.adapter_base.AdapterBase` used
        to deal with items and ORM models.
    :param autocommit: If `True`, commits changes to the database each time
        an item is persisted (used whenever a method receives
        ``commit=None``).
    """

    def __init__(self, db_adapter, autocommit=False):
        self.db_adapter = db_adapter
        self.autocommit = autocommit

    # --- database adapter facade ----------------------------------------------

    def persist(self, item, commit=None):
        """Saves item data into a database by creating or updating the
        appropriate database records.

        :param item: An instance of :py:class:`~.item_base.ItemBase` to
            persist.
        :param commit: If `True`, commits changes to the database. If `None`,
            the `autocommit` value (initially set at creation time) is used.
        :returns: The result of the adapter's `persist` call: item list and
            corresponding list of ORM model lists.
        """
        result = self.db_adapter.persist(item)
        self.__commit_if_needed(commit)
        return result

    def merge_models(self, models, commit=None, merge_policy=None, ignore_fields=None):
        """Calls
        :py:meth:`~save_to_db.adapters.utils.adapter_base.AdapterBase.merge_models`
        method of the internal
        :py:class:`~save_to_db.adapters.utils.adapter_base.AdapterBase`
        instance.

        :param models: Models to merge; passed to the adapter unchanged.
        :param commit: If `True`, commits changes to the database. If `None`,
            the `autocommit` value (initially set at creation time) is used.
        :param merge_policy: Forwarded to the adapter as `merge_policy`.
        :param ignore_fields: Forwarded to the adapter as `ignore_fields`.
        :returns: First model from `models` into which all other models were
            merged.
        """
        result_model = self.db_adapter.merge_models(
            models, merge_policy=merge_policy, ignore_fields=ignore_fields
        )
        self.__commit_if_needed(commit)
        return result_model

    def __commit_if_needed(self, commit):
        """Commits when requested, rolling back if the commit itself fails.

        :param commit: `True` forces a commit, `None` defers to
            `self.autocommit`, any other falsy value skips the commit.
        """
        should_commit = commit or (commit is None and self.autocommit)
        if not should_commit:
            return

        try:
            self.commit()
        except BaseException:
            # Deliberately broad: whatever interrupted the commit, the
            # transaction is rolled back and the original error re-raised.
            self.rollback()
            raise

    def commit(self):
        """ Commits persisted items to database. """
        self.db_adapter.commit()

    def rollback(self):
        """ Rolls back current transaction. """
        self.db_adapter.rollback()

    def pprint(self, *models):
        r"""Pretty prints `models` to console.

        :param \*models: List of models to print.
        """
        self.db_adapter.pprint(*models)

    # --- interface for working with files -------------------------------------

    def dumps(self, item):
        """Converts an item into bytes.

        The item is processed first (`item.process()`), then pickled
        together with the full table name of its model, its bulk flag and
        its collection ID.

        :param item: An instance of :py:class:`~.item_base.ItemBase` class.
        :returns: Encoded item as `bytes`.
        """
        item.process()
        payload = {
            "table_fullname": self.db_adapter.get_table_fullname(item.model_cls),
            "is_bulk_item": item.is_bulk_item(),
            "dict_wrapper": item.to_dict(),
            "collection_id": item.get_collection_id(),
        }
        return pickle.dumps(payload)

    def loads(self, data):
        """Decodes `bytes` data into an instance of
        :py:class:`~.item_base.ItemBase`.

        .. warning::
            The data is decoded with :py:func:`pickle.loads`, which can
            execute arbitrary code. Only load data from trusted sources.

        :param data: Encoded item as `bytes` (as produced by
            :py:meth:`dumps`).
        :returns: An instance of :py:class:`~.item_base.ItemBase`.
        """
        # SECURITY: unpickling untrusted bytes is unsafe; see the warning above.
        decoded = pickle.loads(data)

        model_cls = self.db_adapter.get_model_cls_by_table_fullname(
            decoded["table_fullname"]
        )
        # The first item class returned for the model and collection is used.
        item_cls = item_cls_manager.get_by_model_cls(
            model_cls, collection_id=decoded["collection_id"]
        )[0]

        item = item_cls.Bulk() if decoded["is_bulk_item"] else item_cls()
        return item.load_dict(decoded["dict_wrapper"])

    def dump(self, item, fp):
        """Saves an item into a file.

        .. note::
            This method also saves the size of the encoded item (as a 4-byte
            big-endian unsigned integer written before the data), so it is
            possible to save multiple items one after another into the same
            file and load them later.

        :param item: An item to be saved.
        :param fp: Binary file-like object to save `item` into.
        """
        item_data = self.dumps(item)
        fp.write(struct.pack(">I", len(item_data)))
        fp.write(item_data)

    def load(self, fp):
        """Loads and decodes one item from a file-like object.

        Reads the 4-byte size header written by :py:meth:`dump`, then that
        many bytes, and decodes them with :py:meth:`loads` (the same trust
        requirements apply).

        :param fp: Binary file-like object to read from.
        :return: One item read from `fp` or `None` if there is no data left
            to read.
        :raises struct.error: If a size header is present but is shorter
            than 4 bytes.
        """
        size_header = fp.read(4)
        if not size_header:
            return None

        (item_size,) = struct.unpack(">I", size_header)
        return self.loads(fp.read(item_size))

    # ---model deleter ---------------------------------------------------------

    def execute_deleter(self, item_cls, commit=None):
        """Deletes models according to `deleter_selectors`
        and `deleter_keepers`. See their description in
        :py:class:`~.item.Item` configuration.

        Does nothing (and does not commit) when the item class has no model
        deleter configured.

        :param item_cls: :py:class:`~.item.Item` class for which deletion
            must be executed.
        :param commit: If `True`, commits changes to the database. If `None`,
            the `autocommit` value (initially set at creation time) is used.
        """
        model_deleter = item_cls.metadata["model_deleter"]
        if not model_deleter:
            return

        model_deleter.execute_delete(self.db_adapter)
        self.__commit_if_needed(commit)

    def execute_scope_deleter(self, collection_id, commit=None):
        """Calls :py:meth:`~execute_deleter` method for all item classes in
        scope.

        :param collection_id: An ID of an
            :py:class:`~utils.item_collection.ItemCollection`
            (:py:class:`~.scope.Scope` is a subclass of it).
        :param commit: If `True`, commits changes to the database. If `None`,
            the `autocommit` value (initially set at creation time) is used.
        """
        collection = ItemCollection.get_collection_by_id(collection_id)
        for item_cls in collection.get_all_item_classes():
            # Per-class commits are suppressed; one commit decision is made
            # after all deleters have run.
            self.execute_deleter(item_cls, commit=False)
        self.__commit_if_needed(commit)

    # ---merge policy ----------------------------------------------------------

    def create_merge_policy(self, policy, defaults=None):
        """Creates an instance of :py:class:`~.merge_policy.MergePolicy`
        class bound to this persister's database adapter.

        :param policy: *policy* argument for
            :py:class:`~.merge_policy.MergePolicy` class constructor.
        :param defaults: *defaults* argument for
            :py:class:`~.merge_policy.MergePolicy` class constructor.
        :returns: An instance of :py:class:`~.merge_policy.MergePolicy` class.
        """
        return MergePolicy(self.db_adapter, policy, defaults)