xoutil.bound – Helpers for bounded execution of co-routines

New in version 1.6.3.

A bounded execution model

Some features are easy to implement using a generator or co-routine (PEP 342). For instance, you might want to “report units of work” one at a time. These kinds of features can easily be programmed without any bounds whatsoever, and the bounds can then be “woven” in afterwards.

This module helps to separate the work-doing function from the boundary-tests definitions.

This document uses the following terminology:

unbounded function

This is the function that does the actual work without testing for any boundary condition. Boundary conditions are not “natural causes” of termination for the algorithm but conditions imposed elsewhere: the environment, resource management, etc.

This function must return a generator, called the unbounded generator.

unbounded generator

The generator returned by an unbounded function. This generator is allowed to yield forever, although it could terminate by itself. So this is actually a possibly unbounded generator, but we keep the term for emphasis.

boundary condition

It’s a condition that does not belong to the logical description of any algorithm. When this condition is met it indicates that the unbounded generator should be closed. The boundary condition is tested each time the unbounded generator yields.

A boundary condition is usually implemented in a single function called the boundary definition.

boundary definition

A function that implements a boundary condition. This function must comply with the boundary protocol (see boundary()).

Sometimes we identify the boundary condition with its boundary definition.

bounded function

It’s the result of applying a boundary definition to an unbounded function.

bounded generator

It’s the result of applying a boundary condition to an unbounded generator.

The bounded execution model takes at least an unbounded generator and a boundary condition. Applying the boundary condition to the unbounded generator ultimately results in a bounded generator, which will behave almost equivalently to the unbounded generator but will stop when the boundary condition yields True or when the unbounded generator itself is exhausted.
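The model can be sketched in plain Python without importing xoutil. This is only an illustration under stated assumptions: the names run_bounded, naturals and at_least are hypothetical, and the library's own machinery performs more checks than this driver does.

```python
def run_bounded(boundary, unbounded):
    """Hypothetical driver: yield from `unbounded` until `boundary` yields True."""
    next(boundary)                        # let the boundary initialize itself
    stop = boundary.send(((), {}))        # the (args, kwargs) of the unbounded function
    try:
        while stop is not True:
            try:
                value = next(unbounded)
            except StopIteration:         # the generator exhausted itself first
                break
            yield value
            stop = boundary.send(value)   # test the boundary after each yield
    finally:
        boundary.close()
        unbounded.close()


def naturals():
    """Unbounded generator: it never terminates by itself."""
    n = 0
    while True:
        n += 1
        yield n


def at_least(limit):
    """Boundary: becomes True once a value >= limit has been seen."""
    yield False                           # initialization
    value = yield False                   # answer to (args, kwargs); wait for a value
    while value < limit:
        value = yield False
    yield True


print(list(run_bounded(at_least(5), naturals())))
print(list(run_bounded(at_least(5), (x for x in [1, 2]))))
```

The first call stops because the boundary yields True; the second stops because the generator is exhausted before the boundary is met.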

Included boundary conditions

xoutil.bound.timed(maxtime)

Becomes True after a given amount of time.

The bounded generator will be allowed to yield values until the maxtime time frame has elapsed.

Usage:

@timed(timedelta(seconds=60))
def do_something_in_about_60s():
    while True:
        yield

Note

This is a very soft limit.

We can’t actually guarantee any enforcement of the time limit. If the bounded generator takes too much time or never yields, this predicate can’t do much. This usually helps with batch processing that must not exceed (by too much) a given amount of time.

The timer starts just after the next() function has been called for the predicate initialization. So if the maxtime given is too short, this predicate might halt the execution of the bounded function without allowing any processing at all.

If maxtime is not a timedelta, the timedelta will be computed as timedelta(seconds=maxtime).
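A minimal sketch of how such a time boundary could be written as a plain generator. The name timed_sketch and the use of time.monotonic() are assumptions made for illustration; the sketch is not the library's implementation.

```python
import time
from datetime import timedelta


def timed_sketch(maxtime):
    """Hypothetical time boundary following the boundary protocol."""
    if not isinstance(maxtime, timedelta):
        maxtime = timedelta(seconds=maxtime)   # plain numbers are taken as seconds
    limit = maxtime.total_seconds()
    start = time.monotonic()   # the clock starts while next() initializes the boundary
    yield False                # initialization; (args, kwargs) arrives next and is ignored
    while time.monotonic() - start < limit:
        yield False            # soft limit: only checked when the generator yields
    yield True


# A zero-length time frame halts before any value is requested.
too_short = timed_sketch(0)
next(too_short)
print(too_short.send(((), {})))

# A generous time frame lets values through.
generous = timed_sketch(timedelta(seconds=60))
next(generous)
print(generous.send(((), {})))
```

Because the test only runs between yields, a step that blocks for a long time cannot be interrupted, which is why the limit is described as soft.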

xoutil.bound.times(n)

Becomes True after the nth item has been produced.

xoutil.bound.accumulated(mass, *attrs, initial=0)

Becomes True after accumulating a given “mass”.

mass is the maximum allowed to accumulate. This is usually a positive number. The values produced by the unbounded generator are added together, and the boundary yields True when the sum amounts to more than the given mass.

If any attrs are provided, they will be considered attributes (or keys) to search inside the yielded data from the bounded function. If no attrs are provided the whole data is accumulated, so it must allow addition. The attribute to be summed is extracted with get_first_of(), so only the first attribute found is added.

If the keyword argument initial is provided the accumulator is initialized with that value. By default this is 0.
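The accumulation rule can be sketched as follows. The names accumulated_sketch, first_of and drive are hypothetical; first_of is only a stand-in for get_first_of(), and falling back to the whole value when no attribute is found is an assumption. Following the wording above, the sketch stops once the sum exceeds mass; whether the library uses a strict comparison is not confirmed here.

```python
def first_of(data, *attrs):
    """Stand-in for get_first_of(): first attribute or key found in `data`."""
    for attr in attrs:
        if isinstance(data, dict) and attr in data:
            return data[attr]
        if hasattr(data, attr):
            return getattr(data, attr)
    return data                            # assumption: fall back to the whole value


def accumulated_sketch(mass, *attrs, initial=0):
    """Hypothetical boundary: True once the accumulated sum exceeds `mass`."""
    accum = initial
    yield False                            # initialization; (args, kwargs) is ignored
    while accum <= mass:
        data = yield False
        accum += first_of(data, *attrs)
    yield True


def drive(boundary, values):
    """Return the values let through before the boundary yields True."""
    seen = []
    next(boundary)
    stop = boundary.send(((), {}))
    for value in values:
        if stop is True:
            break
        seen.append(value)
        stop = boundary.send(value)
    boundary.close()
    return seen


messages = [{'size': 4}, {'size': 5}, {'size': 3}, {'size': 7}]
print(drive(accumulated_sketch(10, 'size'), messages))
print(drive(accumulated_sketch(5, initial=3), [1, 1, 1, 1]))
```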

xoutil.bound.pred(func, skipargs=True)

Allow “normal” functions to engage within the boundary protocol.

func should take a single argument and return True if the boundary condition has been met.

If skipargs is True, func will not be called with the tuple (args, kwargs) upon initialization of the boundary; only the values yielded by the unbounded generator are passed. If you need the original arguments, set skipargs to False; in that case the first call to func receives the single argument (args, kwargs).
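The effect of skipargs can be sketched with a plain generator. The names pred_sketch, last_value and count_from are hypothetical and the code is an illustration of the described behaviour, not the library's implementation.

```python
def pred_sketch(func, skipargs=True):
    """Hypothetical wrapper turning a plain predicate into a boundary."""
    sent = yield False                 # initialization; resumes with (args, kwargs)
    if skipargs:
        sent = yield False             # drop (args, kwargs); resume with the first value
    while not func(sent):
        sent = yield False
    yield True


def last_value(boundary, function, *args, **kwargs):
    """Hypothetical driver returning the last value produced before the boundary."""
    unbounded = function(*args, **kwargs)
    next(boundary)
    stop = boundary.send((args, kwargs))
    result = None
    while stop is not True:
        try:
            result = next(unbounded)
        except StopIteration:
            break
        stop = boundary.send(result)
    boundary.close()
    unbounded.close()
    return result


def count_from(start):
    while True:
        yield start
        start += 1


print(last_value(pred_sketch(lambda x: x % 7 == 0), count_from, 10))

calls = []

def record(item):
    calls.append(item)
    return len(calls) >= 3

print(last_value(pred_sketch(record, skipargs=False), count_from, 10))
print(calls[0])   # with skipargs=False the first call receives (args, kwargs)
```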

Example:

>>> @pred(lambda x: x > 10)
... def fibonacci():
...     a, b = 1, 1
...     while True:
...        yield a
...        a, b = b, a + b

>>> fibonacci()
13

xoutil.bound.until_errors(*errors)

Becomes True after any of errors has been raised.

Any other exception (except GeneratorExit) is propagated. You must pass at least one error.

Normally this will allow some possibly long jobs to be interrupted (SoftTimeLimitException in celery task, for instance) but leave some time for the caller to clean up things.

It’s assumed that your job can be properly finalized after any of the given exceptions has been raised.

Parameters: on_error – A callable that will only be called if the boundary condition is ever met, i.e. if any of errors was raised. The callback is called before yielding True.

New in version 1.7.2.

Changed in version 1.7.5: Added the keyword argument on_error.
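A rough sketch of an error-driven boundary, assuming the driver throws the caught exception into the boundary generator. The names until_errors_sketch, call_bounded and SoftTimeLimit are hypothetical, and passing the exception to on_error is an assumption of this sketch.

```python
class SoftTimeLimit(Exception):
    """Hypothetical stand-in for an interrupting exception."""


def until_errors_sketch(*errors, on_error=None):
    """Hypothetical boundary: True once one of `errors` is thrown into it."""
    try:
        while True:
            yield False
    except errors as error:            # GeneratorExit is not caught here
        if on_error is not None:
            on_error(error)            # callback runs before yielding True
        yield True


def call_bounded(boundary, unbounded, errors):
    """Hypothetical driver that forwards the given errors to the boundary."""
    next(boundary)
    stop = boundary.send(((), {}))
    result = None
    try:
        while stop is not True:
            try:
                value = next(unbounded)
            except StopIteration:
                break
            except errors as error:
                stop = boundary.throw(error)
            else:
                result = value
                stop = boundary.send(value)
    finally:
        boundary.close()
        unbounded.close()
    return result


def job():
    yield "chunk 1"
    yield "chunk 2"
    raise SoftTimeLimit("time is almost up")


handled = []
boundary = until_errors_sketch(SoftTimeLimit, on_error=handled.append)
print(call_bounded(boundary, job(), (SoftTimeLimit,)))
print(len(handled))
```

The caller gets the last value produced before the interruption and keeps control, so it still has a chance to clean up.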

xoutil.bound.until(time=None, times=None, errors=None)

An idiomatic alias to other boundary definitions.

  • until(maxtime=n) is the same as timed(n).
  • until(times=n) is the same as times(n).
  • until(pred=func, skipargs=skip) is the same as pred(func, skipargs=skip).
  • until(errors=errors, **kwargs) is the same as until_errors(*errors, **kwargs).
  • until(accumulate=mass, path=path, initial=initial) is the same as
    accumulated(mass, *path.split('.'), initial=initial)

Warning

You cannot mix several of these forms in a single call.

New in version 1.7.2.

Chaining several boundary conditions

To create a more complex boundary than the one provided by a single condition, you can use the following high-level boundaries:

xoutil.bound.whenany(*boundaries)

An OR-like boundary condition.

It takes several boundaries and returns a single one that behaves like the logical OR, i.e. it will yield True when any of its subordinate boundary conditions yields True.

Calls close() of all subordinates upon termination.

Each boundary should be either:

  • A “bare” boundary definition that takes no arguments.
  • A boundary condition (i.e. an instance of BoundaryCondition). This is the result of calling a boundary definition.
  • A generator object that complies with the boundary protocol. This cannot be tested upfront; a misbehaving generator will cause a RuntimeError if a boundary protocol rule is not followed.

Any other type is a TypeError.
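An OR-like combination can be sketched over plain generator objects. The names whenany_sketch, times_sketch, above and drive are hypothetical; the sketch only accepts generator objects, not the other kinds of boundary listed above.

```python
def times_sketch(n):
    passed = 0
    yield False
    while passed < n:
        yield False
        passed += 1
    yield True


def above(limit):
    """Hypothetical boundary: True once a value greater than `limit` is seen."""
    yield False
    value = yield False
    while value <= limit:
        value = yield False
    yield True


def whenany_sketch(*boundaries):
    """Hypothetical OR-like boundary over generator objects."""
    for boundary in boundaries:
        next(boundary)                       # initialize every subordinate
    try:
        stop = False
        while stop is not True:
            data = yield stop                # no standalone first yield: the first
            results = [b.send(data) for b in boundaries]   # message is forwarded too
            stop = any(result is True for result in results)
        yield True
    finally:
        for boundary in boundaries:
            boundary.close()                 # close all subordinates on termination


def drive(boundary, values):
    """Return the values let through before the boundary yields True."""
    seen = []
    next(boundary)
    stop = boundary.send(((), {}))
    for value in values:
        if stop is True:
            break
        seen.append(value)
        stop = boundary.send(value)
    boundary.close()
    return seen


print(drive(whenany_sketch(times_sketch(5), above(2)), range(1, 100)))
print(drive(whenany_sketch(times_sketch(2), above(10)), range(1, 100)))
```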

xoutil.bound.whenall(*boundaries)

An AND-like boundary condition.

It takes several boundaries and returns a single one that behaves like the logical AND, i.e. it will yield True when all of its subordinate boundary conditions have yielded True.

It ensures that once a subordinate yields True it won’t be sent more data, no matter if other subordinates keep on running and consuming data.

Calls close() of all subordinates upon termination.

Each boundary should be either:

  • A “bare” boundary definition that takes no arguments.
  • A boundary condition (i.e. an instance of BoundaryCondition). This is the result of calling a boundary definition.
  • A generator object that complies with the boundary protocol. This cannot be tested upfront; a misbehaving generator will cause a RuntimeError if a boundary protocol rule is not followed.

Any other type is a TypeError.
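An AND-like combination can be sketched in the same spirit. The names whenall_sketch, times_sketch, above and drive are hypothetical, and only generator objects are handled. Note how a subordinate that has yielded True is dropped from the pending list, so it is never sent more data.

```python
def times_sketch(n):
    passed = 0
    yield False
    while passed < n:
        yield False
        passed += 1
    yield True


def above(limit):
    """Hypothetical boundary: True once a value greater than `limit` is seen."""
    yield False
    value = yield False
    while value <= limit:
        value = yield False
    yield True


def whenall_sketch(*boundaries):
    """Hypothetical AND-like boundary over generator objects."""
    pending = list(boundaries)
    for boundary in pending:
        next(boundary)                       # initialize every subordinate
    try:
        while pending:
            data = yield False
            # Keep only the subordinates that have not yielded True yet.
            pending = [b for b in pending if b.send(data) is not True]
        yield True
    finally:
        for boundary in boundaries:
            boundary.close()                 # close all subordinates on termination


def drive(boundary, values):
    """Return the values let through before the boundary yields True."""
    seen = []
    next(boundary)
    stop = boundary.send(((), {}))
    for value in values:
        if stop is True:
            break
        seen.append(value)
        stop = boundary.send(value)
    boundary.close()
    return seen


print(drive(whenall_sketch(times_sketch(2), above(3)), range(1, 100)))
```

Here times_sketch(2) is met after the second value, but the combined boundary keeps going until above(3) is met as well.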

Defining boundaries

If none of the included boundaries covers a boundary condition you have, you may create another one using boundary(). This is usually employed as a decorator on the boundary definition.

xoutil.bound.boundary(definition)

Helper to define a boundary condition.

The definition must be a function that returns a generator. The following rules must be followed. Collectively these rules are called the boundary protocol.

  • The boundary definition will yield True when and only when the boundary condition is met. Only the value True will signal the boundary condition.

  • The boundary definition must yield at least 2 times:

    • First, its next() method will be called to allow for initialization of internal state.
    • Immediately after, its send() method will be called with the tuple (args, kwargs) holding the arguments passed to the unbounded function. At this point the boundary definition may yield True to halt the execution. In this case, the unbounded generator won’t be asked for any value.
  • The boundary definition must yield True before terminating with a StopIteration. For instance, the following definition is invalid because it ends without yielding True:

    @boundary
    def invalid():
        yield
        yield False
    
  • The boundary definition must deal with GeneratorExit exceptions properly since we call the close() method of the generator upon termination. Termination occurs when the unbounded generator stops by any means, even when the boundary condition yielded True or the generator itself is exhausted or there’s an error in the generator.

    Both whenall() and whenany() call the close() method of all their subordinate boundary conditions.

    Most of the time this reduces to not catching GeneratorExit exceptions.

A RuntimeError may happen if any of these rules is not followed by the definition. Furthermore, this error will occur when invoking the bounded function and not when applying the boundary to the unbounded generator.
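The rule about yielding True before terminating can be checked with a small stand-alone sketch. The name check_protocol is hypothetical and the generators are written without the @boundary decorator so that the example runs on its own; the error message is an illustration, not the library's.

```python
def check_protocol(boundary, values):
    """Hypothetical checker: drive a boundary generator and report misbehaviour."""
    try:
        next(boundary)                       # initialization
        stop = boundary.send(((), {}))       # arguments of the unbounded function
        for value in values:
            if stop is True:
                break
            stop = boundary.send(value)
    except StopIteration:
        raise RuntimeError("the boundary ended without yielding True") from None
    finally:
        boundary.close()
    return stop is True


def invalid():
    yield
    yield False                              # ends here: True is never yielded


def valid():
    yield
    yield False
    yield True                               # signals the boundary before ending


print(check_protocol(valid(), [1, 2, 3]))
try:
    check_protocol(invalid(), [1, 2, 3])
except RuntimeError as error:
    print("RuntimeError:", error)
```

As in the library, the problem only shows up when the boundary is driven, not when the generator function is defined.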

Illustration of a boundary

Let’s explain in detail the implementation of times() as an example of how a boundary condition could be implemented.

@boundary
def times(n):
    '''Becomes True after the `nth` item has been produced.'''
    passed = 0
    yield False              # line 5
    while passed < n:
        yield False          # line 7
        passed += 1          # line 8
    yield True               # line 9

We implemented the boundary condition via the boundary() helper. This helper lets you implement the boundary condition via a boundary definition (the function above). The boundary helper takes the definition and builds a BoundaryCondition instance. This instance can then be used to decorate the unbounded function, returning a bounded function (a Bounded instance).

When the bounded function is called, what actually happens is that:

  • First the boundary condition is invoked passing the n argument, and thus we obtain the generator from the times function.

  • We also get the generator from the unbounded function.

  • Then we call next(boundary) to allow the times boundary to initialize itself. This runs the code of the times definition up to line 5 (the first yield statement).

  • The bounded function ignores the message from the boundary at this point.

  • Then it sends the arguments passed to the original function via the send() method of the boundary condition generator.

  • This unfreezes the boundary condition, which now tests whether passed is less than n. If this is true, the boundary yields False and suspends at line 7.

  • The bounded function sees that the message is not True and asks the unbounded generator for its next value.

  • Then it sends that value to the boundary condition generator, which resumes execution at line 8. The value sent is ignored and passed gets incremented by 1.

  • Again the generator tests whether passed is less than n. If passed has reached n, it will execute line 9, yielding True.

  • The bounded function sees that the boundary condition is True and calls the close() method of the boundary condition generator.

  • This is like raising a GeneratorExit inside times just after it resumes at line 9. The error is not trapped and propagates; the close() method of the generator knows this means the generator has properly finished.

    Note

    Other boundaries might need to deal with GeneratorExit explicitly.

  • Then the bounded function regains control and calls the close() method of the unbounded generator. This effectively raises a GeneratorExit inside the unbounded generator, which, if left untreated, means everything went well.
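The steps above can be replayed by hand on plain generators. This is only a trace for illustration: the times definition is used without the @boundary decorator, and the numbers generator is a hypothetical unbounded generator.

```python
def times(n):
    passed = 0
    yield False
    while passed < n:
        yield False
        passed += 1
    yield True


def numbers():
    """Hypothetical unbounded generator."""
    n = 0
    while True:
        n += 1
        yield n


boundary = times(2)
unbounded = numbers()

trace = []
trace.append(next(boundary))                   # initialization; the message is ignored
trace.append(boundary.send(((), {})))          # arguments sent; passed < n, so False
trace.append(boundary.send(next(unbounded)))   # value 1; passed becomes 1, so False
trace.append(boundary.send(next(unbounded)))   # value 2; passed becomes 2, so True

boundary.close()     # GeneratorExit inside times; not trapped, so close() returns
unbounded.close()    # GeneratorExit inside numbers; it simply finishes

print(trace)
```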

If you look at the implementation of the included boundary conditions, you’ll see that all have the same pattern:

  1. Initialization code, followed by a yield False statement. This is a clear indicator that the included boundary conditions disregard the first message (the arguments to the unbounded function).
  2. A looping structure that tests that the condition has not been met and yields False at each cycle.
  3. The yield True statement outside the loop to indicate the boundary condition has been met.

This pattern is not an accident. Exceptionally, whenall() and whenany() lack the first standalone yield False because they must not assume that all their subordinate predicates will ignore the first message.

Internal API

class xoutil.bound.Bounded(target)

The bounded function.

This is the result of applying a boundary definition to an unbounded function (or generator).

If target is a function this instance can be called several times. If it’s a generator then it will be closed after either calling (__call__) this instance, or consuming the generator given by generate().

This class is actually subclassed inside apply() so that the weaving of the boundary definition with the target unbounded function is not exposed.

__call__(*args, **kwargs)

Return the last value from the underlying bounded generator.

generate(*args, **kwargs)

Return the bounded generator.

This method exposes the bounded generator. This allows you to “see” all the values yielded by the unbounded generator up to the point when the boundary condition is met.
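The difference between calling the bounded function and using generate() can be sketched with a small stand-in class. BoundedSketch and over_ten are hypothetical names; the class only mimics the two methods described here and is not the library's Bounded.

```python
class BoundedSketch:
    """Hypothetical stand-in exposing __call__ and generate()."""

    def __init__(self, definition, target):
        self.definition = definition     # zero-argument boundary definition
        self.target = target             # unbounded function

    def generate(self, *args, **kwargs):
        boundary = self.definition()
        unbounded = self.target(*args, **kwargs)
        next(boundary)
        stop = boundary.send((args, kwargs))
        try:
            while stop is not True:
                try:
                    value = next(unbounded)
                except StopIteration:
                    break
                yield value
                stop = boundary.send(value)
        finally:
            boundary.close()
            unbounded.close()

    def __call__(self, *args, **kwargs):
        result = None
        for result in self.generate(*args, **kwargs):
            pass                          # keep only the last value
        return result


def over_ten():
    """Boundary: True once a value greater than 10 has been produced."""
    yield False
    value = yield False
    while value <= 10:
        value = yield False
    yield True


def fibonacci():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b


bounded = BoundedSketch(over_ten, fibonacci)
print(list(bounded.generate()))
print(bounded())
```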

class xoutil.bound.BoundaryCondition(definition, name=None, errors=None)

Embodies the boundary protocol.

The definition argument must be a function that implements a boundary definition. This function may take arguments to initialize the state of the boundary condition.

Instances are callables that will return a Bounded subclass specialized with the application of the boundary condition to a given unbounded function (target). For instance, times(6) returns a class that, when instantiated with a target, represents the bounded function that takes the 6th value yielded by target.

If the definition takes no arguments for initialization you may pass the target directly. This means that if __call__() receives arguments they will be used to instantiate the Bounded subclass, i.e. this case allows only a single argument target.

If errors is not None it should be a tuple of exceptions to catch and throw inside the boundary condition definition. Other exceptions, besides GeneratorExit and StopIteration, are not handled (so they bubble up). See until_errors().

An example: time bounded batch processing

We have a project in which we need to send emails inside a cron task (celery is not available). Emails to be sent are placed inside an Outbox, but we may only spend about 60 seconds sending as many emails as we can. If our emails are reasonably small (i.e. they will be delivered to the SMTP server in a few milliseconds) we could use the timed() predicate to bound the execution of the task:

@timed(50)
def send_emails():
   outbox = Outbox.open()
   try:
      for message in outbox:
         emailbackend.send(message)
         outbox.remove(message)
         yield message
   except GeneratorExit:
      # This means the time we were given is up.
      pass
   finally:
      outbox.close()  # commit the changes to the outbox

Notice that you must enclose your batch-processing code in a try statement if you need to somehow commit changes, since we may call the close() method of the generator to signal that it must stop.

A finally clause is not always appropriate, because an error other than GeneratorExit should not commit the data unless you’re sure that the changes made before the error are safe to commit. In the code above, the only place where an error could happen is the sending of the email, and the data is only touched for each email that is actually sent. So we can safely close our outbox and commit the removal of the previous messages from the outbox.
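What close() does to such a generator can be seen in a stand-alone sketch. The lists below are hypothetical in-memory stand-ins for the Outbox and the email backend; only the try/except/finally behaviour is the point.

```python
def send_emails(outbox, delivered, events):
    try:
        for message in list(outbox):      # iterate over a copy; items are removed
            delivered.append(message)     # stands in for emailbackend.send(message)
            outbox.remove(message)
            yield message
    except GeneratorExit:
        events.append("asked to stop")    # close() was called on the generator
    finally:
        events.append("outbox closed")    # runs on normal exit, close() or error


outbox = ["a", "b", "c"]
delivered = []
events = []

task = send_emails(outbox, delivered, events)
next(task)
next(task)
task.close()          # what the bounded function does once the boundary is met

print(delivered, outbox, events)
```

The message that was never reached stays in the outbox, and the finally clause still runs after the generator has been asked to stop.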

Using the Bounded.generate() method

Calling a bounded function simply returns the last value produced by the unbounded generator, but sometimes you need to actually see all the values produced. This is useful if you need to meld several generators with partially overlapping boundary conditions.

Let’s extend the example from the previous section a bit. Assume you now need to extend your cron task to also read an Inbox as much as it can and then send as many messages as it can. Both things should be done under a given amount of time; however, the accumulated size of sent messages should not surpass a threshold of bytes to avoid congestion.

For this task you may use both timed() and accumulated(). But you must apply accumulated() only to the process of sending the messages and the timed boundary to the overall process.

This can be accomplished like this:

def communicate(interval, bandwidth):
    from itertools import chain as meld

    def receive():
        for message in Inbox.receive():
            yield message

    @accumulated(bandwidth, 'size')
    def send():
        for message in Outbox.messages():
            yield message

    @timed(interval)
    def execute():
        for _ in meld(receive(), send.generate()):
            yield
    return execute()

Let’s break this into its parts:

  • The receive function reads the Inbox and yields each message received.

    It is actually an unbounded function, but we don’t want to bound its execution in isolation.

  • The send unbounded function sends every message we have in the Outbox and yields each one. In this case we can apply the accumulated boundary to get a Bounded instance.

  • Then we define an execute function bounded by timed. This function melds the receive and send processes, but we can’t actually call send because we need to yield after each message has been received or sent. That’s why we need to call generate(), so that the time boundary is also applied to the sending process.

Note

The structure from this example is actually taken from a real program, although simplified to serve better for learning. For instance, in our real-world program bandwidth could be None to indicate no size limit should be applied to the sending process. Also in the example we’re not actually saving nor sending messages!
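The same structure can be tried out without xoutil. In this sketch everything is an assumption made for illustration: bounded is a hypothetical driver, the inbox and outbox are plain lists of dicts, and a count boundary replaces timed() so that the result is deterministic.

```python
from itertools import chain as meld


def bounded(boundary, unbounded):
    """Hypothetical driver: yield from `unbounded` until `boundary` yields True."""
    next(boundary)
    stop = boundary.send(((), {}))
    try:
        while stop is not True:
            try:
                value = next(unbounded)
            except StopIteration:
                break
            yield value
            stop = boundary.send(value)
    finally:
        boundary.close()
        unbounded.close()


def times(n):
    passed = 0
    yield False
    while passed < n:
        yield False
        passed += 1
    yield True


def accumulated(mass, key):
    accum = 0
    yield False
    while accum <= mass:
        data = yield False
        accum += data[key]
    yield True


def communicate(limit, bandwidth, inbox, outbox):
    def receive():
        for message in inbox:
            yield message

    def send():
        for message in outbox:
            yield message

    def execute():
        # Only the sending part is bounded by the accumulated size.
        sending = bounded(accumulated(bandwidth, "size"), send())
        for message in meld(receive(), sending):
            yield message

    # The outer boundary applies to receiving and sending alike.
    return list(bounded(times(limit), execute()))


inbox = [{"id": "in-1", "size": 10}, {"id": "in-2", "size": 10}]
outbox = [{"id": "out-1", "size": 40}, {"id": "out-2", "size": 50},
          {"id": "out-3", "size": 30}]

print([m["id"] for m in communicate(10, 80, inbox, outbox)])
print([m["id"] for m in communicate(3, 80, inbox, outbox)])
```

In the first call the size boundary cuts the sending short; in the second the outer boundary stops the whole process first.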