Source code for xotl.tools.future.collections

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#

"""Extensions to Python's `collections` module.

You may use it as drop-in replacement of `collections`.  Although we don't
document all items here.  Refer to `collections`:mod: documentation.

"""

from collections import *  # noqa
from reprlib import recursive_repr

try:
    from collections.abc import *  # noqa
    from collections.abc import Container, Iterable, Mapping, MutableMapping, MutableSet, Set, Sized
except ImportError:
    from collections import (
        Set,
        Iterable,
        Sized,
        Container,
        MutableSet,
        Mapping,
        MutableMapping,
    )

import collections as _stdlib  # noqa
from collections import _repeat  # noqa
from collections import _chain, _count_elements, _itemgetter, _starmap  # noqa

try:
    from collections import _heapq  # noqa
except ImportError:
    pass

from xotl.tools.deprecation import deprecated  # noqa
from xotl.tools.objects import SafeDataItem as safe  # noqa
from xotl.tools.symbols import Unset  # noqa


class safe_dict_iter(tuple):
    """Iterate a dictionary in a safe way.

    This is useful when a dictionary can be modified while iterating on it,
    for example::

      >>> d = {1: 2, 3: 4, 5: 6}
      >>> di = safe_dict_iter(d)

      >>> for k, v in di.items():
      ...     d[v] = k

      >>> [(k, v) for (k, v) in di.items()]
      [(1, 2), (3, 4), (5, 6)]

      >>> del d[1]

      >>> [(k, v) for (k, v) in di.items()]
      [(3, 4), (5, 6)]

      >>> [k for k in di]
      [3, 5]

    """

    def __new__(cls, mapping):
        self = super().__new__(cls, mapping)
        self._mapping = mapping
        return self

    def __str__(self):
        cls_name = type(self).__name__
        res = str(", ").join(str(i) for i in self)
        return str("{}({})").format(cls_name, res)

    __repr__ = __str__

    def __len__(self):
        return sum(1 for key in self)

    def __contains__(self, key):
        res = super().__contains__(key)
        return res and key in self.mapping

    def __nonzero__(self):
        return bool(len(self))

    __bool__ = __nonzero__

    def __iter__(self):
        for key in super().__iter__():
            if key in self._mapping:
                yield key

    keys = __iter__

    def values(self):
        for key in self:
            if key in self._mapping:
                yield self._mapping[key]

    def items(self):
        for key in self:
            if key in self._mapping:
                yield (key, self._mapping[key])


[docs]class defaultdict(_stdlib.defaultdict): """A hack for ``collections.defaultdict`` that passes the key and a copy of self as a plain dict (to avoid infinity recursion) to the callable. Examples:: >>> from xotl.tools.future.collections import defaultdict >>> d = defaultdict(lambda key, d: 'a') >>> d['abc'] 'a' Since the second parameter is actually a dict-copy, you may (naively) do the following:: >>> d = defaultdict(lambda k, d: d[k]) >>> d['abc'] Traceback (most recent call last): ... KeyError: 'abc' You may use this class as a drop-in replacement for ``collections.defaultdict``:: >>> d = defaultdict(lambda: 1) >>> d['abc'] 1 """ def __missing__(self, key): if self.default_factory is not None: try: return self.default_factory(key, dict(self)) except TypeError: # This is the error when the arguments are not expected. return super().__missing__(key) else: raise KeyError(key)
[docs]class OpenDictMixin: """A mixin for mappings implementation that expose keys as attributes. For example: >>> from xotl.tools.objects import SafeDataItem as safe >>> class MyOpenDict(OpenDictMixin, dict): ... __slots__ = safe.slot(OpenDictMixin.__cache_name__, dict) >>> d = MyOpenDict({'es': 'spanish'}) >>> d.es 'spanish' >>> d['es'] = 'espanol' >>> d.es 'espanol' When setting or deleting an attribute, the attribute name is regarded as key in the mapping if neither of the following condition holds: - The name is a `slot`. - The object has a ``__dict__`` attribute and the name is key there. This mixin defines the following features that can be redefined: ``_key2identifier`` Protected method, receives a key as argument and must return a valid identifier that is used instead the key as an extended attribute. ``__cache_name__`` Inner field to store a cached mapping between actual keys and calculated attribute names. The field must be always implemented as a `SafeDataItem` descriptor and must be of type `dict`. There are two ways of implementing this: - As a slot. The first time of this implementation is an example. Don't forget to pass the second parameter with the constructor `dict`. - As a normal descriptor:: >>> from xotl.tools.objects import SafeDataItem as safe >>> class MyOpenDict(OpenDictMixin, dict): ... safe(OpenDictMixin.__cache_name__, dict) Classes or Mixins that can be integrated with `dict` by inheritance must not have a `__slots__` definition. Because of that, this mixin must not declare any slot. If needed, it must be declared explicitly in customized classed like in the example in the first part of this documentation or in the definition of `opendict` class. """ __cache_name__ = str("_inverted_cache") def __dir__(self): """Return normal "dir" plus valid keys as attributes.""" # TODO: Check if super must be called if defined from xotl.tools.objects import fulldir return list(set(~self) | fulldir(self)) def __getattr__(self, name): from xotl.tools.future.inspect import get_attr_value _mark = object() res = get_attr_value(self, name, _mark) if res is not _mark: return res else: key = (~self).get(name) if key: return self[key] else: msg = "'%s' object has no attribute '%s'" raise AttributeError(msg % (type(self).__name__, name)) def __setattr__(self, name, value): cls = type(self) desc = getattr(cls, name, None) if desc is not None: # Prioritize descriptors try: desc.__set__(self, value) except Exception: pass key = (~self).get(name) if key: self[key] = value else: super().__setattr__(name, value) def __delattr__(self, name): key = (~self).get(name) if key: del self[key] else: super().__delattr__(name) def __invert__(self): """Return an inverted mapping between key and attribute names. Keys of the resulting dictionary are identifiers for attribute names and values are original key names. Class attribute "__separators__" are used to calculate it and is cached in '_inverted_cache slot safe variable. Several keys could have the same identifier, only one will be valid and used. To obtain this mapping you can use as the unary operator "~". """ KEY_LENGTH = "length" KEY_MAPPING = "mapping" cache = self._cache cached_length = cache.setdefault(KEY_LENGTH, 0) length = len(self) if cached_length != length: cache[KEY_LENGTH] = length aux = ((self._key2identifier(k), k) for k in self) res = {key: attr for key, attr in aux if key} cache[KEY_MAPPING] = res else: res = cache.get(KEY_MAPPING) if res is None: assert cached_length == 0 res = {} cache[KEY_MAPPING] = res return res @property def _cache(self): from xotl.tools.future.inspect import get_attr_value try: return get_attr_value(self, type(self).__cache_name__) except AttributeError: res = setattr(self, type(self).__cache_name__, dict()) return res @staticmethod def _key2identifier(key): """Convert keys to valid identifiers. This method could be redefined in sub-classes to change this feature. This function must return a valid identifier or None if the conversion is not possible. """ # TODO: Improve this in order to obtain a full-mapping. For example, # the corresponding attribute names for the keys ``'-x-y'`` and # ``'x-y'`` are the same, in that case only one will be returning. from xotl.tools.keywords import suffix_kwd from xotl.tools.string import slugify from xotl.tools.validators import is_valid_identifier res = key if is_valid_identifier(key) else slugify(key, "_") return suffix_kwd(res)
[docs]class SmartDictMixin: """A mixin that extends the `update` method of dictionaries Standard method allow only one positional argument, this allow several. Note on using mixins in Python: method resolution order is calculated in the order of inheritance, if a mixin is defined to overwrite behavior already existent, use first that classes with it. See `SmartDict`:class: below. """ def update(*args, **kwds): """Update this dict from a set of iterables `args` and keyword values `kwargs`. Each positional argument could be: - another mapping (any object implementing "keys" and `~object.__getitem__`:meth: methods. - an iterable of (key, value) pairs. """ from xotl.tools.params import issue_9137 self, args = issue_9137(args) for key, value in smart_iter_items(*args, **kwds): self[key] = value # TODO: Include new argument ``full=True`` to also search in string # values. Maybe this kind of feature will be better in a function # instead a method. def search(self, pattern): """Return new mapping with items which key match a `pattern` regexp. This function always tries to return a valid new mapping of the same type of the caller instance. If the constructor of corresponding type can't be called without arguments, then look up for a class variable named `__search_result_type__` or return a standard Python dictionary if not found. """ from re import compile regexp = compile(pattern) cls = type(self) try: res = cls() except Exception: from xotl.tools.future.inspect import get_attr_value creator = get_attr_value(cls, "__search_result_type__", None) res = creator() if creator else {} for key in self: if regexp.search(key): res[key] = self[key] return res
class SmartDict(SmartDictMixin, dict): """A "smart" dictionary that can receive a wide variety of arguments. See `SmartDictMixin.update`:meth: and :meth:`SmartDictMixin.search`. """ def __init__(*args, **kwargs): from xotl.tools.params import issue_9137 self, args = issue_9137(args) super().__init__() self.update(*args, **kwargs)
[docs]class opendict(OpenDictMixin, dict): """A dictionary implementation that mirrors its keys as attributes. For example:: >>> d = opendict(es='spanish') >>> d.es 'spanish' >>> d['es'] = 'espanol' >>> d.es 'espanol' Setting attributes not already included *does not* makes them keys:: >>> d.en = 'English' >>> set(d) {'es'} """ __slots__ = safe.slot(OpenDictMixin.__cache_name__, dict)
[docs] @classmethod def from_enum(cls, enumclass): """Creates an opendict from an enumeration class. If `enumclass` lacks the ``__members__`` dictionary, take the ``__dict__`` of the class disregarding the keys that cannot be `public identifiers <xotl.tools.validators.identifiers.is_valid_public_identifier>`:func:. If `enumclass` has the ``__members__`` attribute this is the same as ``opendict(enumclass.__members__)``. Example: .. code-block:: python >>> from xotl.tools.future.collections import opendict >>> @opendict.from_enum >>> class Foo: ... x = 1 ... _y = 2 >>> type(Foo) is opendict True >>> dict(Foo) {'x': 1} """ from xotl.tools.symbols import Unset from xotl.tools.validators.identifiers import is_valid_public_identifier members = getattr(enumclass, "__members__", Unset) if members is Unset: members = {k: v for k, v in enumclass.__dict__.items() if is_valid_public_identifier(k)} return cls(members)
[docs]class codedict(OpenDictMixin, dict): """A dictionary implementation that evaluate keys as Python expressions. This is also a open dict (see `OpenDictMixin`:class: for more info). Example: >>> cd = codedict(x=1, y=2, z=3.0) >>> '{_[x + y]} is 3 -- {_[x + z]} is 4.0'.format(_=cd) '3 is 3 -- 4.0 is 4.0' It supports the right shift (``>>``) operator as a format operand (using ``_`` as the special name for the code dict): >>> cd >> '{_[x + y]} is 3 -- {_[x + z]} is 4.0 -- {x} is 1' '3 is 3 -- 4.0 is 4.0 -- 1 is 1' It also implements the left shift (``<<``) operator: >>> '{_[x + y]} is 3 -- {_[x + z]} is 4.0 -- {x} is 1' << cd '3 is 3 -- 4.0 is 4.0 -- 1 is 1' .. versionadded:: 1.8.3 """ def __getitem__(self, key): try: return super().__getitem__(key) except KeyError: if isinstance(key, str): return eval(key, dict(self)) else: raise def __rshift__(self, arg): return arg.format(_=self, **self) __rlshift__ = __rshift__
[docs]class StackedDict(OpenDictMixin, SmartDictMixin, MutableMapping): """A multi-level mapping. A level is entered by using the `push`:meth: and is leaved by calling `pop`:meth:. The property `level`:attr: returns the actual number of levels. When accessing keys they are searched from the latest level "upwards", if such a key does not exists in any level a KeyError is raised. Deleting a key only works in the *current level*; if it's not defined there a KeyError is raised. This means that you can't delete keys from the upper levels without `popping <pop>`:func:. Setting the value for key, sets it in the current level. .. versionchanged:: 1.5.2 Based on the newly introduced `ChainMap`:class:. """ __slots__ = ( safe.slot("inner", ChainMap), safe.slot(OpenDictMixin.__cache_name__, dict), ) def __init__(*args, **kwargs): # Each data item is stored as {key: {level: value, ...}} from xotl.tools.params import issue_9137 self, args = issue_9137(args) self.update(*args, **kwargs) @property def level(self): """Return the current level number. The first level is 0. Calling `push`:meth: increases the current level (and returns it), while calling `pop`:meth: decreases the current level (if possible). """ return len(self.inner.maps) - 1
[docs] def push_level(self, *args, **kwargs): """Pushes a whole new level to the stacked dict. :param args: Several mappings from which the new level will be initialled filled. :param kwargs: Values to fill the new level. :returns: The pushed `level`:attr: number. """ self.inner = self.inner.new_child() self.update(*args, **kwargs) return self.level
[docs] @deprecated(push_level) def push(self, *args, **kwargs): """Don't use thid method, use new `push_level`:meth: instead.""" return self.push_level(*args, **kwargs)
[docs] def pop(self, *args): """Remove this, always use original `MutableMapping.pop`:meth:. If none arguments are given, `pop_level`:meth: is called and a deprecation warning is printed in `sys.stderr` the first time. If one or two arguments are given, those are interpreted as (key, default) values of the super class `pop`:meth:. """ if len(args) == 0: cls = type(self) if not hasattr(cls, "_bad_pop_called"): import warnings cls._bad_pop_called = True msg = "Use `pop` method without parameters is deprecated, use `pop_level` instead" warnings.warn(msg, stacklevel=2) return self.pop_level() else: return super().pop(*args)
[docs] def pop_level(self): """Pops the last pushed level and returns the whole level. If there are no levels in the stacked dict, a TypeError is raised. :returns: A dict containing the poped level. """ if self.level > 0: stack = self.inner res = stack.maps[0] self.inner = stack.parents return res else: raise TypeError("Cannot pop from StackedDict without any levels")
[docs] def peek(self): """Peeks the top level of the stack. Returns a copy of the top-most level without any of the keys from lower levels. Example:: >>> sdict = StackedDict(a=1, b=2) >>> sdict.push(c=3) # it returns the level... 1 >>> sdict.peek() {'c': 3} """ return dict(self.inner.maps[0])
def __str__(self): # TODO: Optimize return str(dict(self)) def __repr__(self): return "%s(%s)" % (type(self).__name__, str(self)) def __len__(self): return len(self.inner) def __iter__(self): return iter(self.inner) def __getitem__(self, key): return self.inner[key] def __setitem__(self, key, value): self.inner[key] = value def __delitem__(self, key): del self.inner[key]
class RankedDict(SmartDictMixin, dict): """Mapping that remembers modification order. Differences with `OrderedDict`:class: are: - Can be ranked (change precedence order) at any time; see `rank`:meth: and `swap_ranks`:meth: methods for more information. - Based in modification, not in insertion order. - Keeps the standard semantics of Python for `popitem`:meth: method returning a random pair when called without parameters. """ def __init__(*args, **kwds): """Initialize a ranked dictionary. The signature is the same as regular dictionaries, but keyword arguments are not recommended because their insertion order is arbitrary. Use `rank`:meth: to change the precedence order in any moment. """ from xotl.tools.params import issue_9137 self, args = issue_9137(args) # Ensure direct calls to ``__init__`` don't clear previous contents try: self._ranks except AttributeError: self._ranks = [] self.update(*args, **kwds) def rank(self, *keys): """Arrange mapping keys into a systematic precedence order. :param keys: Variable number of key values, given are ordered in the highest precedence levels (0 is the most priority). Not given keys are added at the end in the its current order. """ if keys: ranks = [] for key in keys: if key in self: ranks.append(key) else: raise KeyError("{}".format(key)) aux = set(keys) for key in self: if key not in aux: ranks.append(key) self._ranks = ranks def swap_ranks(self, *args, **kwds): """Exchange ranks of given keys. :param args: Each item must be a pair of keys to exchange the order of precedence. :param kwds: Every keyword argument will have a pair to exchange (name, value). """ from xotl.tools.params import check_count check_count(len(args) + len(kwds) + 1, 2, caller="swap_ranks") for key1, key2 in args: self._swap_ranks(key1, key2) for key1 in kwds: key2 = kwds[key1] self._swap_ranks(key1, key2) def move_to_end(self, key, last=True): """Move an existing element to the end. Or move it to the beginning if ``last==False``. Raises KeyError if the element does not exist. When ``last==True``, acts like a fast version of ``self[key] = self.pop(key)``. .. note:: This method is kept for compatibility with `OrderedDict`:class:. Last example using ``self.pop(key)`` works well in both, in OD and this class, but here in `RankedDict`:class: it's semantically equivalent to ``self[key] = self[key]``. """ try: ranks = self._ranks if last: if key != ranks[-1]: ranks.remove(key) ranks.append(key) else: if key != ranks[0]: ranks.remove(key) ranks.insert(0, key) except ValueError: raise KeyError(key) def _swap_ranks(self, key1, key2): """Protected method to swap a pair of ranks.""" if key1 in self and key2 in self: ranks = self._ranks idx1, idx2 = ranks.index(key1), ranks.index(key2) if idx1 != idx2: aux = ranks[idx1] ranks[idx1] = ranks[idx2] ranks[idx2] = aux else: raise KeyError("{!r} and/or {!r}".format(key1, key2)) def __setitem__(self, key, value): """rd.__setitem__(i, y) <==> rd[i]=y""" ranks = self._ranks if key in self: if ranks[-1] != key: ranks.remove(key) ranks.append(key) else: ranks.append(key) super().__setitem__(key, value) def __delitem__(self, key): """rd.__delitem__(y) <==> del rd[y]""" super().__delitem__(key) self._ranks.remove(key) def __iter__(self): """rd.__iter__() <==> iter(rd)""" return iter(self._ranks) def __reversed__(self): """rd.__reversed__() <==> reversed(rd)""" return reversed(self._ranks) def clear(self): """rd.clear() -> None. Remove all items from rd.""" super().clear() self._ranks = [] def popitem(self, index=None): """rd.popitem([index]) -> (key, value), return and remove a pair. :param index: Position of pair to return (default last). This method isn't have the same semantic as in `OrderedDict`:class: one, none the defined in method with the same name in standard Python mappings; here is similar to `~list.pop`:meth:. """ if self: if index is None or index is True: index = -1 key = self._ranks.pop(index) return key, super().pop(key) else: raise KeyError("popitem(): dictionary is empty") def __sizeof__(self): """D.__sizeof__() -> size of D in memory, in bytes. .. note:: Why ``sys.getsizeof(obj)`` doesn't return the same result as ``obj.__sizeof__()``? """ return super().__sizeof__() + self._ranks.__sizeof__() def keys(self): """D.keys() -> an object providing a view on D's keys.""" return self.__iter__() def values(self): """D.values() -> an object providing a view on D's values.""" for key in self: yield self[key] def items(self): """D.items() -> an object providing a view on D's items.""" for key in self: yield (key, self[key]) def __eq__(self, other): """rd.__eq__(y) <==> rd==y. Comparison to another `RankedDict`:class: instance is order-sensitive while comparison to a regular mapping is order-insensitive. """ res = super().__eq__(other) if res: if isinstance(other, RankedDict): return self._ranks == other._ranks elif isinstance(other, OrderedDict): return self._ranks == list(other) else: return True else: return False def __ne__(self, other): return not self.__eq__(other) def pop(self, key, *args): """rd.pop(k[,d]) -> v Remove specified key and return the corresponding value. If key is not found, d is returned if given, otherwise KeyError is raised. """ from xotl.tools.params import check_count count = len(args) check_count(count + 1, 1, 2, caller="pop") res = super().pop(key, Unset) if res is Unset: if count == 1: return args[0] else: raise KeyError(key) else: self._ranks.remove(key) return res def setdefault(self, key, default=None): """D.setdefault(k[,d]) -> D.get(k,d), also set D[k]=d if k not in D""" if key in self: return self[key] else: self[key] = default return default @recursive_repr() def __repr__(self): """x.__repr__() <==> repr(x)""" aux = ", ".join("({!r}, {!r})".format(k, self[k]) for k in self) if aux: aux = "[{}]".format(aux) return "{}({})".format(type(self).__name__, aux) def __reduce__(self): """Return state information for pickling.""" cls = type(self) inst_dict = vars(self).copy() for k in vars(cls()): inst_dict.pop(k, None) return cls, (), inst_dict or None, None, iter(self.items()) def copy(self): """D.copy() -> a shallow copy of D.""" return type(self)(self) @classmethod def fromkeys(cls, iterable, value=None): """RD.fromkeys(S[, v]) -> New ranked dictionary with keys from S. If not specified, the value defaults to None. """ return cls((key, value) for key in iterable)
[docs]class OrderedSmartDict(SmartDictMixin, OrderedDict): """A combination of the `OrderedDict` with the `SmartDictMixin`. .. warning:: Initializing with kwargs does not ensure any initial ordering, since Python's keyword dict is not ordered. Use a list/tuple of pairs instead. """ def __init__(*args, **kwds): """Initialize an ordered dictionary. The signature is the same as regular dictionaries, but keyword arguments are not recommended because their insertion order is arbitrary. """ from xotl.tools.params import issue_9137 self, args = issue_9137(args) super().__init__() self.update(*args, **kwds)
class MetaSet(type): """Allow syntax sugar creating sets. This is pythonic syntax (stop limit is never included), for example:: >>> from xotl.tools.future.collections import PascalSet as srange >>> [i for i in srange[1:4, 15, 20:23]] [1, 2, 3, 15, 20, 21, 22, 23] """ def __getitem__(cls, ranges): return cls(*ranges) if isinstance(ranges, tuple) else cls(ranges)
[docs]class PascalSet(metaclass=MetaSet): """Collection of unique integer elements (implemented with intervals). :: PascalSet(*others) -> new set object .. versionadded:: 1.7.1 """ __slots__ = ("_items",) def __init__(self, *others): """Initialize self. :param others: Any number of integer or collection of integers that will be the set members. """ self._items = [] # a list of list of two elements self.update(*others) def __str__(self): def aux(s, e): if s == e: return str(s) elif s + 1 == e: return "%s, %s" % (s, e) else: return "%s..%s" % (s, e) l = self._items ranges = ((l[i], l[i + 1]) for i in range(0, len(l), 2)) return str("{%s}" % ", ".join((aux(s, e) for (s, e) in ranges))) def __repr__(self): cls = type(self) cname = cls.__name__ return str("%s([%s])" % (cname, ", ".join((str(i) for i in self)))) def __iter__(self): l = self._items i, count = 0, len(l) while i < count: s, e = l[i], l[i + 1] while s <= e: yield s s += 1 i += 2 def __len__(self): res = 0 l = self._items i, count = 0, len(l) while i < count: res += l[i + 1] - l[i] + 1 i += 2 return res def __nonzero__(self): return bool(self._items) __bool__ = __nonzero__ def __contains__(self, other): """True if set has an element ``other``, else False.""" return isinstance(other, int) and self._search(other)[0] def __hash__(self): """Compute the hash value of a set.""" return Set._hash(self) def __eq__(self, other): """Python 2 and 3 have several differences in operator definitions. For example:: >>> from xotl.tools.future.collections import PascalSet >>> s1 = PascalSet[0:10] >>> assert s1 == set(s1) # OK (True) in 2 and 3 >>> assert set(s1) == s1 # OK in 3, fails in 2 """ if isinstance(other, Set): ls, lo = len(self), len(other) if ls == lo: if isinstance(other, PascalSet): return self._items == other._items else: return self.count(other) == ls else: return False else: return False def __ne__(self, other): return not self.__eq__(other) def __gt__(self, other): if isinstance(other, Set): if other: return self.issuperset(other) and len(self) > len(other) else: return bool(self._items) else: return NotImplemented def __ge__(self, other): if isinstance(other, Set): if other: return self.issuperset(other) else: return bool(self._items) else: return NotImplemented def __lt__(self, other): if isinstance(other, Set): if other: return self.issubset(other) and len(self) < len(other) else: return not self._items else: return NotImplemented def __le__(self, other): if isinstance(other, Set): if other: return self.issubset(other) else: return not self._items else: return NotImplemented def __sub__(self, other): if isinstance(other, Set): return self.difference(other) else: return NotImplemented def __isub__(self, other): if isinstance(other, Set): self.difference_update(other) return self else: return NotImplemented def __rsub__(self, other): if isinstance(other, Set): return other - type(other)(self) else: return NotImplemented def __and__(self, other): if isinstance(other, Set): return self.intersection(other) else: return NotImplemented def __iand__(self, other): if isinstance(other, Set): self.intersection_update(other) return self else: return NotImplemented def __rand__(self, other): if isinstance(other, Set): return other & type(other)(self) else: return NotImplemented def __or__(self, other): if isinstance(other, Set): return self.union(other) else: return NotImplemented def __ior__(self, other): if isinstance(other, Set): self.update(other) return self else: return NotImplemented def __ror__(self, other): if isinstance(other, Set): return other | type(other)(self) else: return NotImplemented def __xor__(self, other): if isinstance(other, Set): return self.symmetric_difference(other) else: return NotImplemented def __ixor__(self, other): if isinstance(other, Set): self.symmetric_difference_update(other) return self else: return NotImplemented def __rxor__(self, other): if isinstance(other, Set): return other ^ type(other)(self) else: return NotImplemented def count(self, other): """Number of occurrences of any member of other in this set. If other is an integer, return 1 if present, 0 if not. """ if isinstance(other, int): return 1 if other in self else 0 else: return sum((i in self for i in other), 0) def add(self, other): """Add an element to a set. This has no effect if the element is already present. """ self._insert(other) def union(self, *others): """Return the union of sets as a new set. (i.e. all elements that are in either set.) """ res = self.copy() res.update(*others) return res def update(self, *others): """Update a set with the union of itself and others.""" for other in others: if isinstance(other, PascalSet): l = other._items if self._items: count = len(l) i = 0 while i < count: self._insert(l[i], l[i + 1]) i += 2 else: self._items = l[:] elif isinstance(other, int): self._insert(other) elif isinstance(other, Iterable): for i in other: self._insert(i) elif isinstance(other, slice): start, stop, step = other.start, other.stop, other.step if step is None: step = 1 if step in (1, -1): stop -= step if step == -1: start, stop = stop, start self._insert(start, stop) else: for i in range(start, stop, step): self._insert(i) else: raise self._invalid_value(other) def intersection(self, *others): """Return the intersection of two or more sets as a new set. (i.e. elements that are common to all of the sets.) """ res = self.copy() res.intersection_update(*others) return res def intersection_update(self, *others): """Update a set with the intersection of itself and another.""" l = self._items oi, count = 0, len(others) while l and oi < count: other = others[oi] if not isinstance(other, PascalSet): # safe mode for intersection other = PascalSet(i for i in other if isinstance(i, int)) o = other._items if o: sl, el = l[0], l[-1] so, eo = o[0], o[-1] if sl < so: self._remove(sl, so - 1) if eo < el: self._remove(eo + 1, el) i = 2 while l and i < len(o): s, e = o[i - 1] + 1, o[i] - 1 if s <= e: self._remove(s, e) i += 2 else: del l[:] oi += 1 def difference(self, *others): """Return the difference of two or more sets as a new set. (i.e. all elements that are in this set but not the others.) """ res = self.copy() res.difference_update(*others) return res def difference_update(self, *others): """Remove all elements of another set from this set.""" for other in others: if isinstance(other, PascalSet): l = other._items count = len(l) i = 0 while i < count: self._remove(l[i], l[i + 1]) i += 2 else: for i in other: if isinstance(i, int): self._remove(i) def symmetric_difference(self, other): """Return the symmetric difference of two sets as a new set. (i.e. all elements that are in exactly one of the sets.) """ res = self.copy() res.symmetric_difference_update(other) return res def symmetric_difference_update(self, other): "Update a set with the symmetric difference of itself and another." if not isinstance(other, PascalSet): other = PascalSet(other) if self: if other: # TODO: Implement more efficiently aux = other - self self -= other self |= aux else: self._items = other._items[:] def discard(self, other): """Remove an element from a set if it is a member. If the element is not a member, do nothing. """ self._remove(other) def remove(self, other): """Remove an element from a set; it must be a member. If the element is not a member, raise a KeyError. """ if other in self: self._remove(other) else: raise KeyError('"%s" is not a member!' % other) def pop(self): """Remove and return an arbitrary set element. Raises KeyError if the set is empty. """ l = self._items if l: res = l[0] if l[0] < l[1]: l[0] += 1 else: del l[0:2] return res else: raise KeyError("pop from an empty set!") def clear(self): """Remove all elements from this set.""" self._items = [] def copy(self): """Return a shallow copy of a set.""" return type(self)(self) def isdisjoint(self, other): """Return True if two sets have a null intersection.""" if isinstance(other, PascalSet): if self and other: l, o = self._items, other._items i, lcount, ocount = 0, len(l), len(o) maybe = True while maybe and i < lcount: found, idx = other._search(l[i]) if idx == ocount: # exhausted # assert not found i = lcount elif found or l[i + 1] >= o[idx]: maybe = False else: i += 2 return maybe else: return True else: return not any(i in self for i in other) def issubset(self, other): """Report whether another set contains this set.""" ls = len(self) if isinstance(other, PascalSet): if self: if ls > len(other): # Fast check for obvious cases return False else: l, o = self._items, other._items i, lcount = 0, len(l) maybe = True while maybe and i < lcount: found, idx = other._search(l[i]) if found and l[i + 1] <= o[idx + 1]: i += 2 else: maybe = False return maybe else: return True elif isinstance(other, Sized) and ls > len(other): # Fast check for obvious cases return False elif isinstance(other, Container): aux = next((i for i in self if i not in other), Unset) return aux is Unset else: # Generator cases from functools import reduce from operator import add lo = reduce(add, (i in self for i in other), 0) return lo == ls def issuperset(self, other): """Report whether this set contains another set.""" ls = len(self) if isinstance(other, PascalSet): if other: if ls < len(other): # Fast check for obvious cases return False else: l, o = self._items, other._items i, ocount = 0, len(o) maybe = True while maybe and i < ocount: found, idx = self._search(o[i]) if found and o[i + 1] <= l[idx + 1]: i += 2 else: maybe = False return maybe else: return True elif isinstance(other, Sized) and ls < len(other): # Fast check for obvious cases return False else: aux = next((i for i in other if i not in self), Unset) return aux is Unset def _search(self, other): """Search the pair where ``other`` is placed. Return a duple :``(if found or not, index)``. """ if isinstance(other, int): l = self._items start, end = 0, len(l) res, pivot = False, 2 * (end // 4) while not res and start < end: s, e = l[pivot], l[pivot + 1] if other < s: end = pivot elif other > e: start = pivot + 2 else: res = True pivot = start + 2 * ((end - start) // 4) return res, pivot else: raise self._invalid_value(other) def _insert(self, start, end=None): """Insert an interval of integers.""" if not end: end = start assert start <= end l = self._items count = len(l) found, idx = self._search(start) if not found: if idx > 0 and start == l[idx - 1] + 1: found = True idx -= 2 l[idx + 1] = start if idx < count - 2 and end == l[idx + 2] - 1: end = l[idx + 3] elif idx < count and end >= l[idx] - 1: found = True l[idx] = start if found: while end > l[idx + 1]: if idx < count - 2 and end >= l[idx + 2] - 1: if end <= l[idx + 3]: l[idx + 1] = l[idx + 3] del l[idx + 2 : idx + 4] count -= 2 else: l[idx + 1] = end else: if idx < count: l.insert(idx, start) l.insert(idx + 1, end) else: l.extend((start, end)) count += 2 def _remove(self, start, end=None): """Remove an interval of integers.""" if not end: end = start assert start <= end l = self._items sfound, sidx = self._search(start) efound, eidx = self._search(end) if sfound and efound and sidx == eidx: first = l[sidx] < start last = l[eidx + 1] > end if first and last: l.insert(eidx + 1, end + 1) l.insert(sidx + 1, start - 1) elif first: l[sidx + 1] = start - 1 elif last: l[eidx] = end + 1 else: del l[sidx : eidx + 2] else: if sfound and l[sidx] < start: l[sidx + 1] = start - 1 sidx += 2 if efound: if l[eidx + 1] > end: l[eidx] = end + 1 else: eidx += 2 if sidx < eidx: del l[sidx:eidx] def _invalid_value(self, value): cls_name = type(self).__name__ vname = type(value).__name__ msg = 'Unsupported type for value "%s" of type "%s" for a "%s", ' "must be an integer!" return TypeError(msg % (value, vname, cls_name)) @classmethod def _prime_numbers_until(cls, limit): """This is totally a funny test method.""" res = cls[2:limit] for i in range(2, limit // 2 + 1): if i in res: aux = i + i while aux < limit: if aux in res: res.remove(aux) aux += i return res
MutableSet.register(PascalSet)
[docs]class BitPascalSet(metaclass=MetaSet): """Collection of unique integer elements (implemented with bit-wise sets). :: BitPascalSet(*others) -> new bit-set object .. versionadded:: 1.7.1 """ __slots__ = ("_items",) _bit_length = 62 # How many values are stored in each item def __init__(self, *others): """Initialize self. :param others: Any number of integer or collection of integers that will be the set members. In this case `_items` is a dictionary with keys containing number division seeds and values bit-wise integers (each bit is the division modulus position). """ self._items = {} self.update(*others) def __str__(self): if self: return str(PascalSet(self)) else: cname = type(self).__name__ return str("%s([])") % cname def __repr__(self): cname = type(self).__name__ res = str(", ").join(str(i) for i in self) return str("%s([%s])") % (cname, res) def __iter__(self): bl = self._bit_length sm = self._items for k in sorted(sm): v = sm[k] base = k * bl i = 0 ref = 1 while i < bl: if ref & v: yield base + i ref <<= 1 i += 1 def __len__(self): return sum((1 for i in self), 0) def __nonzero__(self): return bool(self._items) __bool__ = __nonzero__ def __contains__(self, other): """True if this bit-set has the element ``other``, else False.""" res = self._search(other) if res: k, ref, v = res return bool(v & (1 << ref)) else: return False def __hash__(self): """Compute the hash value of a set.""" return Set._hash(self) def __eq__(self, other): """Python 2 and 3 have several differences in operator definitions. For example:: >>> from xotl.tools.future.collections import BitPascalSet >>> s1 = BitPascalSet[0:10] >>> assert s1 == set(s1) # OK (True) in 2 and 3 >>> assert set(s1) == s1 # OK in 3, fails in 2 """ if isinstance(other, Set): if isinstance(other, BitPascalSet): return self._items == other._items else: ls, lo = len(self), len(other) return ls == lo == self.count(other) else: return False def __ne__(self, other): return not self.__eq__(other) def __gt__(self, other): if isinstance(other, Set): if other: return self.issuperset(other) and len(self) > len(other) else: return bool(self._items) else: return NotImplemented def __ge__(self, other): if isinstance(other, Set): return self.issuperset(other) if other else bool(self._items) else: return NotImplemented def __lt__(self, other): if isinstance(other, Set): if other: return self.issubset(other) and len(self) < len(other) else: return not self._items else: return NotImplemented def __le__(self, other): if isinstance(other, Set): return self.issubset(other) if other else not self._items else: return NotImplemented def __sub__(self, other): if isinstance(other, Set): return self.difference(other) else: return NotImplemented def __isub__(self, other): if isinstance(other, Set): self.difference_update(other) return self else: return NotImplemented def __rsub__(self, other): if isinstance(other, Set): return other - type(other)(self) else: return NotImplemented def __and__(self, other): if isinstance(other, Set): return self.intersection(other) else: return NotImplemented def __iand__(self, other): if isinstance(other, Set): self.intersection_update(other) return self else: return NotImplemented def __rand__(self, other): if isinstance(other, Set): return other & type(other)(self) else: return NotImplemented def __or__(self, other): if isinstance(other, Set): return self.union(other) else: return NotImplemented def __ior__(self, other): if isinstance(other, Set): self.update(other) return self else: return NotImplemented def __ror__(self, other): if isinstance(other, Set): return other | type(other)(self) else: return NotImplemented def __xor__(self, other): if isinstance(other, Set): return self.symmetric_difference(other) else: return NotImplemented def __ixor__(self, other): if isinstance(other, Set): self.symmetric_difference_update(other) return self else: return NotImplemented def __rxor__(self, other): if isinstance(other, Set): return other ^ type(other)(self) else: return NotImplemented def count(self, other): """Number of occurrences of any member of other in this set. If other is an integer, return 1 if present, 0 if not. """ if isinstance(other, int): return 1 if other in self else 0 else: return sum((i in self for i in other), 0) def add(self, other): """Add an element to a bit-set. This has no effect if the element is already present. """ self._insert(other) def union(self, *others): """Return the union of bit-sets as a new set. (i.e. all elements that are in either set.) """ res = self.copy() res.update(*others) return res def update(self, *others): """Update a bit-set with the union of itself and others.""" for other in others: if isinstance(other, BitPascalSet): sm = self._items om = other._items for k, v in safe_dict_iter(om).items(): if k in sm: sm[k] |= v else: sm[k] = v elif isinstance(other, int): self._insert(other) elif isinstance(other, Iterable): for i in other: self._insert(i) elif isinstance(other, slice): start, stop, step = other.start, other.stop, other.step if step is None: step = 1 for i in range(start, stop, step): self._insert(i) else: raise self._invalid_value(other) def intersection(self, *others): """Return the intersection of two or more bit-sets as a new set. (i.e. elements that are common to all of the sets.) """ res = self.copy() res.intersection_update(*others) return res def intersection_update(self, *others): """Update a bit-set with the intersection of itself and another.""" sm = self._items oi, count = 0, len(others) while sm and oi < count: other = others[oi] if not isinstance(other, BitPascalSet): # safe mode for intersection other = PascalSet(i for i in other if isinstance(i, int)) om = other._items for k, v in safe_dict_iter(sm).items(): v &= om.get(k, 0) if v: sm[k] = v else: del sm[k] oi += 1 def difference(self, *others): """Return the difference of two or more bit-sets as a new set. (i.e. all elements that are in this set but not the others.) """ res = self.copy() res.difference_update(*others) return res def difference_update(self, *others): """Remove all elements of another bit-set from this set.""" for other in others: if isinstance(other, BitPascalSet): sm = self._items om = other._items for k, v in safe_dict_iter(om).items(): if k in sm: v = sm[k] & ~v if v: sm[k] = v else: del sm[k] else: for i in other: if isinstance(i, int): self._remove(i) def symmetric_difference(self, other): """Return the symmetric difference of two bit-sets as a new set. (i.e. all elements that are in exactly one of the sets.) """ res = self.copy() res.symmetric_difference_update(other) return res def symmetric_difference_update(self, other): "Update a bit-set with the symmetric difference of itself and another." if not isinstance(other, BitPascalSet): other = BitPascalSet(other) if self: if other: # TODO: Implement more efficiently aux = other - self self -= other self |= aux else: self._items = other._items[:] def discard(self, other): """Remove an element from a bit-set if it is a member. If the element is not a member, do nothing. """ self._remove(other) def remove(self, other): """Remove an element from a bit-set; it must be a member. If the element is not a member, raise a KeyError. """ self._remove(other, fail=True) def pop(self): """Remove and return an arbitrary bit-set element. Raises KeyError if the set is empty. """ sm = self._items if sm: bl = self._bit_length k, v = next(sm.items()) assert v base = k * bl i = 0 ref = 1 res = None while res is None: if ref & v: res = base + i else: ref <<= 1 i += 1 v &= ~ref if v: sm[k] = v else: del sm[k] return res else: raise KeyError("pop from an empty set!") def clear(self): """Remove all elements from this bit-set.""" self._items = {} def copy(self): """Return a shallow copy of a set.""" return type(self)(self) def isdisjoint(self, other): """Return True if two bit-sets have a null intersection.""" if isinstance(other, BitPascalSet): sm, om = self._items, other._items if sm and om: return all(sm.get(k, 0) & v == 0 for k, v in om.items()) else: return True else: return not any(i in self for i in other) def issubset(self, other): """Report whether another set contains this bit-set.""" if isinstance(other, BitPascalSet): sm, om = self._items, other._items if sm: return all(om.get(k, 0) & v == v for k, v in sm.items()) else: return True elif isinstance(other, Container): return not any(i not in other for i in self) else: # Generator cases return sum((i in self for i in other), 0) == len(self) def issuperset(self, other): """Report whether this bit set contains another set.""" if isinstance(other, BitPascalSet): sm, om = self._items, other._items if om: return all(sm.get(k, 0) & v == v for k, v in om.items()) else: return True else: return not any(i not in self for i in other) def _search(self, other): """Search the bit-wise value where ``other`` could be placed. Return a duple :``(seed, bits to shift left)``. """ if isinstance(other, int): sm = self._items bl = self._bit_length k, ref = divmod(other, bl) return k, ref, sm.get(k, 0) else: return None def _insert(self, other): """Add a member in this bit-set.""" aux = self._search(other) if aux: k, ref, v = aux self._items[k] = v | (1 << ref) else: raise self._invalid_value(other) def _remove(self, other, fail=False): """Remove an interval of integers from this bit-set.""" aux = self._search(other) ok = False if aux: k, ref, v = aux if v: aux = v & ~(1 << ref) if v != aux: ok = True sm = self._items if aux: sm[k] = aux else: del sm[k] if not ok and fail: raise KeyError('"%s" is not a member!' % other) def _invalid_value(self, value): cls_name = type(self).__name__ vname = type(value).__name__ msg = 'Unsupported type for value "%s" of type "%s" for a "%s", ' "must be an integer!" return TypeError(msg % (value, vname, cls_name)) @classmethod def _prime_numbers_until(cls, limit): """This is totally a funny test method.""" res = cls[2:limit] for i in range(2, limit // 2 + 1): if i in res: aux = i + i while aux < limit: if aux in res: res.remove(aux) aux += i return res
MutableSet.register(BitPascalSet) # Smart Tools def pair(arg): """Check if `arg` is a pair (two elements tuple or list).""" return arg if isinstance(arg, (tuple, list)) and len(arg) == 2 else None def smart_iter_items(*args, **kwds): """Unify iteration over (key, value) pairs. If a pair of pairs is found, e.g. ``[(1, 2), (3, 4)]``, instead of yielding one pair (the outermost), inner two pairs takes precedence. """ for arg in args: if isinstance(arg, Mapping): for key in arg: yield key, arg[key] elif hasattr(arg, "keys") and hasattr(arg, "__getitem__"): # Custom mappings for key in arg.keys(): yield key, arg[key] elif pair(arg) and not (pair(arg[0]) and pair(arg[1])): yield arg else: for item in arg: if pair(item): yield item else: msg = "'{}' object '{}' is not a pair" raise TypeError(msg.format(type(item).__name__, item)) for key in kwds: yield key, kwds[key] # get rid of unused global variables del deprecated, recursive_repr