import copy
from .item import ItemBase
from .item_contructor import complete_item_structure
from .utils.mapper import mapper
from save_to_db.exceptions import (
BulkItemOneToXDefaultError,
ItemProcessError,
ItemRevertError,
)
[docs]class BulkItem(ItemBase):
"""This class deals with instances of :py:class:`~.item.Item` in chunks.
It can create or update multiple database rows using single query, e.g. it
can persist multiple items at once.
.. note::
You get items values from a bulk item like this:
- `bulk[number]` return item at a given index `number`;
- `bulk[string]` return default value for a key `string`;
- `bulk[slice]` (e.g. `bulk[1:2]`) returns python list of items
containing specified items.
.. note::
Defaults order is based on the number of '__' the key contains,
shorter keys are prioritized. This is due to the fact that default
values can be another items.
:param item_cls: A subclass of :py:class:`~.item.Item` that
this class deals with.
:param \*\*kwargs: Values that will be saved as **default** item data.
"""
# --- special methods ------------------------------------------------------
def __init__(self, item_cls, **kwargs):
self.bulk = []
self.item_cls = item_cls
super().__init__()
for key, value in kwargs.items():
self[key] = value # this will trigger `__setitem__` function
def __setitem__(self, key, value):
real_key = self.item_cls._get_real_keys(key)
real_key_str = "__".join(real_key)
# value one-to-x must not be an item
if isinstance(value, ItemBase):
# getting last relation type
item_cls = self.item_cls
for key in real_key:
if key not in item_cls.relations:
break
relation = item_cls.relations[key]
item_cls = relation["item_cls"]
if relation["relation_type"].is_one_to_x():
raise BulkItemOneToXDefaultError(self.item_cls, key, real_key_str)
self.data[real_key_str] = value
def __getitem__(self, key):
if isinstance(key, int) or isinstance(key, slice):
return self.bulk[key]
real_keys = self.item_cls._get_real_keys(key)
return self._get_direct(real_keys)
def _get_direct(self, real_keys):
real_key_str = "__".join(real_keys)
if real_key_str in self.data:
return self.data[real_key_str]
item_cls = self.item_cls
previous_cls = None
for real_key in real_keys:
previous_cls = item_cls
if real_key in item_cls.relations:
item_cls = item_cls.relations[real_key]["item_cls"]
else:
raise KeyError(real_key)
if len(real_keys) == 1:
self[real_key_str] = self.item_cls._cls_genitem(real_keys[-1])
else:
self[real_key_str] = previous_cls._cls_genitem(real_keys[-1])
return self.data[real_key_str]
def __delitem__(self, key):
real_key = self.item_cls._get_real_keys(key, as_string=True)
del self.data[real_key]
def __contains__(self, key):
if isinstance(key, ItemBase):
return key in self.bulk
try:
real_keys = self.item_cls._get_real_keys(key)
except KeyError:
return False
return self._contains_direct(real_keys)
def _contains_direct(self, real_keys):
return "__".join(real_keys) in self.data
def __iter__(self):
for item in self.bulk:
yield item
def __len__(self):
return len(self.bulk)
def __deepcopy__(self, memo={}):
if self in memo:
return memo[self]
self_copy = type(self)(self.item_cls)
memo[self] = self_copy
# copying fields and relations
for key in self.data:
self_copy.data[key] = copy.deepcopy(self.data[key], memo=memo)
# copying items in bulk
for item in self.bulk:
self_copy.add(copy.deepcopy(item, memo=memo))
return self_copy
# --- utility methods ------------------------------------------------------
[docs] def to_dict(self, revert=False):
return self._to_dict(revert=revert, _item_to_dict={}, _address_to_item={})
def _to_dict(self, revert, _item_to_dict, _address_to_item):
self_address = id(self)
if self_address in _item_to_dict:
if self_address not in _address_to_item:
_address_to_item[self_address] = len(_address_to_item) + 1
self_id = _address_to_item[self_address]
_item_to_dict[self_address]["id"] = self_id
return {
"id": self_id,
}
result = {
"defaults": {},
"bulk": [],
}
_item_to_dict[self_address] = result
# defaults first in `_load_dict()` as well
defaults = result["defaults"]
for key, value in self.data.items():
if not isinstance(value, ItemBase):
if revert:
value = self.item_cls.revert_field(key, value, aliased=False)
defaults[key] = value
else:
defaults[key] = value._to_dict(
revert=revert,
_item_to_dict=_item_to_dict,
_address_to_item=_address_to_item,
)
bulk = result["bulk"]
for item in self.bulk:
bulk.append(
item._to_dict(
revert=revert,
_item_to_dict=_item_to_dict,
_address_to_item=_address_to_item,
)
)
return result
[docs] def load_dict(self, data):
return self._load_dict(data, _id_to_item={})
def _load_dict(self, data, _id_to_item):
if "id" in data:
if data["id"] not in _id_to_item:
_id_to_item[data["id"]] = self.item_cls.Bulk()
bulk = _id_to_item[data["id"]]
if "defaults" not in data:
return _id_to_item[data["id"]]
else:
bulk = self.item_cls.Bulk()
# defaults first in `_to_dict()` as well
for key, value in data["defaults"].items():
# getting relation class
cur_cls = bulk.item_cls
for cur_key in key.split("__"):
if cur_key in cur_cls.relations:
cur_cls = cur_cls.relations[cur_key]["item_cls"]
else:
cur_cls = None
break
if cur_cls: # relation
bulk[key] = bulk[key]._load_dict(value, _id_to_item)
else:
bulk[key] = value
# bulk
for dict_wrapper in data["bulk"]:
bulk.add(bulk.item_cls()._load_dict(dict_wrapper, _id_to_item))
return bulk
# --- properties -----------------------------------------------------------
@property
def model_cls(self):
"""Property that returns `model_cls` attribute of the `item_cls`
class.
"""
return self.item_cls.model_cls
[docs] def get_item_cls(self):
return self.item_cls
[docs] def is_scoped(self):
return self.item_cls.metadata["collection_id"] != None
[docs] def get_collection_id(self):
return self.item_cls.metadata["collection_id"]
@property
def relations(self):
return self.item_cls.relations
# --- main methods ---------------------------------------------------------
[docs] def add(self, *items):
"""Adds `items` to the bulk.
:param \*items: List of instances of :py:class:`~.item_base.ItemBase`
class to be added to the bulk.
"""
self.add_at_index(None, *items)
[docs] def add_at_index(self, index, *items):
"""Adds `items` to the bulk at `index` position.
If item already in the bulk, nothing will happen.
:param index: Starting position in the bulk list.
:param \*items: List of instances of :py:class:`~.item_base.ItemBase`
class to be added to the bulk.
"""
for index_shift, item in enumerate(items):
if item not in self.bulk:
if index is None:
self.bulk.append(item)
else:
self.bulk.insert(index + index_shift, item)
mapper.item_added_to_bulk(item, self)
[docs] def gen(self, *args, **kwargs):
"""Creates a :py:class:`~.item.Item` instance and adds it to the bulk.
:param \*args: Positional arguments that are passed to the item
constructor.
:param \*\*kwargs: Keyword arguments that are passed to the item
constructor.
:returns: :py:class:`~.item.Item` instance.
"""
item = self.item_cls(*args, **kwargs)
self.add(item)
return item
[docs] def remove(self, *items):
"""Removes `item` from the bulk.
:param \*items: List of instances of :py:class:`~.item_base.ItemBase`
class to be removed from the bulk.
"""
for item in items:
if item in self.bulk:
self.bulk.remove(item)
mapper.item_removed_from_bulk(item, self)
[docs] def as_list(self):
return self.bulk
[docs] def is_single_item(self):
return False
[docs] def is_bulk_item(self):
return True
[docs] def revert(self):
return self._revert(_procesed_items=[])
def _revert(self, _procesed_items=[]):
if self in _procesed_items:
return
_procesed_items.append(self)
for key, value in self.data.items():
if not isinstance(value, ItemBase):
try:
self.data[key] = self.item_cls.revert_field(
key, value, aliased=False
)
except:
raise ItemRevertError(self, key, self.data[key])
else:
self.data[key]._revert(_procesed_items=_procesed_items)
for item in self.bulk:
item._revert(_procesed_items=_procesed_items)
[docs] def process(self):
self._process(_procesed_items=[])
return complete_item_structure(self)
def _process(self, _procesed_items):
if self in _procesed_items:
return
_procesed_items.append(self)
keys = list(self.data)
keys.sort(key=lambda key: key.count("__"))
for key in keys:
value = self.data[key]
end_item_cls = self.item_cls
end_relation = None
for subkey in key.split("__"):
if subkey in end_item_cls.relations:
end_relation = end_item_cls.relations[subkey]
end_item_cls = end_relation["item_cls"]
else:
end_relation = None
if not end_relation: # not a relation
try:
self.data[key] = self.item_cls.process_field(
key, value, aliased=False
)
except:
raise ItemProcessError(self, key, value)
else:
if isinstance(value, list):
bulk_item = self.item_cls.relations[key]["item_cls"].Bulk()
bulk_item.add(*value)
value = bulk_item
value._process(_procesed_items=_procesed_items)
self.data[key] = value
for item in self.bulk:
if key in item:
continue
item[key] = self.data[key]
for item in self.bulk:
item._process(_procesed_items=_procesed_items)