Source code for xoutil.params

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#

'''Tools for managing function arguments.

Process function arguments could be messy when a flexible schema is needed.
With this module you can outline parameters schema using a smart way of
processing actual arguments:

A parameter row (see `ParamSchemeRow`:class:), allow several keywords IDs (one
is required used as the final identifier for the actual argument). Also
integer IDs expressing logical order for positional argument passing (negative
values are for right-to-left indexing, like in sequences).  Several values
means several possibilities.

.. versionadded:: 1.8.0

'''

from __future__ import (division as _py3_division,
                        print_function as _py3_print,
                        absolute_import as _py3_import)


#: The maximum number of positional arguments allowed when calling a function.
MAX_ARG_COUNT = 1024 * 1024    # just any large number

from xoutil.symbols import Undefined    # used implicitly for absent default


[docs]def issue_9137(args): '''Parse arguments for methods, fixing `issue 9137`__ (self ambiguity). There are methods that expect 'self' as valid keyword argument, this is not possible if this name is used explicitly:: def update(self, *args, **kwds): ... To solve this, declare the arguments as ``method_name(*args, **kwds)``, and in the function code:: self, args = issue_9137(args) :returns: (self, remainder positional arguments in a tuple) .. versionadded:: 1.8.0 __ https://bugs.python.org/issue9137 ''' self = args[0] # Issue 9137 args = args[1:] return self, args
[docs]def check_count(args, low, high=MAX_ARG_COUNT, caller=None): '''Check the positional arguments actual count against constrains. :param args: The args to check count, normally is a tuple, but an integer is directly accepted. :param low: Integer expressing the minimum count allowed. :param high: Integer expressing the maximum count allowed. :param caller: Name of the function issuing the check, its value is used only for error reporting. .. versionadded:: 1.8.0 ''' from xoutil.eight import integer_types # TODO: Shouldn't we use the TypeError and ValueError? assert isinstance(low, integer_types) and low >= 0 assert isinstance(high, integer_types) and high >= low if isinstance(args, int): count = args if count < 0: msg = "check_count() don't accept a negative argument count: {}" raise ValueError(msg.format(count)) else: count = len(args) if count < low: error = True adv = 'exactly' if low == high else 'at least' if low == 1: aux = '{} one argument'.format(adv) else: aux = '{} {} arguments'.format(adv, low) elif count > high: error = True if low == high: if low == 0: aux = 'no arguments' elif low == 1: aux = 'exactly one argument' else: aux = 'exactly {} arguments'.format(low) elif high == 1: aux = 'at most one argument' else: aux = 'at most {} arguments'.format(high) else: error = False if error: if caller: name = '{}()'.format(caller) else: name = 'called function or method' raise TypeError('{} takes {} ({} given)'.format(name, aux, count))
[docs]def check_default(absent=Undefined): '''Get a default value passed as a last excess positional argument. :param absent: The value to be used by default if no one is given. Defaults to `~xoutil.symbols.Undefined`:obj:. For example:: def get(self, name, *default): from xoutil.params import check_default, Undefined if name in self.inner_data: return self.inner_data[name] elif check_default()(*default) is not Undefined: return default[0] else: raise KeyError(name) .. versionadded:: 1.8.0 ''' def default(res=absent): return res return default
[docs]def single(args, kwds): '''Return a true value only when a unique argument is given. When needed, the most suitable result will be wrapped using the `~xoutil.fp.option.Maybe`:class:\ . .. versionadded:: 1.8.0 ''' from xoutil.fp.option import Just, Wrong, take if len(args) == 1 and not kwds: res = take(args[0]) if not res: res = Just(res) res = Just(res) elif not args and len(kwds) == 1: res = kwds else: res = Wrong((args, kwds)) return res
[docs]def keywords_only(func): '''Make a function to accepts its keywords arguments as keywords-only. In Python 3 parlance this would make:: func(a, b=None) become:: func(a, *, b=None). In Python 3 this decorator does nothing. If `func` does not have any keyword arguments, return `func`. There's a pathological case when you define:: func(a, b=None, *args) In such a case if you call ``func(1, 2, b=3)`` we can't actually call the original function with ``a=1``, ``args=(2, )`` and ``b=3``. This case also raises a TypeError. .. versionadded:: 1.8.0 ''' import sys from functools import wraps from xoutil.future.inspect import getfullargspec if sys.version_info >= (3, 0): return func spec = getfullargspec(func) if not spec.defaults: return func l = len(spec.args) - len(spec.defaults) kargs = spec.args[l:] if len(kargs) > 1: display_kargs = ', '.join("'%s'" % arg for arg in spec.args[l:-1]) display_kargs += " and '%s'" % spec.args[-1] else: display_kargs = "'%s'" % spec.args[l] InvalidSignature = TypeError( 'Arguments %s must be passed as keyword' % (display_kargs, ) ) @wraps(func) def inner(*args, **kwargs): if len(args) > l: # The case of ``def f(a, b=X, *args)`` because if we call # ``f(1, 2, b=3)`` we cannot properly call the original # function with a=1, args=(2, ) and b=3. raise InvalidSignature return func(*args, **kwargs) return inner
[docs]def pop_keyword_arg(kwargs, names, default=Undefined): '''Return the value of a keyword argument. :param kwargs: The mapping with passed keyword arguments. :param names: Could be a single name, or a collection of names. :param default: The default value to return if no value is found. .. versionadded:: 1.8.0 ''' from xoutil.eight import string_types from xoutil.objects import pop_first_of if isinstance(names, string_types): names = (names,) return pop_first_of(kwargs, *names, default=default)
[docs]def pop_keyword_values(kwargs, *names, **options): '''Return a list with all keyword argument values. :param kwargs: The mapping with passed keyword arguments. :param names: Each item will be a definition of keyword argument name to retrieve. Could be a string with a name, or a list of alternatives (aliases). :keyword default: Keyword only option to define a default value to be used in place of not given arguments. If not given, it is used special value `~xoutil.symbols.Undefined`:obj:. :keyword defaults: A dictionary with default values per argument name. If none is given, use `default`. .. note:: `defaults` trumps `default`. .. warning:: For the case where a single name has several alternatives, you may choose any of the alternatives. If you pass several diverging defaults for different alternatives, the result is undefined. :keyword ignore_error: By default, when there are remaining values in `kwargs`, after all names are processed, a `TypeError`:class: is raised. If this keyword only option is True, this function returns normally. Examples:: >>> pop_keyword_values({'b': 1}, 'a', 'b') [Undefined, 1] >>> kwargs = {'a': 1, 'b': 2, 'c': 3} >>> try: ... res = pop_keyword_values(kwargs, 'a', 'b') ... except TypeError as error: ... res = error >>> type(res) TypeError >>> kwargs = {'a': 1, 'b': 2, 'c': 3} >>> options = dict(ignore_error=True, default=None) >>> pop_keyword_values(kwargs, 'a', ('B', 'b'), **options) [1, 2] .. versionadded:: 1.8.3 ''' default = options.get('default', Undefined) defaults = options.get('defaults', {}) res = [] for item in names: val = pop_keyword_arg(kwargs, item, default=Undefined) if val is Undefined: val = pop_keyword_arg(defaults, item, default=default) res.append(val) if kwargs and not options.get('ignore_error', False): msg = 'calling function got unexpected keyword arguments "{}"' raise TypeError(msg.format(tuple(kwargs))) return res
[docs]class ParamManager: '''Function parameters parser. For example:: def wraps(*args, **kwargs): pm = ParamManager(args, kwargs) name = pm(0, 1, 'name', coerce=str) wrapped = pm(0, 1, 'wrapped', coerce=valid(callable)) ... When an instance of this class is called (``__call__`` operator), it is used the same protocol as when creating an instance of a parameter definition row (`ParamSchemeRow`:class:). See `ParamScheme`:class: class as another way to define and validate schemes for extracting parameter values in a consistent way. .. versionadded:: 1.8.0 '''
[docs] def __init__(self, args, kwds): '''Created with actual parameters of a client function.''' self.args = args self.kwds = kwds self.consumed = set() # consumed identifiers
[docs] def __call__(self, *ids, **options): '''Get a parameter value.''' from xoutil.fp.option import Just, Wrong, none # TODO: Change this ``from xoutil.values import coercer`` from xoutil.fp.prove.semantic import predicate as coercer args, kwds = self.args, self.kwds i, res = 0, none while isinstance(res, Wrong) and i < len(ids): key = ids[i] if key in self.consumed: pass elif isinstance(key, int): try: res = args[key] except IndexError: pass elif key in kwds: res = kwds[key] if not isinstance(res, Wrong) and 'coerce' in options: aux = coercer(options['coerce'])(res) res = aux.inner if isinstance(aux, Just) else aux if not isinstance(res, Wrong): self.consumed.add(key) if isinstance(key, int) and key < 0: # consume both, negative and adjusted value key = len(args) + key self.consumed.add(key) else: i += 1 if isinstance(res, Wrong): if 'default' in options: return options['default'] elif isinstance(res.inner, BaseException): from xoutil.eight.exceptions import throw throw(res.inner) else: raise TypeError('value for "{}" is not found'.format(ids)) else: return res.inner if isinstance(res, Just) else res
[docs] def remainder(self): '''Return not consumed values in a mapping.''' from xoutil.eight import range passed = set(range(len(self.args))) | set(self.kwds) ids = passed - self.consumed args, kwds = self.args, self.kwds return {k: args[k] if isinstance(k, int) else kwds[k] for k in ids}
[docs]class ParamSchemeRow: '''Scheme row for a `ParamManager`:class: instance call. This class validates identifiers and options at this level; these checks are not done in a call to get a parameter value. Normally this class is used as part of a full `ParamScheme`:class: composition. Additionally to the options can be passed to `ParamManager.__call__`:meth:', this class can be instanced with: :param ids: positional variable number arguments, could be aliases for keyword parameter passing, or integers for order (negative values are means right-to-left indexing, like in sequences); :param key: an identifier to be used when the parameter is only positional or when none of the possible keyword aliases must be used as the primary-key; :param default: keyword argument, value used if the parameter is absent; :param coerce: check if a value is valid or not and convert to its definitive value; see `xoutil.values`:mod: module for more information. .. versionadded:: 1.8.0 ''' __slots__ = ('ids', 'options', '_key') def __init__(self, *ids, **options): from collections import Counter from xoutil.eight import iteritems, string_types as strs from xoutil.eight.string import safe_isidentifier as iskey from xoutil.eight import type_name from xoutil.fp.option import none # TODO: Change this ``from xoutil.values import coercer`` from xoutil.fp.prove.semantic import predicate as coercer aux = {k: c for k, c in iteritems(Counter(ids)) if c > 1} if aux: parts = ['{!r} ({})'.format(k, aux[k]) for k in aux] msg = '{}() repeated identifiers: {}' raise TypeError(msg.format(type_name(self), ', '.join(parts))) else: def ok(k): return (isinstance(k, strs) and iskey(k) or isinstance(k, int)) bad = [k for k in ids if not ok(k)] if bad: msg = ('{}() identifiers with wrong type (only int and str ' 'allowed): {}') raise TypeError(msg.format(type_name(self), bad)) key = options.pop('key', none) if not (key is none or iskey(key)): msg = ('"key" option must be an identifier, "{}" of type "{}" ' 'given') raise TypeError(msg.format(key), type_name(key)) if 'default' in options: aux = {'default': options.pop('default')} else: aux = {} if 'coerce' in options: aux['coerce'] = coercer(options.pop('coerce')) if options: msg = '{}(): received invalid keyword parameters: {}' raise TypeError(msg.format(type_name(self), set(options))) self.ids = ids self.options = aux self._key = key def __str__(self): from xoutil.eight import iteritems parts = [repr(k) for k in self.ids] for key, value in iteritems(self.options): parts.append('{}={!r}'.format(key, value)) aux = ', '.join(parts) return 'ParamSchemeRow({})'.format(aux) __repr__ = __str__
[docs] def __call__(self, *args, **kwds): '''Execute a scheme-row using as argument a `ParamManager` instance. The concept of `ParamManager`:class: instance argument is a little tricky: when a variable number of arguments is used, if only one positional and is already an instance of `ParamManager`:class:, it is directly used; if two, the first is a `tuple` and the second is a `dict`, these are considered the constructor arguments of the new instance; otherwise all arguments are used to build the new instance. ''' count = len(args) if count == 1 and not kwds and isinstance(args[0], ParamManager): manager = args[0] else: if count == 2 and not kwds: a, k = args if isinstance(a, tuple) and isinstance(k, dict): args, kwds = a, k manager = ParamManager(args, kwds) return manager(*self.ids, **self.options)
@property def default(self): '''Returned value if parameter value is absent. If not defined, special value ``none`` is returned. ''' from xoutil.fp.option import none return self.options.get('default', none) @property def key(self): '''The primary key for this scheme-row definition. This concept is a little tricky (the first string identifier if some is given, if not then the first integer). This definition is useful, for example, to return remainder not consumed values after a scheme process is completed (see `ParamManager.remainder`:meth: for more information). ''' # TODO: calculate the key value in the constructor from xoutil.eight import string_types as strs from xoutil.fp.option import none res = self._key if res is none: res = next((k for k in self.ids if isinstance(k, strs)), None) if res is None: res = self.ids[0] self._key = res return res
[docs]class ParamScheme: '''Full scheme for a `ParamManager`:class: instance call. This class receives a set of `ParamSchemeRow`:class: instances and validate them as a whole. .. versionadded:: 1.8.0 ''' __slots__ = ('rows', 'cache') def __init__(self, *rows): from xoutil.eight import string_types as strs, type_name from xoutil.params import check_count check_count(len(rows) + 1, 2, caller=type_name(self)) used = set() for idx, row in enumerate(rows): if isinstance(row, ParamSchemeRow): this = {k for k in row.ids if isinstance(k, strs)} aux = used & this if not aux: used |= this else: msg = ('{}() repeated keyword identifiers "{}" in ' 'row {}').format(type_name(self), aux, idx) raise ValueError(msg) self.rows = rows self.cache = None def __str__(self): from xoutil.eight import type_name aux = ',\n\i'.join(str(row) for row in self) return '{}({})'.format(type_name(self), aux) def __repr__(self): from xoutil.eight import type_name return '{}({} rows)'.format(type_name(self), len(self))
[docs] def __len__(self): '''The defined scheme-rows number.''' return len(self.rows)
[docs] def __getitem__(self, idx): '''Obtain the scheme-row by a given index.''' from xoutil.eight import string_types if isinstance(idx, string_types): cache = self._getcache() return cache[idx] else: return self.rows[idx]
[docs] def __iter__(self): '''Iterate over all defined scheme-rows.''' return iter(self.rows)
[docs] def __call__(self, args, kwds, strict=True): '''Get a mapping with all resulting values. If special value 'none' is used as 'default' option in a scheme-row, corresponding value isn't returned in the mapping if the parameter value is missing. ''' from xoutil.eight import type_name def ok(v): from xoutil.fp.option import Wrong return not isinstance(v, Wrong) pm = ParamManager(args, kwds) aux = ((row.key, row(pm)) for row in self) res = {key: value for key, value in aux if ok(value)} rem = pm.remainder() if strict: if rem: msg = ('after a full `{}` process, there are still remainder ' 'parameters: {}') raise TypeError(msg.format(type_name(self), set(rem))) else: res.update(rem) return res
def keys(self): '''Partial compatibility with mappings.''' from xoutil.eight import iterkeys return iterkeys(self._getcache()) def items(self): '''Partial compatibility with mappings.''' from xoutil.eight import iteritems return iteritems(self._getcache()) @property def defaults(self): '''Return a mapping with all valid default values.''' def ok(v): from xoutil.fp.option import Wrong return not isinstance(v, Wrong) aux = ((row.key, row.default) for row in self) return {k: d for k, d in aux if ok(d)} def _getcache(self): if not self.cache: self.cache = {row.key: row for row in self} return self.cache