Source code for dataclass_wizard.v1.loaders

from __future__ import annotations

import collections.abc as abc
import dataclasses

from base64 import b64decode
from collections import defaultdict, deque
from dataclasses import is_dataclass, Field, MISSING
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Literal, NamedTuple, cast
from uuid import UUID

from .decorators import (process_patterned_date_time,
                         setup_recursive_safe_function,
                         setup_recursive_safe_function_for_generic)
from .enums import KeyAction, KeyCase
from .models import Extras, PatternBase, TypeInfo
from ..abstractions import AbstractLoaderGenerator
from ..bases import AbstractMeta, BaseLoadHook, META
from ..class_helper import (create_meta,
                            dataclass_fields,
                            dataclass_field_to_default,
                            dataclass_init_fields,
                            dataclass_init_field_names,
                            get_meta,
                            is_subclass_safe,
                            v1_dataclass_field_to_alias,
                            CLASS_TO_LOAD_FUNC,
                            DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD)
from ..constants import CATCH_ALL, TAG, PY311_OR_ABOVE, PACKAGE_NAME
from ..errors import (JSONWizardError,
                      MissingData,
                      MissingFields,
                      ParseError,
                      UnknownKeysError)
from ..loader_selection import fromdict, get_loader
from ..log import LOG
from ..type_def import DefFactory, JSONObject, NoneType, PyLiteralString, T
# noinspection PyProtectedMember
from ..utils.dataclass_compat import _set_new_attribute
from ..utils.function_builder import FunctionBuilder
from ..utils.object_path import v1_safe_get
from ..utils.string_conv import possible_json_keys
from ..utils.type_conv import (
    as_datetime_v1, as_date_v1, as_int_v1,
    as_time_v1, as_timedelta, TRUTHY_VALUES,
)
from ..utils.typing_compat import (eval_forward_ref_if_needed,
                                   get_args,
                                   get_keys_for_typed_dict,
                                   get_origin_v2,
                                   is_annotated,
                                   is_typed_dict,
                                   is_typed_dict_type_qualifier,
                                   is_union)


# Atomic, immutable types which don't require any recursive handling, and for
# which `deepcopy` returns the same object. We can provide a fast path for
# these types when loading.
_SIMPLE_TYPES = (
    # Common JSON Serializable types
    NoneType,
    bool,
    int,
    float,
    str,
    # Other common types
    complex,
    bytes,
    # TODO support
    # Other types that are also unaffected by deepcopy
    # types.EllipsisType,
    # types.NotImplementedType,
    # types.CodeType,
    # types.BuiltinFunctionType,
    # types.FunctionType,
    # type,
    # range,
    # property,
)


class LoadMixin(AbstractLoaderGenerator, BaseLoadHook):
    """
    This Mixin class derives its name from the eponymous `json.loads`
    function. Essentially it contains helper methods to convert JSON strings
    (or a Python dictionary object) to a `dataclass` which can often contain
    complex types such as lists, dicts, or even other dataclasses nested
    within it.

    Refer to the :class:`AbstractLoader` class for documentation on any of
    the implemented methods.
    """
    __slots__ = ()

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__()
        setup_default_loader(cls)

    transform_json_field = None

    @staticmethod
    def default_load_to(tp: TypeInfo, extras: Extras):
        # identity: o
        return tp.v()

    @staticmethod
    def load_to_str(tp: TypeInfo, extras: Extras):
        tn = tp.type_name(extras)
        o = tp.v()

        if tp.in_optional:  # str(v)
            return f'{tn}({o})'

        # '' if v is None else str(v)
        default = "''" if tp.origin is str else f'{tn}()'
        return f'{default} if {o} is None else {tn}({o})'

    @staticmethod
    def load_to_int(tp: TypeInfo, extras: Extras):
        """
        Generate code to load a value into an integer field.

        Current logic to parse (an annotated) ``int`` returns:
          - ``v``         -->  ``v`` is an ``int`` or similarly annotated type.
          - ``int(v)``    -->  ``v`` is a ``str`` value of either a decimal
                               integer (e.g. ``'123'``) or a non-fractional
                               float value (e.g. ``42.0``).
          - ``as_int(v)`` -->  ``v`` is a non-fractional ``float``, or in case
                               of "less common" types / scenarios.

        Note that empty strings and ``None`` (e.g. null values) are
        not supported.
        """
        tn = tp.type_name(extras)
        o = tp.v()

        tp.ensure_in_locals(extras, as_int=as_int_v1)

        return (f"{o} if (tp := {o}.__class__) is {tn} "
                f"else {tn}("
                f"f if '.' in {o} and (f := float({o})).is_integer() else {o}"
                ") if tp is str "
                f"else as_int({o},tp,{tn})")

    # TODO when `in_union`, we already know `o.__class__`
    #   is not `tn`, and we already have a variable `tp`.

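    # Illustrative example (an editor's sketch, not part of this module):
    # assuming the value expression `tp.v()` renders as `v1` and the resolved
    # type name is `int`, the string returned by `load_to_int` above expands
    # to roughly:
    #
    #     v1 if (tp := v1.__class__) is int \
    #         else int(f if '.' in v1 and (f := float(v1)).is_integer() else v1) if tp is str \
    #         else as_int(v1, tp, int)
    #
    # i.e. an `int` passes through as-is, a `str` such as '42' or '42.0' is
    # converted directly, and anything else falls back to `as_int`.
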
    @staticmethod
    def load_to_float(tp: TypeInfo, extras: Extras):
        # alias: float(o)
        return tp.wrap_builtin(float, tp.v(), extras)

    @staticmethod
    def load_to_bool(tp: TypeInfo, extras: Extras):
        o = tp.v()
        tp.ensure_in_locals(extras, __TRUTHY=TRUTHY_VALUES)

        return (f'{o}.lower() in __TRUTHY '
                f'if {o}.__class__ is str '
                f'else {o} == 1')

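    # Illustrative example (an editor's sketch): assuming the value
    # expression renders as `v1`, the string returned above is:
    #
    #     v1.lower() in __TRUTHY if v1.__class__ is str else v1 == 1
    #
    # so string inputs are matched case-insensitively against the
    # `TRUTHY_VALUES` set, while non-string inputs compare against 1
    # (note that `True == 1` in Python).
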
    @staticmethod
    def load_to_bytes(tp: TypeInfo, extras: Extras):
        tp.ensure_in_locals(extras, b64decode)
        return f'b64decode({tp.v()})'

    @classmethod
    def load_to_bytearray(cls, tp: TypeInfo, extras: Extras):
        as_bytes = cls.load_to_bytes(tp, extras)
        return tp.wrap_builtin(bytearray, as_bytes, extras)

    @staticmethod
    def load_to_none(tp: TypeInfo, extras: Extras):
        return 'None'

    @staticmethod
    def load_to_enum(tp: TypeInfo, extras: Extras):
        # alias: enum_cls(o)
        return tp.wrap(tp.v(), extras)

    @staticmethod
    def load_to_uuid(tp: TypeInfo, extras: Extras):
        # alias: UUID(o)
        return tp.wrap_builtin(UUID, tp.v(), extras)

    @classmethod
    def load_to_iterable(cls, tp: TypeInfo, extras: Extras):
        v, v_next, i_next = tp.v_and_next()
        gorg = tp.origin

        # noinspection PyBroadException
        try:
            elem_type = tp.args[0]
        except:
            elem_type = Any

        string = cls.get_string_for_annotation(
            tp.replace(origin=elem_type, i=i_next, index=None), extras)

        if issubclass(gorg, (set, frozenset)):
            start_char = '{'
            end_char = '}'
        else:
            start_char = '['
            end_char = ']'

        result = f'{start_char}{string} for {v_next} in {v}{end_char}'

        return tp.wrap(result, extras)

    @classmethod
    def load_to_tuple(cls, tp: TypeInfo, extras: Extras):
        args = tp.args

        # Determine the code string for the annotation

        # Check if the `Tuple` appears in the variadic form
        #   i.e. Tuple[str, ...]
        if args:
            is_variadic = args[-1] is ...
        else:
            # Annotated without args, as simply `tuple`
            args = (Any, ...)
            is_variadic = True

        if is_variadic:
            # Logic that handles the variadic form of :class:`Tuple`'s,
            #   i.e. ``Tuple[str, ...]``
            #
            # Per `PEP 484`_, only **one** required type is allowed before the
            # ``Ellipsis``. That is, ``Tuple[int, ...]`` is valid whereas
            # ``Tuple[int, str, ...]`` would be invalid. `See here`_ for more info.
            #
            # .. _PEP 484: https://www.python.org/dev/peps/pep-0484/
            # .. _See here: https://github.com/python/typing/issues/180
            v, v_next, i_next = tp.v_and_next()

            # Given `Tuple[T, ...]`, we only need the generated string for `T`
            string = cls.get_string_for_annotation(
                tp.replace(origin=args[0], i=i_next, index=None), extras)

            result = f'[{string} for {v_next} in {v}]'
            # Wrap because we need to create a tuple from list comprehension
            force_wrap = True
        else:
            string = ', '.join([
                cls.get_string_for_annotation(
                    tp.replace(origin=arg, index=k), extras)
                for k, arg in enumerate(args)])

            result = f'({string}, )'
            force_wrap = False

        return tp.wrap(result, extras, force=force_wrap)

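    # Illustrative example (an editor's sketch; the names `v1` / `v2` assume
    # what `v_and_next()` yields): for a variadic annotation such as
    # `tuple[str, ...]`, the element converter is generated once and the
    # result takes the shape:
    #
    #     [<element code> for v2 in v1]
    #
    # which is then force-wrapped so the generated code builds a `tuple`
    # from the list comprehension. A fixed-size form like `tuple[int, str]`
    # instead emits one converter per positional element.
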
    @classmethod
    @setup_recursive_safe_function
    def load_to_named_tuple(cls, tp: TypeInfo, extras: Extras):
        fn_gen = extras['fn_gen']

        nt_tp = cast(NamedTuple, tp.origin)

        _locals = extras['locals']
        _locals['cls'] = nt_tp
        _locals['msg'] = "`dict` input is not supported for NamedTuple, use a dataclass instead."

        req_field_to_assign = {}
        field_assigns = []

        # noinspection PyProtectedMember
        optional_fields = set(nt_tp._field_defaults)
        has_optionals = True if optional_fields else False
        only_optionals = has_optionals and len(optional_fields) == len(nt_tp.__annotations__)

        num_fields = 0

        for field, field_tp in nt_tp.__annotations__.items():
            string = cls.get_string_for_annotation(
                tp.replace(origin=field_tp, index=num_fields), extras)

            if has_optionals and field in optional_fields:
                field_assigns.append(string)
            else:
                req_field_to_assign[f'__{field}'] = string

            num_fields += 1

        params = ', '.join(req_field_to_assign)

        with fn_gen.try_():
            for field, string in req_field_to_assign.items():
                fn_gen.add_line(f'{field} = {string}')

            if has_optionals:
                opt_start = len(req_field_to_assign)
                fn_gen.add_line(f'L = len(v1); has_opt = L > {opt_start}')
                with fn_gen.if_(f'has_opt'):
                    fn_gen.add_line(f'fields = [{field_assigns.pop(0)}]')
                    for i, string in enumerate(field_assigns, start=opt_start + 1):
                        fn_gen.add_line(f'if L > {i}: fields.append({string})')

                    if only_optionals:
                        fn_gen.add_line(f'return cls(*fields)')
                    else:
                        fn_gen.add_line(f'return cls({params}, *fields)')

            fn_gen.add_line(f'return cls({params})')

        with fn_gen.except_(Exception, 'e'):
            with fn_gen.if_('(e_cls := e.__class__) is IndexError'):
                # raise `MissingFields`, as required NamedTuple fields
                # are not present in the input object `o`.
                fn_gen.add_line("raise_missing_fields(locals(), v1, cls, None)")
            with fn_gen.if_('e_cls is KeyError and type(v1) is dict'):
                # Input object is a `dict`
                # TODO should we support dict for namedtuple?
                fn_gen.add_line('raise TypeError(msg) from None')
            # re-raise
            fn_gen.add_line('raise e from None')

    @classmethod
    def load_to_named_tuple_untyped(cls, tp: TypeInfo, extras: Extras):
        # Check if input object is `dict` or `list`.
        #
        # Assuming `Point` is a `namedtuple`, this performs
        # the equivalent logic as:
        #   Point(**x) if isinstance(x, dict) else Point(*x)
        v = tp.v()
        star, dbl_star = tp.multi_wrap(extras, 'nt_', f'*{v}', f'**{v}')
        return f'{dbl_star} if isinstance({v}, dict) else {star}'

    @classmethod
    def _build_dict_comp(cls, tp, v, i_next, k_next, v_next, kt, vt, extras):
        tp_k_next = tp.replace(origin=kt, i=i_next, prefix='k', index=None)
        string_k = cls.get_string_for_annotation(tp_k_next, extras)

        tp_v_next = tp.replace(origin=vt, i=i_next, prefix='v', index=None)
        string_v = cls.get_string_for_annotation(tp_v_next, extras)

        return f'{{{string_k}: {string_v} for {k_next}, {v_next} in {v}.items()}}'

    @classmethod
    def load_to_dict(cls, tp: TypeInfo, extras: Extras):
        v, k_next, v_next, i_next = tp.v_and_next_k_v()

        try:
            kt, vt = tp.args
        except ValueError:
            # Annotated without two arguments,
            # e.g. like `dict[str]` or `dict`
            kt = vt = Any

        result = cls._build_dict_comp(
            tp, v, i_next, k_next, v_next, kt, vt, extras)

        return tp.wrap(result, extras)

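    # Illustrative example (an editor's sketch; the names `v1` / `k2` / `v2`
    # assume what `v_and_next_k_v()` yields): for `dict[K, V]`,
    # `_build_dict_comp` above produces a dict comprehension of the form:
    #
    #     {<key code>: <value code> for k2, v2 in v1.items()}
    #
    # which `tp.wrap(...)` then wraps in the target mapping type, if the
    # annotated type is not a plain `dict`.
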
    @classmethod
    def load_to_defaultdict(cls, tp: TypeInfo, extras: Extras):
        v, k_next, v_next, i_next = tp.v_and_next_k_v()
        default_factory: DefFactory | None

        try:
            kt, vt = tp.args
            default_factory = getattr(vt, '__origin__', vt)
        except ValueError:
            # Annotated without two arguments,
            # e.g. like `defaultdict[str]` or `defaultdict`
            kt = vt = Any
            default_factory = NoneType

        result = cls._build_dict_comp(
            tp, v, i_next, k_next, v_next, kt, vt, extras)

        return tp.wrap_dd(default_factory, result, extras)

    @classmethod
    @setup_recursive_safe_function
    def load_to_typed_dict(cls, tp: TypeInfo, extras: Extras):
        fn_gen = extras['fn_gen']

        req_keys, opt_keys = get_keys_for_typed_dict(tp.origin)

        result_list = []
        # TODO set __annotations__?
        td_annotations = tp.origin.__annotations__

        # Set required keys for the `TypedDict`
        for k in req_keys:
            field_tp = td_annotations[k]
            field_name = repr(k)
            string = cls.get_string_for_annotation(
                tp.replace(origin=field_tp, index=field_name), extras)

            result_list.append(f'{field_name}: {string}')

        with fn_gen.try_():
            fn_gen.add_lines('result = {',
                             *(f'  {r},' for r in result_list),
                             '}')

            # Set optional keys for the `TypedDict` (if they exist)
            for k in opt_keys:
                field_tp = td_annotations[k]
                field_name = repr(k)
                string = cls.get_string_for_annotation(
                    tp.replace(origin=field_tp, i=2, index=None), extras)
                with fn_gen.if_(f'(v2 := v1.get({field_name}, MISSING)) is not MISSING'):
                    fn_gen.add_line(f'result[{field_name}] = {string}')
            fn_gen.add_line('return result')

        with fn_gen.except_(Exception, 'e'):
            with fn_gen.if_('type(e) is KeyError'):
                fn_gen.add_line('name = e.args[0]; e = KeyError(f"Missing required key: {name!r}")')
            with fn_gen.elif_('not isinstance(v1, dict)'):
                fn_gen.add_line('e = TypeError("Incorrect type for object")')
            fn_gen.add_line('raise ParseError(e, v1, {}) from None')

    @classmethod
    @setup_recursive_safe_function_for_generic
    def load_to_union(cls, tp: TypeInfo, extras: Extras):
        fn_gen = extras['fn_gen']
        config = extras['config']
        actual_cls = extras['cls']

        tag_key = config.tag_key or TAG
        auto_assign_tags = config.auto_assign_tags

        i = tp.field_i
        fields = f'fields_{i}'

        args = tp.args
        in_optional = NoneType in args

        _locals = extras['locals']
        _locals[fields] = args
        _locals['tag_key'] = tag_key

        dataclass_tag_to_lines: dict[str, list] = {}

        type_checks = []
        try_parse_at_end = []

        for possible_tp in args:
            possible_tp = eval_forward_ref_if_needed(possible_tp, actual_cls)

            tp_new = TypeInfo(possible_tp, field_i=i)
            tp_new.in_optional = in_optional

            if possible_tp is NoneType:
                with fn_gen.if_('v1 is None'):
                    fn_gen.add_line('return None')
                continue

            if is_dataclass(possible_tp):
                # we see a dataclass in `Union` declaration
                meta = get_meta(possible_tp)
                tag = meta.tag
                assign_tags_to_cls = auto_assign_tags or meta.auto_assign_tags
                cls_name = possible_tp.__name__

                if assign_tags_to_cls and not tag:
                    tag = cls_name
                    # We don't want to mutate the base Meta class here
                    if meta is AbstractMeta:
                        create_meta(possible_tp, cls_name, tag=tag)
                    else:
                        meta.tag = cls_name

                if tag:
                    string = cls.get_string_for_annotation(tp_new, extras)

                    dataclass_tag_to_lines[tag] = [
                        f'if tag == {tag!r}:',
                        f'  return {string}'
                    ]
                    continue

                elif not config.v1_unsafe_parse_dataclass_in_union:
                    e = ValueError(
                        'Cannot parse dataclass types in a Union without '
                        'one of the following `Meta` settings:\n\n'
                        '  * `auto_assign_tags = True`\n'
                        f'    - Set on class `{extras["cls_name"]}`.\n\n'
                        f'  * `tag = "{cls_name}"`\n'
                        f'    - Set on class `{possible_tp.__qualname__}`.\n\n'
                        '  * `v1_unsafe_parse_dataclass_in_union = True`\n'
                        f'    - Set on class `{extras["cls_name"]}`\n\n'
                        'For more information, refer to:\n'
                        '  https://dataclass-wizard.readthedocs.io/en/latest/common_use_cases/dataclasses_in_union_types.html')
                    raise e from None

            string = cls.get_string_for_annotation(tp_new, extras)

            try_parse_lines = [
                'try:',
                f'  return {string}',
                'except Exception:',
                '  pass',
            ]

            # TODO disable for dataclasses
            if (possible_tp in _SIMPLE_TYPES
                    or is_subclass_safe(get_origin_v2(possible_tp), _SIMPLE_TYPES)):
                tn = tp_new.type_name(extras)
                type_checks.extend([
                    f'if tp is {tn}:',
                    '  return v1'
                ])
                list_to_add = try_parse_at_end
            else:
                list_to_add = type_checks

            list_to_add.extend(try_parse_lines)

        if dataclass_tag_to_lines:

            with fn_gen.try_():
                fn_gen.add_line(f'tag = v1[tag_key]')

            with fn_gen.except_(Exception):
                fn_gen.add_line('pass')

            with fn_gen.else_():
                for lines in dataclass_tag_to_lines.values():
                    fn_gen.add_lines(*lines)

                fn_gen.add_line(
                    "raise ParseError("
                    "TypeError('Object with tag was not in any of Union types'),"
                    f"v1,{fields},"
                    "input_tag=tag,"
                    "tag_key=tag_key,"
                    f"valid_tags={list(dataclass_tag_to_lines)})"
                )

        fn_gen.add_line('tp = type(v1)')

        if type_checks:
            fn_gen.add_lines(*type_checks)

        if try_parse_at_end:
            fn_gen.add_lines(*try_parse_at_end)

        # Invalid type for Union
        fn_gen.add_line(
            "raise ParseError("
            "TypeError('Object was not in any of Union types'),"
            f"v1,{fields},"
            "tag_key=tag_key"
            ")")

    @staticmethod
    @setup_recursive_safe_function_for_generic
    def load_to_literal(tp: TypeInfo, extras: Extras):
        fn_gen = extras['fn_gen']
        fields = f'fields_{tp.field_i}'

        _locals = extras['locals']
        _locals[fields] = frozenset(tp.args)

        with fn_gen.if_(f'{tp.v()} in {fields}', comment=repr(tp.args)):
            fn_gen.add_line('return v1')

        # No such Literal with the value of `o`
        fn_gen.add_line("e = ValueError('Value not in expected Literal values')")
        fn_gen.add_line(f'raise ParseError(e, v1, {fields}, '
                        f'allowed_values=list({fields}))')

    # TODO Checks for Literal equivalence, as mentioned here:
    #   https://www.python.org/dev/peps/pep-0586/#equivalence-of-two-literals

    # extras_cp['locals'][fields] = {
    #     a: type(a) for a in tp.args
    # }
    #
    # with fn_gen.function(fn_name, ['v1'], None, _locals):
    #
    #     with fn_gen.try_():
    #         with fn_gen.if_(f'type({tp.v()}) is {fields}[{tp.v()}]'):
    #             fn_gen.add_line('return v1')
    #
    #         # The value of `o` is in the ones defined for the Literal, but
    #         # also confirm the type matches the one defined for the Literal.
    #         fn_gen.add_line("e = TypeError('Value did not match expected type for the Literal')")
    #
    #         fn_gen.add_line('raise ParseError('
    #                         f'e, v1, {fields}, '
    #                         'have_type=type(v1), '
    #                         f'desired_type={fields}[v1], '
    #                         f'desired_value=next(v for v in {fields} if v == v1), '
    #                         f'allowed_values=list({fields})'
    #                         ')')
    #     with fn_gen.except_(KeyError):
    #         # No such Literal with the value of `o`
    #         fn_gen.add_line("e = ValueError('Value not in expected Literal values')")
    #         fn_gen.add_line('raise ParseError('
    #                         f'e, v1, {fields}, allowed_values=list({fields})'
    #                         f')')

    @staticmethod
    def load_to_decimal(tp: TypeInfo, extras: Extras):
        o = tp.v()
        s = f'str({o}) if {o}.__class__ is float else {o}'
        return tp.wrap_builtin(Decimal, s, extras)

    @staticmethod
    def load_to_path(tp: TypeInfo, extras: Extras):
        # alias: Path(o)
        return tp.wrap_builtin(Path, tp.v(), extras)

    @classmethod
    @process_patterned_date_time
    def load_to_date(cls, tp: TypeInfo, extras: Extras):
        return cls._load_to_date(tp, extras, date)

    @classmethod
    @process_patterned_date_time
    def load_to_datetime(cls, tp: TypeInfo, extras: Extras):
        return cls._load_to_date(tp, extras, datetime)

    @staticmethod
    @process_patterned_date_time
    def load_to_time(tp: TypeInfo, extras: Extras):
        o = tp.v()
        tn = tp.type_name(extras, bound=time)
        tp_time = cast('type[time]', tp.origin)

        __fromisoformat = f'__{tn}_fromisoformat'

        tp.ensure_in_locals(
            extras,
            __as_time=as_time_v1,
            **{__fromisoformat: tp_time.fromisoformat}
        )

        if PY311_OR_ABOVE:
            _parse_iso_string = f'{__fromisoformat}({o})'
        else:  # pragma: no cover
            _parse_iso_string = f"{__fromisoformat}({o}.replace('Z', '+00:00', 1))"

        return (f'{_parse_iso_string} if {o}.__class__ is str '
                f'else __as_time({o}, {tn})')

    @staticmethod
    def _load_to_date(tp: TypeInfo, extras: Extras,
                      cls: type[date] | type[datetime]):
        o = tp.v()
        tn = tp.type_name(extras, bound=cls)
        tp_date_or_datetime = cast('type[date]', tp.origin)

        _fromisoformat = f'__{tn}_fromisoformat'
        _fromtimestamp = f'__{tn}_fromtimestamp'

        name_to_func = {
            _fromisoformat: tp_date_or_datetime.fromisoformat,
            _fromtimestamp: tp_date_or_datetime.fromtimestamp,
        }

        if cls is datetime:
            _as_func = '__as_datetime'
            name_to_func[_as_func] = as_datetime_v1
        else:
            _as_func = '__as_date'
            name_to_func[_as_func] = as_date_v1

        tp.ensure_in_locals(extras, **name_to_func)

        if PY311_OR_ABOVE:
            _parse_iso_string = f'{_fromisoformat}({o})'
        else:  # pragma: no cover
            _parse_iso_string = f"{_fromisoformat}({o}.replace('Z', '+00:00', 1))"

        return (f'{_parse_iso_string} if {o}.__class__ is str '
                f'else {_as_func}({o}, {_fromtimestamp})')

    @staticmethod
    def load_to_timedelta(tp: TypeInfo, extras: Extras):
        # alias: as_timedelta
        tn = tp.type_name(extras, bound=timedelta)
        tp.ensure_in_locals(extras, as_timedelta)
        return f'as_timedelta({tp.v()}, {tn})'

    @staticmethod
    @setup_recursive_safe_function(
        fn_name=f'__{PACKAGE_NAME}_from_dict_{{cls_name}}__')
    def load_to_dataclass(tp: TypeInfo, extras: Extras):
        load_func_for_dataclass(tp.origin, extras)

    @classmethod
    def get_string_for_annotation(cls, tp, extras):
        hooks = cls.__LOAD_HOOKS__

        # type_ann = tp.origin
        type_ann = eval_forward_ref_if_needed(tp.origin, extras['cls'])

        origin = get_origin_v2(type_ann)
        name = getattr(origin, '__name__', origin)

        args = None

        if is_annotated(type_ann):
            # Given `Annotated[T, ...]`, we only need `T`
            type_ann, *field_extras = get_args(type_ann)
            origin = get_origin_v2(type_ann)
            name = getattr(origin, '__name__', origin)
            # Check for Custom Patterns for date / time / datetime
            for extra in field_extras:
                if isinstance(extra, PatternBase):
                    extras['pattern'] = extra

        elif is_typed_dict_type_qualifier(origin):
            # Given `Required[T]` or `NotRequired[T]`, we only need `T`
            type_ann = get_args(type_ann)[0]
            origin = get_origin_v2(type_ann)
            name = getattr(origin, '__name__', origin)

        # TypeAliasType: Type aliases are created through
        # the `type` statement
        if (value := getattr(origin, '__value__', None)) is not None:
            type_ann = value
            origin = get_origin_v2(type_ann)
            name = getattr(origin, '__name__', origin)

        # `LiteralString` enforces stricter rules at
        # type-checking but behaves like `str` at runtime.
        # TODO maybe add `load_to_literal_string`
        if origin is PyLiteralString:
            load_hook = cls.load_to_str
            origin = str
            name = 'str'

        # -> Atomic, immutable types which don't require
        #    any iterative / recursive handling.
        elif origin in _SIMPLE_TYPES or is_subclass_safe(origin, _SIMPLE_TYPES):
            load_hook = hooks.get(origin)

        elif (load_hook := hooks.get(origin)) is not None:
            try:
                args = get_args(type_ann)
            except ValueError:
                args = Any,

        # -> Union[x]
        elif is_union(origin):
            load_hook = cls.load_to_union
            args = get_args(type_ann)

            # Special case for Optional[x], which is actually Union[x, None]
            if len(args) == 2 and NoneType in args:
                new_tp = tp.replace(origin=args[0], args=None, name=None)
                new_tp.in_optional = True

                string = cls.get_string_for_annotation(new_tp, extras)

                return f'None if {tp.v()} is None else {string}'

        # -> Literal[X, Y, ...]
        elif origin is Literal:
            load_hook = cls.load_to_literal
            args = get_args(type_ann)

        # https://stackoverflow.com/questions/76520264/dataclasswizard-after-upgrading-to-python3-11-is-not-working-as-expected
        elif origin is Any:
            load_hook = cls.default_load_to

        elif is_subclass_safe(origin, tuple) and hasattr(origin, '_fields'):

            if getattr(origin, '__annotations__', None):
                # Annotated as a `typing.NamedTuple` subtype
                load_hook = cls.load_to_named_tuple
            else:
                # Annotated as a `collections.namedtuple` subtype
                load_hook = cls.load_to_named_tuple_untyped

        elif is_typed_dict(origin):
            load_hook = cls.load_to_typed_dict

        elif is_dataclass(origin):
            # return a dynamically generated `fromdict`
            # for the `cls` (base_type)
            load_hook = cls.load_to_dataclass

        elif is_subclass_safe(origin, Enum):
            load_hook = cls.load_to_enum

        elif origin in (abc.Sequence, abc.MutableSequence, abc.Collection):
            if origin is abc.Sequence:
                load_hook = cls.load_to_tuple
                # desired (non-generic) origin type
                name = 'tuple'
                origin = tuple
                # Re-map type arguments to variadic tuple format,
                # e.g. `Sequence[int]` -> `tuple[int, ...]`
                try:
                    args = (get_args(type_ann)[0], ...)
                except (IndexError, ValueError):
                    args = Any,
            else:
                load_hook = cls.load_to_iterable
                # desired (non-generic) origin type
                name = 'list'
                origin = list
                # Get type arguments, e.g. `Sequence[int]` -> `int`
                try:
                    args = get_args(type_ann)
                except ValueError:
                    args = Any,

        elif isinstance(origin, PatternBase):
            load_hook = origin.load_to_pattern

        else:
            # TODO everything should use `get_origin_v2`
            try:
                args = get_args(type_ann)
            except ValueError:
                args = Any,

        if load_hook is None:
            # TODO END
            for t in hooks:
                if issubclass(origin, (t,)):
                    load_hook = hooks[t]
                    break

        tp.origin = origin
        tp.args = args
        tp.name = name

        if load_hook is not None:
            result = load_hook(tp, extras)
            return result

        # No matching hook is found for the type.
        # TODO do we want to add a `Meta` field to not raise
        #   an error but perform a default action?
        err = TypeError('Provided type is not currently supported.')
        pe = ParseError(
            err, origin, type_ann,
            resolution='Consider decorating the class with `@dataclass`',
            unsupported_type=origin
        )
        raise pe from None

def setup_default_loader(cls=LoadMixin):
    """
    Set up the default type hooks to use when converting `str` (json) or a
    Python `dict` object to a `dataclass` instance.

    Note: `cls` must be :class:`LoadMixin` or a subclass of it.
    """
    # TODO maybe `dict.update` might be better?

    # Simple types
    cls.register_load_hook(str, cls.load_to_str)
    cls.register_load_hook(float, cls.load_to_float)
    cls.register_load_hook(bool, cls.load_to_bool)
    cls.register_load_hook(int, cls.load_to_int)
    cls.register_load_hook(bytes, cls.load_to_bytes)
    cls.register_load_hook(bytearray, cls.load_to_bytearray)
    cls.register_load_hook(NoneType, cls.load_to_none)
    # Complex types
    cls.register_load_hook(UUID, cls.load_to_uuid)
    cls.register_load_hook(set, cls.load_to_iterable)
    cls.register_load_hook(frozenset, cls.load_to_iterable)
    cls.register_load_hook(deque, cls.load_to_iterable)
    cls.register_load_hook(list, cls.load_to_iterable)
    cls.register_load_hook(tuple, cls.load_to_tuple)
    cls.register_load_hook(defaultdict, cls.load_to_defaultdict)
    cls.register_load_hook(dict, cls.load_to_dict)
    cls.register_load_hook(Decimal, cls.load_to_decimal)
    cls.register_load_hook(Path, cls.load_to_path)
    # Dates and times
    cls.register_load_hook(datetime, cls.load_to_datetime)
    cls.register_load_hook(time, cls.load_to_time)
    cls.register_load_hook(date, cls.load_to_date)
    cls.register_load_hook(timedelta, cls.load_to_timedelta)

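# Illustrative sketch (an editor's example; `IPv4Address` support is
# hypothetical and not part of this module): a `LoadMixin` subclass picks up
# these default hooks automatically via `__init_subclass__`, and can register
# additional hooks using the same `(type, hook)` pattern as above, where each
# hook returns the code string for the generated loader:
#
#     from ipaddress import IPv4Address
#
#     class MyLoadMixin(LoadMixin):
#         @staticmethod
#         def load_to_ipv4(tp: TypeInfo, extras: Extras):
#             # alias: IPv4Address(o)
#             return tp.wrap_builtin(IPv4Address, tp.v(), extras)
#
#     MyLoadMixin.register_load_hook(IPv4Address, MyLoadMixin.load_to_ipv4)
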
def check_and_raise_missing_fields(
        _locals, o, cls,
        fields: tuple[Field, ...] | None):

    if fields is None:  # named tuple
        nt_tp = cast(NamedTuple, cls)
        # noinspection PyProtectedMember
        field_to_default = nt_tp._field_defaults

        fields = tuple([
            dataclasses.field(
                default=field_to_default.get(field, MISSING),
            )
            for field in cls.__annotations__])

        for field, name in zip(fields, cls.__annotations__):
            field.name = name

        missing_fields = [f for f in cls.__annotations__
                          if f'__{f}' not in _locals
                          and f not in field_to_default]

        missing_keys = None

    else:
        missing_fields = [f.name for f in fields
                          if f.init
                          and f'__{f.name}' not in _locals
                          and (f.default is MISSING
                               and f.default_factory is MISSING)]

        missing_keys = [v1_dataclass_field_to_alias(cls).get(field, [field])[0]
                        for field in missing_fields]

    raise MissingFields(
        None, o, cls, fields, None, missing_fields,
        missing_keys
    ) from None

def load_func_for_dataclass(
        cls: type,
        extras: Extras | None = None,
        loader_cls=LoadMixin,
        base_meta_cls: type = AbstractMeta,
) -> Callable[[JSONObject], T] | None:

    # Tuple describing the fields of this dataclass.
    fields = dataclass_fields(cls)

    cls_init_fields = dataclass_init_fields(cls, True)
    cls_init_field_names = dataclass_init_field_names(cls)

    field_to_default = dataclass_field_to_default(cls)

    has_defaults = True if field_to_default else False

    # Get the loader for the class, or create a new one as needed.
    cls_loader = get_loader(cls, base_cls=loader_cls, v1=True)

    cls_name = cls.__name__

    fn_name = f'__{PACKAGE_NAME}_from_dict_{cls_name}__'

    # Get the meta config for the class, or the default config otherwise.
    meta = get_meta(cls, base_meta_cls)

    if extras is None:  # we are being run for the main dataclass
        is_main_class = True

        # If the `recursive` flag is enabled and a Meta config is provided,
        # apply the Meta recursively to any nested classes.
        #
        # Else, just use the base `AbstractMeta`.
        config: META = meta if meta.recursive else base_meta_cls

        # Initialize the FuncBuilder
        fn_gen = FunctionBuilder()

        new_locals = {
            'cls': cls,
            'fields': fields,
        }

        extras: Extras = {
            'config': config,
            'cls': cls,
            'cls_name': cls_name,
            'locals': new_locals,
            'recursion_guard': {cls: fn_name},
            'fn_gen': fn_gen,
        }

        _globals = {
            'MISSING': MISSING,
            'ParseError': ParseError,
            'raise_missing_fields': check_and_raise_missing_fields,
            're_raise': re_raise,
        }

    else:  # we are being run for a nested dataclass
        is_main_class = False

        # config for nested dataclasses
        config = extras['config']

        # Initialize the FuncBuilder
        fn_gen = extras['fn_gen']

        if config is not base_meta_cls:
            # we want to apply the meta config from the main dataclass
            # recursively.
            meta = meta | config
            meta.bind_to(cls, is_default=False)

        new_locals = extras['locals']
        new_locals['fields'] = fields

        # TODO need a way to auto-magically do this
        extras['cls'] = cls
        extras['cls_name'] = cls_name

    key_case: KeyCase | None = cls_loader.transform_json_field
    auto_key_case = key_case is KeyCase.AUTO

    field_to_aliases = v1_dataclass_field_to_alias(cls)
    check_aliases = True if field_to_aliases else False

    field_to_paths = DATACLASS_FIELD_TO_ALIAS_PATH_FOR_LOAD[cls]
    has_alias_paths = True if field_to_paths else False

    # Fix for using `auto_assign_tags` and `raise_on_unknown_json_key` together
    # See https://github.com/rnag/dataclass-wizard/issues/137
    has_tag_assigned = meta.tag is not None
    if (has_tag_assigned and
            # Ensure `tag_key` isn't a dataclass field,
            # to avoid issues with our logic.
            # See https://github.com/rnag/dataclass-wizard/issues/148
            meta.tag_key not in cls_init_field_names):
        expect_tag_as_unknown_key = True
    else:
        expect_tag_as_unknown_key = False

    on_unknown_key = meta.v1_on_unknown_key

    catch_all_field: str | None = field_to_aliases.pop(CATCH_ALL, None)
    has_catch_all = catch_all_field is not None

    if has_catch_all:
        pre_assign = 'i+=1; '
        catch_all_field_stripped = catch_all_field.rstrip('?')
        catch_all_idx = cls_init_field_names.index(catch_all_field_stripped)
        # remove catch all field from list, so we don't iterate over it
        del cls_init_fields[catch_all_idx]
    else:
        pre_assign = ''
        catch_all_field_stripped = catch_all_idx = None

    if on_unknown_key is not None:
        should_raise = on_unknown_key is KeyAction.RAISE
        should_warn = on_unknown_key is KeyAction.WARN
        if should_warn or should_raise:
            pre_assign = 'i+=1; '
            set_aliases = True
        else:
            set_aliases = has_catch_all
    else:
        should_raise = should_warn = None
        set_aliases = has_catch_all

    if set_aliases:
        if expect_tag_as_unknown_key:
            # add an alias for the tag key, so we don't
            # capture or raise an error when we see it
            aliases = {meta.tag_key}
        else:
            aliases = set()

        new_locals['aliases'] = aliases
    else:
        aliases = None

    if has_alias_paths:
        new_locals['safe_get'] = v1_safe_get

    with fn_gen.function(fn_name, ['o'], MISSING, new_locals):

        if (_pre_from_dict := getattr(cls, '_pre_from_dict', None)) is not None:
            new_locals['__pre_from_dict__'] = _pre_from_dict
            fn_gen.add_line('o = __pre_from_dict__(o)')

        # Need to create a separate dictionary to copy over the constructor
        # args, as we don't want to mutate the original dictionary object.
        if has_defaults:
            fn_gen.add_line('init_kwargs = {}')
        if pre_assign:
            fn_gen.add_line('i = 0')

        vars_for_fields = []

        if cls_init_fields:

            with fn_gen.try_():

                if expect_tag_as_unknown_key and pre_assign:
                    with fn_gen.if_(f'{meta.tag_key!r} in o'):
                        fn_gen.add_line('i+=1')

                val = 'v1'
                _val_is_found = f'{val} is not MISSING'

                for i, f in enumerate(cls_init_fields):
                    name = f.name
                    var = f'__{name}'
                    has_default = name in field_to_default
                    val_is_found = _val_is_found

                    if (check_aliases
                            and (_aliases := field_to_aliases.get(name)) is not None):

                        if len(_aliases) == 1:
                            alias = _aliases[0]
                            if set_aliases:
                                aliases.add(alias)

                            f_assign = f'field={name!r}; {val}=o.get({alias!r}, MISSING)'
                        else:
                            f_assign = None
                            # add possible JSON keys
                            if set_aliases:
                                aliases.update(_aliases)

                            fn_gen.add_line(f'field={name!r}')

                            condition = [f'({val} := o.get({alias!r}, MISSING)) is not MISSING'
                                         for alias in _aliases]
                            val_is_found = '(' + '\n or '.join(condition) + ')'

                    elif (has_alias_paths
                            and (paths := field_to_paths.get(name)) is not None):

                        if len(paths) == 1:
                            path = paths[0]
                            # add the first part (top-level key) of the path
                            if set_aliases:
                                aliases.add(path[0])

                            f_assign = f'field={name!r}; {val}=safe_get(o, {path!r}, {not has_default})'
                        else:
                            f_assign = None
                            fn_gen.add_line(f'field={name!r}')

                            condition = []
                            last_idx = len(paths) - 1

                            for k, path in enumerate(paths):
                                # add the first part (top-level key) of each path
                                if set_aliases:
                                    aliases.add(path[0])

                                if k == last_idx:
                                    condition.append(
                                        f'({val} := safe_get(o, {path!r}, {not has_default})) is not MISSING')
                                else:
                                    condition.append(
                                        f'({val} := safe_get(o, {path!r}, False)) is not MISSING')

                            val_is_found = '(' + '\n or '.join(condition) + ')'

                        # TODO raise some useful message like (ex. on IndexError):
                        #   Field "my_str" of type tuple[float, str] in A2 has invalid value ['123']

                    elif key_case is None:
                        if set_aliases:
                            aliases.add(name)
                        f_assign = f'field={name!r}; {val}=o.get(field, MISSING)'

                    elif auto_key_case:
                        f_assign = None
                        _aliases = possible_json_keys(name)

                        if set_aliases:
                            # add field name itself
                            aliases.add(name)
                            # add possible JSON keys
                            aliases.update(_aliases)

                        fn_gen.add_line(f'field={name!r}')

                        condition = [f'({val} := o.get(field, MISSING)) is not MISSING']
                        for alias in _aliases:
                            condition.append(f'({val} := o.get({alias!r}, MISSING)) is not MISSING')
                        val_is_found = '(' + '\n or '.join(condition) + ')'

                    else:
                        alias = key_case(name)
                        if set_aliases:
                            aliases.add(alias)
                        if alias != name:
                            field_to_aliases[name] = (alias, )
                        f_assign = f'field={name!r}; {val}=o.get({alias!r}, MISSING)'

                    string = generate_field_code(cls_loader, extras, f, i)

                    if f_assign is not None:
                        fn_gen.add_line(f_assign)

                    if has_default:
                        with fn_gen.if_(val_is_found):
                            fn_gen.add_line(f'{pre_assign}init_kwargs[field] = {string}')
                    else:
                        # TODO confirm this is ok
                        # vars_for_fields.append(f'{name}={var}')
                        vars_for_fields.append(var)
                        with fn_gen.if_(val_is_found):
                            fn_gen.add_line(f'{pre_assign}{var} = {string}')

            # create a broad `except Exception` block, as we will be
            # re-raising all exception(s) as a custom `ParseError`.
            with fn_gen.except_(Exception, 'e', ParseError):
                fn_gen.add_line("re_raise(e, cls, o, fields, field, locals().get('v1'))")

        if has_catch_all:
            catch_all_def = f'{{k: o[k] for k in o if k not in aliases}}'

            if catch_all_field.endswith('?'):  # Default value
                with fn_gen.if_('len(o) != i'):
                    fn_gen.add_line(f'init_kwargs[{catch_all_field_stripped!r}] = {catch_all_def}')
            else:
                var = f'__{catch_all_field_stripped}'
                fn_gen.add_line(f'{var} = {{}} if len(o) == i else {catch_all_def}')
                vars_for_fields.insert(catch_all_idx, var)

        elif set_aliases:  # warn / raise on unknown key
            line = 'extra_keys = set(o) - aliases'

            with fn_gen.if_('len(o) != i'):
                fn_gen.add_line(line)
                if should_raise:
                    # Raise an error here (if needed)
                    new_locals['UnknownKeysError'] = UnknownKeysError
                    fn_gen.add_line("raise UnknownKeysError(extra_keys, o, cls, fields) from None")
                elif should_warn:
                    # Show a warning here
                    new_locals['LOG'] = LOG
                    fn_gen.add_line(r"LOG.warning('Found %d unknown keys %r not mapped to the dataclass schema.\n"
                                    r" Class: %r\n Dataclass fields: %r', "
                                    "len(extra_keys), extra_keys, "
                                    "cls.__qualname__, [f.name for f in fields])")

        # Now pass the arguments to the constructor method, and return
        # the new dataclass instance. If there are any missing fields,
        # we raise them here.
        if has_defaults:
            vars_for_fields.append('**init_kwargs')
        init_parts = ', '.join(vars_for_fields)

        with fn_gen.try_():
            fn_gen.add_line(f"return cls({init_parts})")
        with fn_gen.except_(UnboundLocalError):
            # raise `MissingFields`, as required dataclass fields
            # are not present in the input object `o`.
            fn_gen.add_line("raise_missing_fields(locals(), o, cls, fields)")

    # Save the load function for the main dataclass, so we don't need to run
    # this logic each time.
    if is_main_class:
        # noinspection PyUnboundLocalVariable
        functions = fn_gen.create_functions(_globals)

        cls_fromdict = functions[fn_name]

        # Check if the class has a `from_dict`, and it's
        # a class method bound to `fromdict`.
        if ((from_dict := getattr(cls, 'from_dict', None)) is not None
                and getattr(from_dict, '__func__', None) is fromdict):

            LOG.debug("setattr(%s, 'from_dict', %s)", cls_name, fn_name)
            _set_new_attribute(cls, 'from_dict', cls_fromdict)

        _set_new_attribute(
            cls, f'__{PACKAGE_NAME}_from_dict__', cls_fromdict)
        LOG.debug(
            "setattr(%s, '__%s_from_dict__', %s)",
            cls_name, PACKAGE_NAME, fn_name)

        # TODO in `v1`, we will use class attribute (set above) instead.
        CLASS_TO_LOAD_FUNC[cls] = cls_fromdict

        return cls_fromdict

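# Illustrative usage sketch (an editor's example; the `Example` dataclass is
# hypothetical): calling `load_func_for_dataclass` for a top-level dataclass
# builds, caches, and returns the generated `fromdict` function, which maps a
# JSON-like dict onto a new instance:
#
#     @dataclasses.dataclass
#     class Example:
#         name: str
#         count: int = 0
#
#     from_dict_fn = load_func_for_dataclass(Example)
#     obj = from_dict_fn({'name': 'spam', 'count': '1'})
#     assert obj == Example(name='spam', count=1)
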
def generate_field_code(cls_loader: LoadMixin,
                        extras: Extras,
                        field: Field,
                        field_i: int) -> 'str | TypeInfo':

    cls = extras['cls']
    field_type = field.type = eval_forward_ref_if_needed(field.type, cls)

    try:
        return cls_loader.get_string_for_annotation(
            TypeInfo(field_type, field_i=field_i), extras
        )

    # except Exception as e:
    #     re_raise(e, cls, None, dataclass_init_fields(cls), field, None)
    except ParseError as pe:
        pe.class_name = cls
        # noinspection PyPropertyAccess
        pe.field_name = field.name
        raise pe from None

def re_raise(e, cls, o, fields, field, value):
    # If the object `o` is None, then raise an error with
    # the relevant info included.
    if o is None:
        raise MissingData(cls) from None

    # Check if the object `o` is some other type than what we expect -
    # for example, we could be passed in a `list` type instead.
    if not isinstance(o, dict):
        base_err = TypeError('Incorrect type for `from_dict()`')
        e = ParseError(base_err, o, dict, cls, desired_type=dict)

    add_fields = True

    if type(e) is not ParseError:
        if isinstance(e, JSONWizardError):
            add_fields = False
        else:
            tp = getattr(next((f for f in fields if f.name == field), None), 'type', Any)
            e = ParseError(e, value, tp)

    # We run into a parsing error while loading the field value;
    # Add additional info on the Exception object before re-raising it.
    #
    # First confirm these values are not already set by an
    # inner dataclass. If so, it likely makes it easier to
    # debug the cause. Note that this should already be
    # handled by the `setter` methods.
    if add_fields:
        e.class_name, e.fields, e.field_name, e.json_object = cls, fields, field, o
    else:
        e.class_name, e.field_name, e.json_object = cls, field, o

    raise e from None