Source code for save_to_db.core.item_metaclass

from functools import partial, wraps
from itertools import chain

from save_to_db.adapters.utils import adapter_manager
from save_to_db.adapters.utils.relation_type import RelationType

from save_to_db.exceptions import (
    InvalidFieldName,
    RelatedItemNotFound,
    MultipleRelatedItemsFound,
    ItemAdapterNotFound,
    NonExistentRelationDefined,
    NonExistentFieldUsed,
    FieldsFromRelationNotAllowed,
    NorewriteRelationException,
    NorewriteKeyUsedTwice,
    DeleterSelectorsOrKeepersEmpty,
    DeleterXToManyRelationUsed,
    UnrefNonXToManyFieldUsed,
)
from .item_cls_manager import item_cls_manager
from .model_deleter import ModelDeleter


[docs]class ItemMetaclass(type): """A class that is used as a metaclass for :py:class:`~.item.Item`. The :py:class:`~.item.Item` class has a configuration of how collected data must be transformed and persisted into a database. This metaclass processes and corrects item configuration. """ def __new__(cls, name, bases, dct, no_setup=False): item_cls = super().__new__(cls, name, bases, dct) # `no_setup` variable is only used to prevent setting up the # :py:class:`~.item.Item` class that must be only used as base class. if not no_setup: cls.process_configuration(item_cls) if item_cls.metadata["collection_id"] == None: item_cls_manager.add(item_cls) return item_cls # this is needed for Python versions less then 3.6 def __init__(cls, name, bases, dct, **_): return super().__init__(name, bases, dct) @staticmethod def get_default_configuration(): return { "batch_size": None, "metadata": { "collection_id": None, "setup_completed": False, "autogenerated_item_cls": False, "model_deleter": None, "model_unrefs": {}, }, "model_cls": None, "defaults": {}, "creators": None, "creators_autoconfig": None, "autoinject_creators": True, "getters": None, "getters_autoconfig": None, "nullables": set(), "remove_null_fields": None, "remove_null_fields_autoconfig": None, "fields": {}, "relations": {}, "conversions": { # boolean "boolean_true_strings": ( "true", "yes", "on", "1", "+", ), "boolean_false_strings": ( "false", "no", "off", "0", "-", ), # integer and float "decimal_separator": ".", # date and time formats "date_formats": ("%Y-%m-%d",), "time_formats": ("%H:%M:%S",), "datetime_formats": ("%Y-%m-%d %H:%M:%S",), # datetime functions "date_func": None, "time_func": None, "datetime_func": None, # timezone "default_timezone": None, }, "allow_multi_update": False, "allow_merge_items": False, "aliases": {}, "update_only_mode": False, "get_only_mode": False, "norewrite_fields": {}, "fast_insert": False, "deleter_selectors": None, "deleter_keepers": None, "deleter_execute_on_persist": False, "unref_x_to_many": {}, } @staticmethod def get_default_relation_attrs(): return { "item_cls": None, "relation_type": None, "replace_x_to_many": False, "reverse_key": None, } @classmethod def process_configuration(cls, item_cls): # --- making sure all configuration variables are present --- default_configuration = cls.get_default_configuration() for attr_name, default_value in default_configuration.items(): if not hasattr(item_cls, attr_name): setattr(item_cls, attr_name, default_value) # --- correcting metadata ---------------------------------------------- for key, value in default_configuration["metadata"].items(): if key not in item_cls.metadata: item_cls.metadata[key] = value # --- correcting configuration structure ------------------------------- # conversions conversions = default_configuration["conversions"] conversions.update(item_cls.conversions) # fixing conversions for date and time for key in ( "date_formats", "time_formats", "datetime_formats", ): if isinstance(conversions[key], str): conversions[key] = (conversions[key],) item_cls.conversions = conversions # getters and creators for groups_key in ("getters", "creators"): groups = getattr(item_cls, groups_key) if groups is None: continue # to be autoconfigured for i in range(len(groups)): if isinstance(groups[i], str): groups[i] = {groups[i]} if not isinstance(groups[i], set): groups[i] = set(groups[i]) # nullables if not isinstance(item_cls.nullables, set): item_cls.nullables = set(item_cls.nullables) # remove_null_fields if ( item_cls.remove_null_fields is not None and item_cls.remove_null_fields is not True and not isinstance(item_cls.remove_null_fields, set) ): item_cls.remove_null_fields = set(item_cls.remove_null_fields) # relations relations = item_cls.relations default_relation_attrs = cls.get_default_relation_attrs() for key in relations: if not isinstance(relations[key], dict): relations[key] = { "item_cls": relations[key], } for k, v in default_relation_attrs.items(): relations[key].setdefault(k, v) # unref_x_to_many unref_x_to_many = item_cls.unref_x_to_many for key in unref_x_to_many: if isinstance(unref_x_to_many[key], (list, set, tuple)): unref_x_to_many[key] = { "selectors": unref_x_to_many[key], } if "keepers" not in unref_x_to_many[key]: unref_x_to_many[key]["keepers"] = None # --- complete_setup --------------------------------------------------- wrap = wraps(cls.complete_setup) item_cls.complete_setup = wrap(partial(cls.complete_setup, item_cls))
[docs] @classmethod def complete_setup(cls, item_cls): """This method validates manual configuration of an :py:class:`~.item.Item` and automatically completes configuration based on available data. .. note:: All :py:class:`~.item.Item` classes get this method wrapped in :py:class:`functools.partial` that already passes the `item_cls` value. """ if item_cls.metadata["setup_completed"]: return # --- relation paths --------------------------------------------------- for relation in item_cls.relations.values(): if isinstance(relation["item_cls"], str): # first looking in the item's module item_cls_list = item_cls_manager.get_by_path( relation["item_cls"], relative_to=item_cls.__module__ ) # if not found, looking everywhere if not item_cls_list: item_cls_list = item_cls_manager.get_by_path(relation["item_cls"]) # exceptions if not item_cls_list: raise RelatedItemNotFound(relation["item_cls"]) if len(item_cls_list) != 1: raise MultipleRelatedItemsFound(relation["item_cls"]) relation["item_cls"] = item_cls_list[0] model_cls = item_cls.model_cls if model_cls is None: item_cls.metadata["setup_completed"] = True return # no model_cls to fully complete setup # --- setup with adapter usage ----------------------------------------- adapter_cls = adapter_manager.get_adapter_cls(model_cls) if not adapter_cls: raise ItemAdapterNotFound(item_cls) # fields item_cls.fields = {} for fname, ftype in adapter_cls.iter_fields(model_cls): if "__" in fname: raise InvalidFieldName(fname) item_cls.fields[fname] = ftype # relations original_relation_keys_to_copy = { "replace_x_to_many", } relations = {} for ( key, other_model_cls, relation_type, reverse_key, ) in adapter_cls.iter_relations(model_cls): other_item_cls_list = item_cls_manager.get_by_model_cls( other_model_cls, collection_id=item_cls.get_collection_id() ) # exceptions if not other_item_cls_list: raise RelatedItemNotFound(model_cls, key, other_model_cls) if len(other_item_cls_list) != 1: raise MultipleRelatedItemsFound(model_cls, other_item_cls_list) relations[key] = cls.get_default_relation_attrs() relations[key].update( { "item_cls": other_item_cls_list[0], "relation_type": relation_type, "reverse_key": reverse_key, } ) for key in item_cls.relations.keys(): if key not in relations: raise NonExistentRelationDefined(item_cls, "relations", key) for subkey in original_relation_keys_to_copy: if subkey in item_cls.relations[key]: relations[key][subkey] = item_cls.relations[key][subkey] # (to keep reference to the same dictionary) item_cls.relations.clear() item_cls.relations.update(relations) # norewrite_fields ----------------------------------------------------- def fix_norewrite_field(item_cls, key, value): if key not in item_cls.norewrite_fields: item_cls.norewrite_fields[key] = value if key in item_cls.relations: other_item_cls = item_cls.relations[key]["item_cls"] reverse_key = item_cls.relations[key]["reverse_key"] if reverse_key: if reverse_key not in other_item_cls.norewrite_fields: other_item_cls.norewrite_fields[reverse_key] = value elif not (value is other_item_cls.norewrite_fields[reverse_key]): raise NorewriteRelationException( "norewrite_fields for relations must be the same " "on both sides", key, reverse_key, ) # untuple norewrite fields def untuple(tupled_value): for key_in_tuple in tupled_value: if isinstance(key_in_tuple, tuple): yield from untuple(key_in_tuple) else: yield key_in_tuple for key in tuple(item_cls.norewrite_fields): if isinstance(key, tuple): norewrite_value = item_cls.norewrite_fields[key] del item_cls.norewrite_fields[key] for key_in_tuple in untuple(key): if key_in_tuple in item_cls.norewrite_fields: raise NorewriteKeyUsedTwice(key_in_tuple) item_cls.norewrite_fields[key_in_tuple] = norewrite_value # regular fileds norewrite_selectors = {} for key in tuple(item_cls.norewrite_fields): norewrite_value = item_cls.norewrite_fields[key] if isinstance(key, (bool, RelationType)): # selector field norewrite_selectors[key] = norewrite_value continue fix_norewrite_field(item_cls, key, norewrite_value) # selector fields selector_keys = list(norewrite_selectors) type_to_key = { RelationType: 0, bool: 1, } selector_keys.sort(key=lambda t: type_to_key[type(t)]) for selector in selector_keys: norewrite_value = item_cls.norewrite_fields[selector] del item_cls.norewrite_fields[selector] if isinstance(selector, RelationType): for key, relation in item_cls.relations.items(): if key in item_cls.norewrite_fields: continue if relation["relation_type"] is selector: fix_norewrite_field(item_cls, key, norewrite_value) continue # boolean is the last in the list, it'll set all fields, including # relations if isinstance(selector, bool): for key in chain(item_cls.fields, item_cls.relations): if key in item_cls.norewrite_fields: continue fix_norewrite_field(item_cls, key, norewrite_value) continue # creators, getters, nullables, and other field validation ------------- def basic_field_validation(item_cls, field_names, groups_key): for field_name in field_names: if "__" in field_name: raise FieldsFromRelationNotAllowed(item_cls, groups_key, field_name) # NonExistentFieldUsed if ( field_name not in item_cls.fields and field_name not in item_cls.relations ): raise NonExistentFieldUsed(item_cls, groups_key, field_name) # Default relations if groups_key == "defaults" and field_name in item_cls.relations: default_value = item_cls.defaults[field_name] if not isinstance(default_value, list): item_cls.defaults[field_name] = default_value else: bulk_item_cls = item_cls.relations[field_name]["item_cls"] default_bulk = bulk_item_cls.Bulk() default_bulk.add(*default_value) item_cls.defaults[field_name] = default_bulk # remove_null_fields as `True` ----------------------------------------- if item_cls.remove_null_fields is True: item_cls.remove_null_fields = set( field_name for field_name, *_ in adapter_cls.iter_fields(model_cls) ).union( set( field_name for field_name, *_ in adapter_cls.iter_relations(model_cls) ) ) for groups_key in ( "defaults", "getters", "creators", "nullables", "remove_null_fields", "norewrite_fields", "deleter_selectors", "deleter_keepers", "unref_x_to_many", ): groups = getattr(item_cls, groups_key) if groups is None: continue # to be autoconfigured if groups_key == "nullables": # nullables is a single group groups = [groups] elif groups_key == "defaults": # defaults is a dictionary groups = [set(groups)] elif groups_key == "norewrite_fields": groups = [set(groups)] elif groups_key == "remove_null_fields": groups = [set(groups)] elif groups_key in ["deleter_selectors", "deleter_keepers"]: groups = [set(groups)] elif groups_key == "unref_x_to_many": groups = [set(groups)] basic_field_validation(item_cls, chain(*groups), groups_key) # creators and getters automatic configuration ------------------------- if item_cls.creators_autoconfig is True or ( item_cls.creators_autoconfig is None and item_cls.creators is None ): item_cls.creators = item_cls.creators or [] not_nulls = set(adapter_cls.iter_required_fields(model_cls)) if not_nulls: if not item_cls.creators: # now `item_cls.creators` cannot be empty item_cls.creators.append(not_nulls) if item_cls.autoinject_creators: for index, value in enumerate(item_cls.creators): item_cls.creators[index] = value.union(not_nulls) elif not_nulls not in item_cls.creators: item_cls.creators.append(not_nulls) if item_cls.getters_autoconfig is True or ( item_cls.getters_autoconfig is None and item_cls.getters is None ): item_cls.getters = item_cls.getters or [] uniques = adapter_cls.iter_unique_field_combinations(model_cls) for unique_set in uniques: unique_set = set(unique_set) if unique_set not in item_cls.getters: item_cls.getters.append(unique_set) # remove_null_fields automatic configuration --------------------------- if item_cls.remove_null_fields_autoconfig is True or ( item_cls.remove_null_fields is None and item_cls.remove_null_fields_autoconfig is None ): item_cls.remove_null_fields = item_cls.remove_null_fields or set() not_nulls = set(adapter_cls.iter_required_fields(model_cls)) item_cls.remove_null_fields = item_cls.remove_null_fields.union(not_nulls) else: item_cls.remove_null_fields = item_cls.remove_null_fields or set() # completing setup for related classes --------------------------------- item_cls.metadata["setup_completed"] = True for relation in item_cls.relations.values(): relation["item_cls"].complete_setup() item_cls.metadata["setup_completed"] = False # setting up model deleters -------------------------------------------- # validating `unref_x_to_many` for fkey in item_cls.unref_x_to_many: if ( fkey not in item_cls.relations or item_cls.relations[fkey]["relation_type"].is_x_to_one() ): raise UnrefNonXToManyFieldUsed(fkey) other_cls = item_cls.relations[fkey]["item_cls"] fields = list( chain( item_cls.unref_x_to_many[fkey].get("selectors") or [], item_cls.unref_x_to_many[fkey].get("keepers") or [], ) ) basic_field_validation(other_cls, fields, None) # generating model deleters def create_model_deleter(item_cls, selectors, keepers): model_cls = item_cls.model_cls if keepers is None: keepers = set(adapter_cls.get_primary_key_names(model_cls)) if not selectors or not keepers: raise DeleterSelectorsOrKeepersEmpty( "Selectors and keepers cannot be empty" ) for field_name in chain(selectors, keepers): if field_name in item_cls.relations: relation = item_cls.relations[field_name] if relation["relation_type"].is_x_to_many(): raise DeleterXToManyRelationUsed(field_name) return ModelDeleter(model_cls, selectors, keepers) # model_deleter if item_cls.deleter_selectors is not None: item_cls.metadata["model_deleter"] = create_model_deleter( item_cls, item_cls.deleter_selectors, item_cls.deleter_keepers ) # model_unrefs for fkey in item_cls.unref_x_to_many: other_cls = item_cls.relations[fkey]["item_cls"] item_cls.metadata["model_unrefs"][fkey] = create_model_deleter( other_cls, item_cls.unref_x_to_many[fkey]["selectors"], item_cls.unref_x_to_many[fkey]["keepers"], ) # testing aliases ------------------------------------------------------ item_cls.metadata["setup_completed"] = True try: for alias in item_cls.aliases.values(): item_cls._get_real_keys(alias) except: item_cls.metadata["setup_completed"] = False raise item_cls.metadata["setup_completed"] = True