# Source code for save_to_db.core.item_metaclass
from functools import partial, wraps
from itertools import chain
from save_to_db.adapters.utils import adapter_manager
from save_to_db.adapters.utils.relation_type import RelationType
from save_to_db.exceptions import (
InvalidFieldName,
RelatedItemNotFound,
MultipleRelatedItemsFound,
ItemAdapterNotFound,
NonExistentRelationDefined,
NonExistentFieldUsed,
FieldsFromRelationNotAllowed,
NorewriteRelationException,
NorewriteKeyUsedTwice,
DeleterSelectorsOrKeepersEmpty,
DeleterXToManyRelationUsed,
UnrefNonXToManyFieldUsed,
)
from .item_cls_manager import item_cls_manager
from .model_deleter import ModelDeleter
class ItemMetaclass(type):
    """A class that is used as a metaclass for :py:class:`~.item.Item`.

    The :py:class:`~.item.Item` class has a configuration of
    how collected data must be transformed and persisted into a database.
    This metaclass processes and corrects item configuration.

    Configuration is handled in two stages:

    - :py:meth:`process_configuration` runs when an item class is created. It
      fills in missing configuration attributes and normalizes their
      structure.
    - :py:meth:`complete_setup` runs on demand. It resolves related item
      classes and, when the item has a `model_cls`, validates and completes
      the configuration using the adapter found for that model.
    """

    def __new__(cls, name, bases, dct, no_setup=False):
        """Create a new item class and, unless `no_setup` is set, configure it.

        :param name: Name of the class being created.
        :param bases: Base classes of the class being created.
        :param dct: Namespace of the class being created.
        :param no_setup: If `True`, the class is created without any
            configuration processing and is not registered.
        :returns: The newly created class.
        """
        item_cls = super().__new__(cls, name, bases, dct)

        # `no_setup` variable is only used to prevent setting up the
        # :py:class:`~.item.Item` class that must be only used as base class.
        if not no_setup:
            cls.process_configuration(item_cls)
            # only classes without a collection ID are added to the manager
            if item_cls.metadata["collection_id"] is None:
                item_cls_manager.add(item_cls)

        return item_cls

    # this is needed for Python versions less than 3.6
    def __init__(cls, name, bases, dct, **_):
        """Initialize the class, discarding extra keyword arguments
        (such as `no_setup`) that `type.__init__` is not given here.
        """
        super().__init__(name, bases, dct)

    @staticmethod
    def get_default_configuration():
        """Return a new dictionary with default values for all configuration
        attributes of an item class.

        A fresh dictionary (with fresh nested containers) is built on every
        call.
        """
        return {
            "batch_size": None,
            "metadata": {
                "collection_id": None,
                "setup_completed": False,
                "autogenerated_item_cls": False,
                "model_deleter": None,
                "model_unrefs": {},
            },
            "model_cls": None,
            "defaults": {},
            "creators": None,
            "creators_autoconfig": None,
            "autoinject_creators": True,
            "getters": None,
            "getters_autoconfig": None,
            "nullables": set(),
            "remove_null_fields": None,
            "remove_null_fields_autoconfig": None,
            "fields": {},
            "relations": {},
            "conversions": {
                # boolean
                "boolean_true_strings": (
                    "true",
                    "yes",
                    "on",
                    "1",
                    "+",
                ),
                "boolean_false_strings": (
                    "false",
                    "no",
                    "off",
                    "0",
                    "-",
                ),
                # integer and float
                "decimal_separator": ".",
                # date and time formats
                "date_formats": ("%Y-%m-%d",),
                "time_formats": ("%H:%M:%S",),
                "datetime_formats": ("%Y-%m-%d %H:%M:%S",),
                # datetime functions
                "date_func": None,
                "time_func": None,
                "datetime_func": None,
                # timezone
                "default_timezone": None,
            },
            "allow_multi_update": False,
            "allow_merge_items": False,
            "aliases": {},
            "update_only_mode": False,
            "get_only_mode": False,
            "norewrite_fields": {},
            "fast_insert": False,
            "deleter_selectors": None,
            "deleter_keepers": None,
            "deleter_execute_on_persist": False,
            "unref_x_to_many": {},
        }

    @staticmethod
    def get_default_relation_attrs():
        """Return a new dictionary with default attributes of a single
        entry of the `relations` configuration.
        """
        return {
            "item_cls": None,
            "relation_type": None,
            "replace_x_to_many": False,
            "reverse_key": None,
        }

    @classmethod
    def process_configuration(cls, item_cls):
        """Add missing configuration attributes to `item_cls` and normalize
        the structure of the ones that are present.

        The class is modified in place. As the last step a `complete_setup`
        attribute is set on `item_cls` that calls
        :py:meth:`complete_setup` with `item_cls` already passed.

        :param item_cls: The item class to process.
        """
        # --- making sure all configuration variables are present ---
        default_configuration = cls.get_default_configuration()
        for attr_name, default_value in default_configuration.items():
            if not hasattr(item_cls, attr_name):
                setattr(item_cls, attr_name, default_value)

        # --- correcting metadata ----------------------------------------------
        for key, value in default_configuration["metadata"].items():
            if key not in item_cls.metadata:
                item_cls.metadata[key] = value

        # --- correcting configuration structure -------------------------------
        # conversions: defaults overridden by the values from the class
        conversions = default_configuration["conversions"]
        conversions.update(item_cls.conversions)
        # fixing conversions for date and time: a single format string
        # becomes a one-element tuple
        for key in (
            "date_formats",
            "time_formats",
            "datetime_formats",
        ):
            if isinstance(conversions[key], str):
                conversions[key] = (conversions[key],)
        item_cls.conversions = conversions

        # getters and creators: every group becomes a set of field names
        for groups_key in ("getters", "creators"):
            groups = getattr(item_cls, groups_key)
            if groups is None:
                continue  # to be autoconfigured
            for index, group in enumerate(groups):
                if isinstance(group, str):
                    # a lone string is a group with a single field name
                    groups[index] = {group}
                elif not isinstance(group, set):
                    groups[index] = set(group)

        # nullables
        if not isinstance(item_cls.nullables, set):
            item_cls.nullables = set(item_cls.nullables)

        # remove_null_fields (`None` and `True` are handled during
        # `complete_setup`)
        if (
            item_cls.remove_null_fields is not None
            and item_cls.remove_null_fields is not True
            and not isinstance(item_cls.remove_null_fields, set)
        ):
            item_cls.remove_null_fields = set(item_cls.remove_null_fields)

        # relations: a non-dictionary value is taken as the related item class
        relations = item_cls.relations
        default_relation_attrs = cls.get_default_relation_attrs()
        for key in relations:
            if not isinstance(relations[key], dict):
                relations[key] = {
                    "item_cls": relations[key],
                }
            for attr_key, attr_value in default_relation_attrs.items():
                relations[key].setdefault(attr_key, attr_value)

        # unref_x_to_many: a plain sequence is taken as the selectors
        unref_x_to_many = item_cls.unref_x_to_many
        for key in unref_x_to_many:
            if isinstance(unref_x_to_many[key], (list, set, tuple)):
                unref_x_to_many[key] = {
                    "selectors": unref_x_to_many[key],
                }
            if "keepers" not in unref_x_to_many[key]:
                unref_x_to_many[key]["keepers"] = None

        # --- complete_setup ---------------------------------------------------
        wrap = wraps(cls.complete_setup)
        # The partial is wrapped in `staticmethod` so that it is never bound
        # to an item instance: since Python 3.13 a bare `functools.partial`
        # stored on a class emits a `FutureWarning` when looked up through an
        # instance, and it is going to be bound like a method in later
        # versions (which would pass the instance as an extra argument).
        item_cls.complete_setup = staticmethod(
            wrap(partial(cls.complete_setup, item_cls))
        )

    @classmethod
    def complete_setup(cls, item_cls):
        """This method validates manual configuration of an
        :py:class:`~.item.Item` and automatically completes configuration based
        on available data.

        .. note::
            All :py:class:`~.item.Item` classes get this method wrapped in
            :py:class:`functools.partial` that already passes the `item_cls`
            value.

        Nothing is done if the `setup_completed` metadata flag is already set.
        If `item_cls` has no `model_cls`, only related item classes given as
        strings are resolved. If setup of a related class or the alias check
        fails, the flag is left unset and the exception is propagated.

        :param item_cls: The item class to set up.
        :raises RelatedItemNotFound: No item class found for a relation.
        :raises MultipleRelatedItemsFound: More than one item class found for
            a relation.
        :raises ItemAdapterNotFound: No adapter found for `model_cls`.
        :raises InvalidFieldName: A model field name contains `"__"`.
        :raises NonExistentRelationDefined: `relations` has a key that the
            adapter does not report as a relation.
        :raises NorewriteKeyUsedTwice: A key from a tuple key of
            `norewrite_fields` is already present.
        :raises NorewriteRelationException: `norewrite_fields` values differ
            on two sides of a relation.
        :raises FieldsFromRelationNotAllowed: A configured field name
            contains `"__"`.
        :raises NonExistentFieldUsed: A configured field name is neither a
            field nor a relation.
        :raises UnrefNonXToManyFieldUsed: A key of `unref_x_to_many` is not
            an x-to-many relation.
        :raises DeleterSelectorsOrKeepersEmpty: Deleter selectors or keepers
            are empty.
        :raises DeleterXToManyRelationUsed: An x-to-many relation is used as
            a deleter selector or keeper.
        """
        if item_cls.metadata["setup_completed"]:
            return

        # --- relation paths ---------------------------------------------------
        for relation in item_cls.relations.values():
            if isinstance(relation["item_cls"], str):
                # first looking in the item's module
                item_cls_list = item_cls_manager.get_by_path(
                    relation["item_cls"], relative_to=item_cls.__module__
                )
                # if not found, looking everywhere
                if not item_cls_list:
                    item_cls_list = item_cls_manager.get_by_path(
                        relation["item_cls"]
                    )
                # exceptions
                if not item_cls_list:
                    raise RelatedItemNotFound(relation["item_cls"])
                if len(item_cls_list) != 1:
                    raise MultipleRelatedItemsFound(relation["item_cls"])
                relation["item_cls"] = item_cls_list[0]

        model_cls = item_cls.model_cls
        if model_cls is None:
            item_cls.metadata["setup_completed"] = True
            return  # no model_cls to fully complete setup

        # --- setup with adapter usage -----------------------------------------
        adapter_cls = adapter_manager.get_adapter_cls(model_cls)
        if not adapter_cls:
            raise ItemAdapterNotFound(item_cls)

        # fields
        item_cls.fields = {}
        for fname, ftype in adapter_cls.iter_fields(model_cls):
            if "__" in fname:
                raise InvalidFieldName(fname)
            item_cls.fields[fname] = ftype

        # relations: rebuilt from what the adapter reports, only the keys
        # listed below are carried over from the manual configuration
        original_relation_keys_to_copy = {
            "replace_x_to_many",
        }
        relations = {}
        for (
            key,
            other_model_cls,
            relation_type,
            reverse_key,
        ) in adapter_cls.iter_relations(model_cls):
            other_item_cls_list = item_cls_manager.get_by_model_cls(
                other_model_cls, collection_id=item_cls.get_collection_id()
            )
            # exceptions
            if not other_item_cls_list:
                raise RelatedItemNotFound(model_cls, key, other_model_cls)
            if len(other_item_cls_list) != 1:
                raise MultipleRelatedItemsFound(model_cls, other_item_cls_list)

            relations[key] = cls.get_default_relation_attrs()
            relations[key].update(
                {
                    "item_cls": other_item_cls_list[0],
                    "relation_type": relation_type,
                    "reverse_key": reverse_key,
                }
            )

        for key in item_cls.relations.keys():
            if key not in relations:
                raise NonExistentRelationDefined(item_cls, "relations", key)
            for subkey in original_relation_keys_to_copy:
                if subkey in item_cls.relations[key]:
                    relations[key][subkey] = item_cls.relations[key][subkey]

        # (to keep reference to the same dictionary)
        item_cls.relations.clear()
        item_cls.relations.update(relations)

        # norewrite_fields -----------------------------------------------------
        def fix_norewrite_field(item_cls, key, value):
            # Sets `value` for `key` if the key is not configured yet and
            # mirrors the value onto the reverse side of a relation.
            if key not in item_cls.norewrite_fields:
                item_cls.norewrite_fields[key] = value

            if key in item_cls.relations:
                other_item_cls = item_cls.relations[key]["item_cls"]
                reverse_key = item_cls.relations[key]["reverse_key"]
                if reverse_key:
                    if reverse_key not in other_item_cls.norewrite_fields:
                        other_item_cls.norewrite_fields[reverse_key] = value
                    elif not (value is other_item_cls.norewrite_fields[reverse_key]):
                        raise NorewriteRelationException(
                            "norewrite_fields for relations must be the same "
                            "on both sides",
                            key,
                            reverse_key,
                        )

        # untuple norewrite fields
        def untuple(tupled_value):
            # Yields the keys of an arbitrarily nested tuple one by one.
            for key_in_tuple in tupled_value:
                if isinstance(key_in_tuple, tuple):
                    yield from untuple(key_in_tuple)
                else:
                    yield key_in_tuple

        # (iterating over a copy of the keys as the dictionary is modified)
        for key in tuple(item_cls.norewrite_fields):
            if isinstance(key, tuple):
                norewrite_value = item_cls.norewrite_fields[key]
                del item_cls.norewrite_fields[key]
                for key_in_tuple in untuple(key):
                    if key_in_tuple in item_cls.norewrite_fields:
                        raise NorewriteKeyUsedTwice(key_in_tuple)
                    item_cls.norewrite_fields[key_in_tuple] = norewrite_value

        # regular fields
        norewrite_selectors = {}
        for key in tuple(item_cls.norewrite_fields):
            norewrite_value = item_cls.norewrite_fields[key]
            if isinstance(key, (bool, RelationType)):  # selector field
                norewrite_selectors[key] = norewrite_value
                continue
            fix_norewrite_field(item_cls, key, norewrite_value)

        # selector fields: relation types are processed before booleans
        selector_keys = list(norewrite_selectors)
        type_to_key = {
            RelationType: 0,
            bool: 1,
        }
        selector_keys.sort(key=lambda t: type_to_key[type(t)])
        for selector in selector_keys:
            norewrite_value = item_cls.norewrite_fields[selector]
            del item_cls.norewrite_fields[selector]

            if isinstance(selector, RelationType):
                for key, relation in item_cls.relations.items():
                    if key in item_cls.norewrite_fields:
                        continue
                    if relation["relation_type"] is selector:
                        fix_norewrite_field(item_cls, key, norewrite_value)
                continue

            # boolean is the last in the list, it'll set all fields, including
            # relations
            if isinstance(selector, bool):
                for key in chain(item_cls.fields, item_cls.relations):
                    if key in item_cls.norewrite_fields:
                        continue
                    fix_norewrite_field(item_cls, key, norewrite_value)
                continue

        # creators, getters, nullables, and other field validation -------------
        def basic_field_validation(item_cls, field_names, groups_key):
            # Checks that all `field_names` are own fields or relations of
            # `item_cls`; for "defaults" also converts list defaults of
            # relations into bulk items.
            for field_name in field_names:
                if "__" in field_name:
                    raise FieldsFromRelationNotAllowed(
                        item_cls, groups_key, field_name
                    )
                # NonExistentFieldUsed
                if (
                    field_name not in item_cls.fields
                    and field_name not in item_cls.relations
                ):
                    raise NonExistentFieldUsed(item_cls, groups_key, field_name)
                # Default relations: a list is replaced with a bulk item of
                # the related class, any other value is kept as is
                if groups_key == "defaults" and field_name in item_cls.relations:
                    default_value = item_cls.defaults[field_name]
                    if isinstance(default_value, list):
                        bulk_item_cls = item_cls.relations[field_name]["item_cls"]
                        default_bulk = bulk_item_cls.Bulk()
                        default_bulk.add(*default_value)
                        item_cls.defaults[field_name] = default_bulk

        # remove_null_fields as `True` -----------------------------------------
        # (`True` stands for all fields and relations of the model)
        if item_cls.remove_null_fields is True:
            item_cls.remove_null_fields = {
                field_name for field_name, *_ in adapter_cls.iter_fields(model_cls)
            } | {
                field_name
                for field_name, *_ in adapter_cls.iter_relations(model_cls)
            }

        for groups_key in (
            "defaults",
            "getters",
            "creators",
            "nullables",
            "remove_null_fields",
            "norewrite_fields",
            "deleter_selectors",
            "deleter_keepers",
            "unref_x_to_many",
        ):
            groups = getattr(item_cls, groups_key)
            if groups is None:
                continue  # to be autoconfigured

            if groups_key == "nullables":  # nullables is a single group
                groups = [groups]
            elif groups_key not in ("getters", "creators"):
                # all the other attributes are a single collection of field
                # names (for dictionaries the keys are used), only getters
                # and creators are already lists of groups
                groups = [set(groups)]

            basic_field_validation(item_cls, chain(*groups), groups_key)

        # creators and getters automatic configuration -------------------------
        if item_cls.creators_autoconfig is True or (
            item_cls.creators_autoconfig is None and item_cls.creators is None
        ):
            item_cls.creators = item_cls.creators or []
            not_nulls = set(adapter_cls.iter_required_fields(model_cls))
            if not_nulls:
                if not item_cls.creators:
                    # now `item_cls.creators` cannot be empty
                    item_cls.creators.append(not_nulls)

                if item_cls.autoinject_creators:
                    # required fields are added to every group
                    for index, value in enumerate(item_cls.creators):
                        item_cls.creators[index] = value.union(not_nulls)
                elif not_nulls not in item_cls.creators:
                    item_cls.creators.append(not_nulls)

        if item_cls.getters_autoconfig is True or (
            item_cls.getters_autoconfig is None and item_cls.getters is None
        ):
            item_cls.getters = item_cls.getters or []
            uniques = adapter_cls.iter_unique_field_combinations(model_cls)
            for unique_set in uniques:
                unique_set = set(unique_set)
                if unique_set not in item_cls.getters:
                    item_cls.getters.append(unique_set)

        # remove_null_fields automatic configuration ---------------------------
        if item_cls.remove_null_fields_autoconfig is True or (
            item_cls.remove_null_fields is None
            and item_cls.remove_null_fields_autoconfig is None
        ):
            item_cls.remove_null_fields = item_cls.remove_null_fields or set()
            not_nulls = set(adapter_cls.iter_required_fields(model_cls))
            item_cls.remove_null_fields = item_cls.remove_null_fields.union(
                not_nulls
            )
        else:
            item_cls.remove_null_fields = item_cls.remove_null_fields or set()

        # completing setup for related classes ---------------------------------
        # The flag is raised only to stop related classes from recursing back
        # into this class. It is lowered again even if a related class fails,
        # so that a failed setup is never taken for a completed one.
        item_cls.metadata["setup_completed"] = True
        try:
            for relation in item_cls.relations.values():
                relation["item_cls"].complete_setup()
        finally:
            item_cls.metadata["setup_completed"] = False

        # setting up model deleters --------------------------------------------
        # validating `unref_x_to_many`
        for fkey in item_cls.unref_x_to_many:
            if (
                fkey not in item_cls.relations
                or item_cls.relations[fkey]["relation_type"].is_x_to_one()
            ):
                raise UnrefNonXToManyFieldUsed(fkey)

            # selectors and keepers are field names of the related class
            other_cls = item_cls.relations[fkey]["item_cls"]
            fields = list(
                chain(
                    item_cls.unref_x_to_many[fkey].get("selectors") or [],
                    item_cls.unref_x_to_many[fkey].get("keepers") or [],
                )
            )
            basic_field_validation(other_cls, fields, None)

        # generating model deleters
        def create_model_deleter(item_cls, selectors, keepers):
            # Validates selectors and keepers and builds a `ModelDeleter` for
            # the model of `item_cls`.
            # NOTE(review): primary keys are always taken through the adapter
            # of the outer class, also when `item_cls` is a related class --
            # confirm related models always use the same adapter.
            model_cls = item_cls.model_cls
            if keepers is None:
                keepers = set(adapter_cls.get_primary_key_names(model_cls))

            if not selectors or not keepers:
                raise DeleterSelectorsOrKeepersEmpty(
                    "Selectors and keepers cannot be empty"
                )

            for field_name in chain(selectors, keepers):
                if field_name in item_cls.relations:
                    relation = item_cls.relations[field_name]
                    if relation["relation_type"].is_x_to_many():
                        raise DeleterXToManyRelationUsed(field_name)

            return ModelDeleter(model_cls, selectors, keepers)

        # model_deleter
        if item_cls.deleter_selectors is not None:
            item_cls.metadata["model_deleter"] = create_model_deleter(
                item_cls, item_cls.deleter_selectors, item_cls.deleter_keepers
            )

        # model_unrefs
        for fkey in item_cls.unref_x_to_many:
            other_cls = item_cls.relations[fkey]["item_cls"]
            item_cls.metadata["model_unrefs"][fkey] = create_model_deleter(
                other_cls,
                item_cls.unref_x_to_many[fkey]["selectors"],
                item_cls.unref_x_to_many[fkey]["keepers"],
            )

        # testing aliases ------------------------------------------------------
        # (the flag is set before the check and reset if the check fails)
        item_cls.metadata["setup_completed"] = True
        try:
            for alias in item_cls.aliases.values():
                item_cls._get_real_keys(alias)
        except BaseException:
            item_cls.metadata["setup_completed"] = False
            raise

        item_cls.metadata["setup_completed"] = True