Source code for dataclass_wizard.parsers

__all__ = ['IdentityParser',
           'SingleArgParser',
           'Parser',
           'RecursionSafeParser',
           'PatternedDTParser',
           'LiteralParser',
           'UnionParser',
           'OptionalParser',
           'IterableParser',
           'TupleParser',
           'VariadicTupleParser',
           'NamedTupleParser',
           'NamedTupleUntypedParser',
           'MappingParser',
           'DefaultDictParser',
           'TypedDictParser']

from dataclasses import dataclass, InitVar, is_dataclass
from typing import (
    Type, Any, Optional, Tuple, Dict, Iterable, Callable, List
)

from .abstractions import AbstractParser
from .bases import AbstractMeta
from .class_helper import get_meta, _META
from .constants import TAG
from .errors import ParseError
from .models import PatternedDT, Extras
from .type_def import (
    FrozenKeys, NoneType, DefFactory,
    T, M, S, DD, LSQ, N, NT, DT
)
from .utils.typing_compat import (
    get_origin, get_args,
    get_keys_for_typed_dict, eval_forward_ref_if_needed)


# Type defs
GetParserType = Callable[[Type[T], Type, Extras], AbstractParser]
LoadHookType = Callable[[Any], T]
TupleOfParsers = Tuple[AbstractParser, ...]


[docs] @dataclass class IdentityParser(AbstractParser[Type[T], T]): __slots__ = () def __call__(self, o: Any) -> T: return o
[docs] @dataclass class SingleArgParser(AbstractParser[Type[T], T]): __slots__ = ('hook', ) hook: LoadHookType # noinspection PyDataclass def __post_init__(self, *_): if not self.hook: self.hook = lambda o: o def __call__(self, o: Any) -> T: return self.hook(o)
[docs] @dataclass class Parser(AbstractParser[T, T]): __slots__ = ('hook', ) hook: Callable[[Any, type[T]], T] def __call__(self, o: Any) -> T: return self.hook(o, self.base_type)
[docs] @dataclass class RecursionSafeParser(AbstractParser): """ Parser to handle cyclic or self-referential dataclasses. For example:: @dataclass class A: a: A | None = None instance = fromdict(A, {'a': {'a': {'a': None}}}) """ __slots__ = ('extras', 'hook') extras: Extras hook: Optional[LoadHookType]
[docs] def load_hook_func(self) -> LoadHookType: from .loaders import load_func_for_dataclass return load_func_for_dataclass( self.base_type, is_main_class=False, config=self.extras['config'] )
# TODO: decorating `load_hook_func` with `@cached_property` could # be an alternate, bit cleaner approach. def __call__(self, o: Any) -> T: load_hook = self.hook if load_hook is None: load_hook = self.hook = self.load_hook_func() return load_hook(o)
[docs] @dataclass class LiteralParser(AbstractParser[M, M]): __slots__ = ('value_to_type', ) base_type: type[M] # noinspection PyDataclass def __post_init__(self, *_): self.value_to_type = { val: type(val) for val in get_args(self.base_type) } def __contains__(self, item) -> bool: """ Return true if the LiteralParser is expected to handle the specified item type. Checks that the item is incorporated in the given expected values of the Literal. """ return item in self.value_to_type def __call__(self, o: Any) -> M: """ Checks for Literal equivalence, as mentioned here: https://www.python.org/dev/peps/pep-0586/#equivalence-of-two-literals """ try: type_does_not_match = type(o) is not self.value_to_type[o] except KeyError: # No such Literal with the value of `o` e: Exception = ValueError('Value not in expected Literal values') raise ParseError( e, o, self.base_type, allowed_values=list(self.value_to_type)) else: # The value of `o` is in the ones defined for the Literal, but # also confirm the type matches the one defined for the Literal. if type_does_not_match: expected_val = next(v for v in self.value_to_type if v == o) # pragma: no branch e = TypeError( 'Value did not match expected type for the Literal') raise ParseError( e, o, self.base_type, have_type=type(o), desired_type=self.value_to_type[o], desired_value=expected_val, allowed_values=list(self.value_to_type)) return o
[docs] @dataclass class PatternedDTParser(AbstractParser[PatternedDT, DT]): __slots__ = ('hook', ) base_type: PatternedDT # noinspection PyDataclass def __post_init__(self, _cls: Type, extras: Extras, *_): if not isinstance(self.base_type, PatternedDT): dt_cls = self.base_type self.base_type = extras['pattern'] self.base_type.cls = dt_cls self.hook = self.base_type.get_transform_func() def __call__(self, date_string: str) -> DT: try: return self.hook(date_string) except ValueError as e: raise ParseError( e, date_string, self.base_type.cls, pattern=self.base_type.pattern )
[docs] @dataclass class OptionalParser(AbstractParser[T, Optional[T]]): __slots__ = ('parser', ) get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): self.parser: AbstractParser = getattr( p := get_parser(self.base_type, cls, extras), '__call__', p ) def __contains__(self, item): """Check if parser is expected to handle the specified item type.""" if type(item) is NoneType: return True return super().__contains__(item) def __call__(self, o: Any) -> Optional[T]: if o is None: return o return self.parser(o)
[docs] @dataclass class UnionParser(AbstractParser[Tuple[Type[T], ...], Optional[T]]): __slots__ = ('parsers', 'tag_to_parser', 'tag_key') base_type: Tuple[Type[T], ...] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): # Tag key to search for when a dataclass is in a `Union` with # other types. config = extras.get('config') if config: self.tag_key: str = config.tag_key or TAG auto_assign_tags = config.auto_assign_tags else: self.tag_key = TAG auto_assign_tags = False parsers_list = [] self.tag_to_parser = {} for t in self.base_type: t = eval_forward_ref_if_needed(t, cls) if t is not NoneType: parser = get_parser(t, cls, extras) if isinstance(parser, AbstractParser): parsers_list.append(parser) elif is_dataclass(t): meta = get_meta(t) tag = meta.tag if not tag and (auto_assign_tags or meta.auto_assign_tags): cls_name = t.__name__ tag = cls_name # We don't want to mutate the base Meta class here if meta is AbstractMeta: from .bases_meta import BaseJSONWizardMeta cls_dict = {'__slots__': (), 'tag': tag} # noinspection PyTypeChecker meta: type[M] = type(cls_name + 'Meta', (BaseJSONWizardMeta, ), cls_dict) _META[t] = meta else: meta.tag = cls_name if tag: # TODO see if we can use a mapping of dataclass type to # load func (maybe one passed in to __post_init__), # rather than generating one on the fly like this. self.tag_to_parser[tag] = parser self.parsers = tuple(parsers_list) def __contains__(self, item): """Check if parser is expected to handle the specified item type.""" return type(item) in self.base_type def __call__(self, o: Any) -> Optional[T]: if o is None: return o for parser in self.parsers: if o in parser: return parser(o) # Attempt to parse to the desired dataclass type, using the "tag" # field in the input dictionary object. try: tag = o[self.tag_key] except (TypeError, KeyError): # Invalid type (`o` is not a dictionary object) or no such key. pass else: try: return self.tag_to_parser[tag](o) except KeyError: raise ParseError( TypeError('Object with tag was not in any of Union types'), o, [p.base_type for p in self.parsers], input_tag=tag, tag_key=self.tag_key, valid_tags=list(self.tag_to_parser.keys())) raise ParseError( TypeError('Object was not in any of Union types'), o, [p.base_type for p in self.parsers], tag_key=self.tag_key )
[docs] @dataclass class IterableParser(AbstractParser[Type[LSQ], LSQ]): """ Parser for a :class:`list`, :class:`set`, :class:`frozenset`, :class:`deque`, or a subclass of either type. """ __slots__ = ('hook', 'elem_parser') base_type: Type[LSQ] hook: Callable[[Iterable, Type[LSQ], AbstractParser], LSQ] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): # Get the subscripted element type # ex. `List[str]` -> `str` try: elem_type, = get_args(self.base_type) except ValueError: elem_type = Any # Base type of the object which is instantiable # ex. `List[str]` -> `list` self.base_type = get_origin(self.base_type) self.elem_parser = getattr( p := get_parser(elem_type, cls, extras), '__call__', p, ) def __call__(self, o: Iterable) -> LSQ: """ Load an object `o` into a new object of type `base_type`. See the declaration of :var:`LSQ` for more info. """ return self.hook(o, self.base_type, self.elem_parser)
[docs] @dataclass class TupleParser(AbstractParser[Type[S], S]): """ Parser for subscripted and un-subscripted :class:`Tuple`'s. See :class:`VariadicTupleParser` for the parser that handles the variadic form, i.e. ``Tuple[str, ...]`` """ __slots__ = ('hook', 'elem_parsers', 'total_count', 'required_count', 'elem_types') # Base type of the object which is instantiable # ex. `Tuple[bool, int]` -> `tuple` base_type: Type[S] hook: Callable[[Any, Type[S], Optional[TupleOfParsers]], S] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): # Get the subscripted values # ex. `Tuple[bool, int]` -> (bool, int) self.elem_types = elem_types = get_args(self.base_type) self.base_type = get_origin(self.base_type) # A collection with a parser for each type argument elem_parsers = tuple(get_parser(t, cls, extras) for t in elem_types) # Total count is generally the number of type arguments to `Tuple`, but # can be `Infinity` when a `Tuple` appears in its un-subscripted form. self.total_count: N = len(elem_parsers) or float('inf') # Minimum number of *required* type arguments # Check for the count of parsers which don't handle `NoneType` - # this should exclude the parsers for `Optional` or `Union` types # that have `None` in the list of args. self.required_count: int = len(tuple(p for p in elem_parsers if not isinstance(p, AbstractParser) or None not in p)) self.elem_parsers = elem_parsers or None def __call__(self, o: S) -> S: """ Load an object `o` into a new object of type `base_type` (generally a :class:`tuple` or a sub-class of one) """ # Confirm that the number of arguments in `o` matches the count in the # typed annotation. if not self.required_count <= len(o) <= self.total_count: e = TypeError('Wrong number of elements.') if self.required_count != self.total_count: desired_count = f'{self.required_count} - {self.total_count}' else: desired_count = str(self.total_count) # self.elem_parsers can be None at this moment elem_parsers_types = [getattr(p, 'base_type', tp) for p, tp in zip(self.elem_parsers, self.elem_types)] \ if self.elem_parsers else self.elem_types raise ParseError( e, o, elem_parsers_types, desired_count=desired_count, actual_count=len(o)) return self.hook(o, self.base_type, self.elem_parsers)
[docs] @dataclass class VariadicTupleParser(TupleParser): """ Parser that handles the variadic form of :class:`Tuple`'s, i.e. ``Tuple[str, ...]`` Per `PEP 484`_, only **one** required type is allowed before the ``Ellipsis``. That is, ``Tuple[int, ...]`` is valid whereas ``Tuple[int, str, ...]`` would be invalid. `See here`_ for more info. .. _PEP 484: https://www.python.org/dev/peps/pep-0484/ .. _See here: https://github.com/python/typing/issues/180 """ __slots__ = ('first_elem_parser', ) def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): # Get the subscripted values # ex. `Tuple[str, ...]` -> (str, ) elem_types = get_args(self.base_type) # Base type of the object which is instantiable # ex. `Tuple[bool, int]` -> `tuple` self.base_type = get_origin(self.base_type) # A one-element tuple containing the parser for the first type # argument. # Given `Tuple[T, ...]`, we only need a parser for `T` self.first_elem_parser: Tuple[AbstractParser] self.first_elem_parser = get_parser(elem_types[0], cls, extras), # Total count should be `Infinity` here, since the variadic form # accepts any number of possible arguments. self.total_count: N = float('inf') self.required_count = 0 def __call__(self, o: M) -> M: """ Load an object `o` into a new object of type `base_type` (generally a :class:`tuple` or a sub-class of one) """ self.elem_parsers = self.first_elem_parser * len(o) return super().__call__(o)
[docs] @dataclass class NamedTupleParser(AbstractParser[tuple, NT]): __slots__ = ('hook', 'field_to_parser', 'field_parsers') hook: Callable[ [Any, type[tuple], Optional['FieldToParser'], List[AbstractParser]], NT ] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): # Get the field annotations for the `NamedTuple` type type_anns: Dict[str, type[T]] = self.base_type.__annotations__ self.field_to_parser: Optional['FieldToParser'] = { f: getattr(p := get_parser(ftype, cls, extras), '__call__', p) for f, ftype in type_anns.items() } self.field_parsers = list(self.field_to_parser.values()) def __call__(self, o: Any) -> NT: """ Load a dictionary or list to a `NamedTuple` sub-class (or an un-annotated `namedtuple`) """ return self.hook(o, self.base_type, self.field_to_parser, self.field_parsers)
[docs] @dataclass class NamedTupleUntypedParser(AbstractParser[tuple, NT]): __slots__ = ('hook', 'dict_parser', 'list_parser') hook: Callable[[Any, Type[tuple], AbstractParser, AbstractParser], NT] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): self.dict_parser = get_parser(dict, cls, extras).__call__ self.list_parser = get_parser(list, cls, extras).__call__ def __call__(self, o: Any) -> NT: """ Load a dictionary or list to a `NamedTuple` sub-class (or an un-annotated `namedtuple`) """ return self.hook(o, self.base_type, self.dict_parser, self.list_parser)
[docs] @dataclass class MappingParser(AbstractParser[Type[M], M]): __slots__ = ('hook', 'key_parser', 'val_parser', 'val_type') base_type: Type[M] hook: Callable[[Any, Type[M], AbstractParser, AbstractParser], M] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): try: key_type, val_type = get_args(self.base_type) except ValueError: key_type = val_type = Any # Base type of the object which is instantiable # ex. `Dict[str, Any]` -> `dict` self.base_type: Type[M] = get_origin(self.base_type) self.val_type = val_type val_parser = get_parser(val_type, cls, extras) self.key_parser = getattr(p := get_parser(key_type, cls, extras), '__call__', p) self.val_parser = getattr(val_parser, '__call__', val_parser) def __call__(self, o: M) -> M: return self.hook(o, self.base_type, self.key_parser, self.val_parser)
[docs] @dataclass class DefaultDictParser(MappingParser[DD]): __slots__ = ('default_factory', ) # Override the type annotations here base_type: Type[DD] hook: Callable[ [Any, Type[DD], DefFactory, AbstractParser, AbstractParser], DD] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): super().__post_init__(cls, extras, get_parser) # The default factory argument to pass to the `defaultdict` subclass val_type = self.val_type val_base_type = getattr(val_type, '__origin__', val_type) self.default_factory: DefFactory = val_base_type def __call__(self, o: DD) -> DD: return self.hook(o, self.base_type, self.default_factory, self.key_parser, self.val_parser)
[docs] @dataclass class TypedDictParser(AbstractParser[Type[M], M]): __slots__ = ('hook', 'key_to_parser', 'required_keys', 'optional_keys') base_type: Type[M] hook: Callable[[Any, Type[M], 'FieldToParser', FrozenKeys, FrozenKeys], M] get_parser: InitVar[GetParserType] def __post_init__(self, cls: Type, extras: Extras, get_parser: GetParserType): self.key_to_parser: 'FieldToParser' = { k: getattr(p := get_parser(v, cls, extras), '__call__', p) for k, v in self.base_type.__annotations__.items() } self.required_keys, self.optional_keys = get_keys_for_typed_dict( self.base_type ) def __call__(self, o: M) -> M: try: return self.hook(o, self.base_type, self.key_to_parser, self.required_keys, self.optional_keys) except KeyError as e: err: Exception = KeyError(f'Missing required key: {e.args[0]}') raise ParseError(err, o, self.base_type) except Exception: if not isinstance(o, dict): err = TypeError('Incorrect type for object') raise ParseError( err, o, self.base_type, desired_type=self.base_type) else: raise