xotl.tools.bound – Helpers for bounded execution of co-routines
New in version 1.6.3.
A bounded execution model
Some features are easy to implement using a generator or co-routine (PEP 342). For instance, you might want to “report units of work” one at a time. This kind of feature can easily be programmed without any bounds whatsoever, and you can then “weave” the bounds in afterwards.
This module helps to separate the work-doing function from the boundary-test definitions.
This document uses the following terminology:
- unbounded function
This is the function that does the actual work without testing for any boundary condition. Boundary conditions are not “natural causes” of termination for the algorithm but conditions imposed elsewhere: the environment, resource management, etc.
This function must return a generator, called the unbounded generator.
- unbounded generator
The generator returned by an unbounded function. This generator is allowed to yield forever, although it could terminate by itself. So this is actually a possibly unbounded generator, but we keep the term for emphasis.
- boundary condition
It’s a condition that does not belong to the logical description of any algorithm. When this condition is met it indicates that the unbounded generator should be closed. The boundary condition is tested each time the unbounded generator yields.
A boundary condition is usually implemented in a single function called the boundary definition.
- boundary definition
A function that implements a boundary condition. This function must comply with the boundary protocol (see boundary()).
Sometimes we identify the boundary condition with its boundary definition.
- bounded function
It’s the result of applying a boundary definition to an unbounded function.
- bounded generator
It’s the result of applying a boundary condition to an unbounded generator.
The bounded execution model takes at least an unbounded generator and a boundary condition. Applying the boundary condition to the unbounded generator ultimately results in a bounded generator, which will behave almost equivalently to the unbounded generator but will stop when the boundary condition yields True or when the unbounded generator itself is exhausted.
Included boundary conditions
- xotl.tools.bound.timed(maxtime)
Becomes True after a given amount of time.
The bounded generator will be allowed to yield values until the maxtime time frame has elapsed.
Usage:
    @timed(timedelta(seconds=60))
    def do_something_in_about_60s():
        while True:
            yield
Note
This is a very soft limit.
We can’t actually guarantee any enforcement of the time limit. If the bounded generator takes too much time or never yields, this predicate can’t do much. This usually helps with batch processing that must not exceed (by too much) a given amount of time.
The timer starts just after the next() function has been called for the predicate initialization. So if the maxtime given is too short, this predicate might halt the execution of the bounded function without allowing any processing at all.
If maxtime is not a timedelta, the timedelta will be computed as timedelta(seconds=maxtime).
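The behaviour described above can be imitated with a plain generator that follows the boundary protocol described in this document. This is only a sketch, not the library's implementation: the name `timed_sketch` and the use of `time.monotonic()` are assumptions made for illustration.

```python
import time
from datetime import timedelta


def timed_sketch(maxtime):
    """Hypothetical stand-in for a time boundary (not the library's code)."""
    if not isinstance(maxtime, timedelta):
        maxtime = timedelta(seconds=maxtime)
    limit = maxtime.total_seconds()
    start = time.monotonic()   # runs on the initializing next(): the timer starts here
    yield False                # initialization; this answer is ignored
    # Every later send() resumes here; the value sent is ignored.
    while time.monotonic() - start < limit:
        yield False
    yield True


boundary = timed_sketch(0.2)
next(boundary)                       # initialize the boundary
early = boundary.send(((), {}))      # the (args, kwargs) message
time.sleep(0.25)                     # simulate a slow unit of work
late = boundary.send("some value")   # the time frame has elapsed by now
boundary.close()
```

The sketch also shows why the limit is soft: the elapsed time is only checked when a value is sent, so a unit of work that takes too long cannot be interrupted.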
- xotl.tools.bound.times(n)
Becomes True after the nth item has been produced.
- xotl.tools.bound.accumulated(mass, *attrs, initial=0)
Becomes True after accumulating a given “mass”.
mass is the maximum allowed to accumulate. This is usually a positive number. Each value produced by the unbounded generator is added together. Yields True when this amounts to more than the given mass.
If any attrs are provided, they will be considered attributes (or keys) to search inside the yielded data from the bounded function. If no attrs are provided the whole data is accumulated, so it must allow addition. The attribute to be summed is extracted with get_first_of(), so only the first attribute found is added.
If the keyword argument initial is provided the accumulator is initialized with that value. By default this is 0.
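To make the accumulation rule concrete, here is a minimal sketch written as a plain generator. It is not the library's code: `accumulated_sketch` and the helper `first_of` (a simplified stand-in for `get_first_of()`) are hypothetical names, and stopping only when the total is strictly greater than `mass` is an assumption taken from the wording above.

```python
def first_of(data, names):
    """Hypothetical helper: first key or attribute of `data` found in `names`."""
    for name in names:
        if isinstance(data, dict) and name in data:
            return data[name]
        if hasattr(data, name):
            return getattr(data, name)
    return data  # no names given (or none found): use the whole value


def accumulated_sketch(mass, *attrs, initial=0):
    """Hypothetical stand-in for an accumulating boundary."""
    total = initial
    yield False                  # initialization; (args, kwargs) arrives next and is ignored
    while total <= mass:         # assumption: stop once the total exceeds `mass`
        data = yield False
        total += first_of(data, attrs)
    yield True


boundary = accumulated_sketch(10, 'size')
next(boundary)
first = boundary.send(((), {}))
messages = [{'size': 4}, {'size': 6}, {'size': 1}]
answers = [boundary.send(message) for message in messages]
boundary.close()
```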
- xotl.tools.bound.pred(func, skipargs=True)
Allow “normal” functions to engage within the boundary protocol.
func should take a single argument and return True if the boundary condition has been met.
If skipargs is True then the function func will not be called with the tuple (args, kwargs) upon initialization of the boundary; in that case only yielded values from the unbounded generator are passed. If you need to get the original arguments, set skipargs to False; in this case the first time func is called it will be passed a single argument (args, kwargs).
Example:
    >>> @pred(lambda x: x > 10)
    ... def fibonacci():
    ...     a, b = 1, 1
    ...     while True:
    ...         yield a
    ...         a, b = b, a + b
    >>> fibonacci()
    13
- xotl.tools.bound.until_errors(*errors)
Becomes True after any of errors has been raised.
Any other exception (except GeneratorExit) is propagated. You must pass at least one error.
Normally this will allow some possibly long jobs to be interrupted (SoftTimeLimitException in a celery task, for instance) but leave some time for the caller to clean things up.
It’s assumed that your job can be properly finalized after any of the given exceptions has been raised.
Parameters: on_error – A callable that will only be called if the boundary condition is ever met, i.e. if any of errors was raised. The callback is called before yielding True.
New in version 1.7.2.
Changed in version 1.7.5: Added the keyword argument on_error.
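The mechanics can be illustrated with a plain generator that has the error thrown into it. This is a sketch under assumptions, not the library's implementation: `until_errors_sketch` is a hypothetical name, the error is delivered here with the generator's own `throw()` method, and passing the caught error to `on_error` is a guess, since the text above does not say which arguments the callback receives.

```python
def until_errors_sketch(*errors, on_error=None):
    """Hypothetical stand-in for an error boundary."""
    try:
        while True:
            yield False          # nothing went wrong so far
    except errors as error:      # only the listed exceptions are trapped
        if on_error is not None:
            on_error(error)      # assumption: the callback receives the error
        yield True               # the callback runs before True is yielded


seen = []
boundary = until_errors_sketch(KeyError, on_error=seen.append)
next(boundary)
before = boundary.send(((), {}))
after = boundary.throw(KeyError("boom"))   # a listed error: the boundary is met
boundary.close()

other = until_errors_sketch(KeyError)
next(other)
try:
    other.throw(ValueError("unrelated"))   # not listed: it simply propagates
    propagated = False
except ValueError:
    propagated = True
```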
- xotl.tools.bound.until(time=None, times=None, errors=None)
An idiomatic alias to other boundary definitions.
  - until(maxtime=n) is the same as timed(n).
  - until(times=n) is the same as times(n).
  - until(pred=func, skipargs=skip) is the same as pred(func, skipargs=skip).
  - until(errors=errors, **kwargs) is the same as until_errors(*errors, **kwargs).
  - until(accumulate=mass, path=path, initial=initial) is the same as accumulated(mass, *path.split('.'), initial=initial).
Warning
You cannot mix many calls.
New in version 1.7.2.
Chaining several boundary conditions
To create a more complex boundary than the one provided by a single condition, you can use the following high-level boundaries:
- xotl.tools.bound.whenany(*boundaries)
An OR-like boundary condition.
It takes several boundaries and returns a single one that behaves like the logical OR, i.e. it will yield True when any of its subordinate boundary conditions yields True.
Calls close() of all subordinates upon termination.
Each boundary should be either:
  - A “bare” boundary definition that takes no arguments.
  - A boundary condition (i.e. an instance of BoundaryCondition). This is the result of calling a boundary definition.
  - A generator object that complies with the boundary protocol. This cannot be tested upfront; a misbehaving generator will cause a RuntimeError if a boundary protocol rule is not followed.
Any other type is a TypeError.
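The OR-like behaviour can be sketched with plain generators. The code below is an illustration only and not how the library implements `whenany()`: it accepts only generator objects, and the names `whenany_sketch` and `when` are hypothetical.

```python
def when(test):
    """Tiny hypothetical boundary: True once test(value) holds."""
    yield False                  # initialization; (args, kwargs) arrives next and is ignored
    value = yield False
    while not test(value):
        value = yield False
    yield True


def whenany_sketch(*boundaries):
    """Hypothetical OR-composition of boundary generators."""
    try:
        for boundary in boundaries:
            next(boundary)       # let every subordinate initialize itself
        message = yield False    # the first message is forwarded, not discarded
        while True:
            # Every subordinate sees every message (no short-circuit).
            answers = [boundary.send(message) for boundary in boundaries]
            if any(answer is True for answer in answers):
                break
            message = yield False
        yield True
    finally:
        for boundary in boundaries:
            boundary.close()     # close all subordinates upon termination


combined = whenany_sketch(when(lambda v: v > 10), when(lambda v: v % 7 == 0))
next(combined)
answers = [combined.send(((), {}))]
for value in (3, 5, 7):
    answers.append(combined.send(value))
combined.close()
```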
- xotl.tools.bound.whenall(*boundaries)
An AND-like boundary condition.
It takes several boundaries and returns a single one that behaves like the logical AND, i.e. it will yield True when all of its subordinate boundary conditions have yielded True.
It ensures that once a subordinate yields True it won’t be sent more data, no matter if other subordinates keep on running and consuming data.
Calls close() of all subordinates upon termination.
Each boundary should be either:
  - A “bare” boundary definition that takes no arguments.
  - A boundary condition (i.e. an instance of BoundaryCondition). This is the result of calling a boundary definition.
  - A generator object that complies with the boundary protocol. This cannot be tested upfront; a misbehaving generator will cause a RuntimeError if a boundary protocol rule is not followed.
Any other type is a TypeError.
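The rule that a satisfied subordinate receives no further data can be seen in a small plain-generator sketch. This is not the library's implementation: it accepts only generator objects, and `whenall_sketch` and the recording boundary `after` are hypothetical names used for illustration.

```python
def after(n, seen):
    """Tiny hypothetical boundary: True after n values; records what it is sent."""
    message = yield False        # paused here after next(); receives (args, kwargs)
    for _ in range(n):
        seen.append(message)
        message = yield False
    seen.append(message)
    yield True


def whenall_sketch(*boundaries):
    """Hypothetical AND-composition of boundary generators."""
    pending = list(boundaries)
    try:
        for boundary in pending:
            next(boundary)       # let every subordinate initialize itself
        message = yield False
        while pending:
            # Keep only the subordinates that have not answered True yet;
            # the others are never sent data again.
            pending = [b for b in pending if b.send(message) is not True]
            if pending:
                message = yield False
        yield True
    finally:
        for boundary in boundaries:
            boundary.close()


quick, slow = [], []
combined = whenall_sketch(after(1, quick), after(3, slow))
next(combined)
combined.send(((), {}))
answers = [combined.send(value) for value in ('x', 'y', 'z')]
combined.close()
```

After the run, `quick` holds only the arguments message and `'x'`, while `slow` has seen all three values.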
Defining boundaries
If none of the boundaries defined deals with a boundary condition you have, you may create another one using boundary(). This is usually employed as a decorator on the boundary definition.
- xotl.tools.bound.boundary(definition)
Helper to define a boundary condition.
The definition must be a function that returns a generator. The following rules must be followed. Collectively these rules are called the boundary protocol.
  - The boundary definition will yield True when and only when the boundary condition is met. Only the value True will signal the boundary condition.
  - The boundary definition must yield at least 2 times:
    - First, its next() method will be called to allow for initialization of internal state.
    - Immediately after, its send() method will be called, passing the tuple (args, kwargs) with the arguments passed to the unbounded function. At this point the boundary definition may yield True to halt the execution. In this case, the unbounded generator won’t be asked for any value.
  - The boundary definition must yield True before terminating with a StopIteration. For instance, the following definition is invalid because it ends without yielding True:
        @boundary
        def invalid():
            yield
            yield False
  - The boundary definition must deal with GeneratorExit exceptions properly since we call the close() method of the generator upon termination. Termination occurs when the unbounded generator stops by any means, even when the boundary condition yielded True or the generator itself is exhausted or there’s an error in the generator.
    Both whenall() and whenany() call the close() method of all their subordinate boundary conditions.
    Most of the time this reduces to not catching GeneratorExit exceptions.
A RuntimeError may happen if any of these rules is not followed by the definition. Furthermore, this error will occur when invoking the bounded function and not when applying the boundary to the unbounded generator.
Illustration of a boundary
Let’s explain in detail the implementation of times() as an example of how a boundary condition could be implemented.
    1  @boundary
    2  def times(n):
    3      '''Becomes True after the `nth` item has been produced.'''
    4      passed = 0
    5      yield False
    6      while passed < n:
    7          yield False
    8          passed += 1
    9      yield True
We implemented the boundary condition via the boundary() helper. This helper allows implementing the boundary condition via a boundary definition (the function above). The boundary helper takes the definition and builds a BoundaryCondition instance. This instance can then be used to decorate the unbounded function, returning a bounded function (a Bounded instance).
When the bounded function is called, what actually happens is that:
- First the boundary condition is invoked passing the n argument, and thus we obtain the generator from the times function.
- We also get the generator from the unbounded function.
- Then we call next(boundary) to allow the times boundary to initialize itself. This runs the code of the times definition up to line 5 (the first yield statement).
- The bounded function ignores the message from the boundary at this point.
- Then it sends the arguments passed to the original function via the send() method of the boundary condition generator.
- This unfreezes the boundary condition, which now tests whether passed is less than n. If this is true, the boundary yields False and suspends there at line 7.
- The bounded function sees that the message is not True and asks the unbounded generator for its next value.
- Then it sends that value to the boundary condition generator, which resumes execution at line 8. The value sent is ignored and passed gets incremented by 1.
- Again the generator tests whether passed is less than n. If passed has reached n, it will execute line 9, yielding True.
- The bounded function sees that the boundary condition is True and calls the close() method of the boundary condition generator.
- This is like raising a GeneratorExit at line 9, where times is suspended. The error is not trapped and propagates; the close() method of the generator knows this means the generator has properly finished.
  Note
  Other boundaries might need to deal with GeneratorExit explicitly.
- Then the bounded function regains control and calls the close() method of the unbounded generator. This effectively raises a GeneratorExit inside the unbounded generator, which, if left unhandled, means everything went well.
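The steps above can be condensed into a small driver loop. This is a sketch of the protocol as described in this document, not the code of the Bounded class: `run_bounded`, `times_sketch`, `fibonacci` and `short` are hypothetical names, and error handling is reduced to closing both generators.

```python
def times_sketch(n):
    """Plain-generator copy of the times definition shown above."""
    passed = 0
    yield False
    while passed < n:
        yield False
        passed += 1
    yield True


def run_bounded(unbounded_function, boundary, *args, **kwargs):
    """Hypothetical driver: return the last value produced within the bound."""
    generator = unbounded_function(*args, **kwargs)
    last = None
    try:
        next(boundary)                        # initialization; the answer is ignored
        stop = boundary.send((args, kwargs))  # the boundary may already answer True
        while stop is not True:
            try:
                last = next(generator)
            except StopIteration:
                break                         # the generator was exhausted by itself
            stop = boundary.send(last)
    finally:
        boundary.close()                      # GeneratorExit inside the boundary
        generator.close()                     # GeneratorExit inside the generator
    return last


def fibonacci():
    a, b = 1, 1
    while True:
        yield a
        a, b = b, a + b


def short():
    yield from (1, 2)


sixth = run_bounded(fibonacci, times_sketch(6))
exhausted = run_bounded(short, times_sketch(6))
```

With six items allowed, the driver returns the sixth Fibonacci number produced by the generator; with the short generator it returns the last value available before exhaustion.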
If you look at the implementation of the included boundary conditions, you’ll see that all have the same pattern:
- Initialization code, followed by a yield False statement. This is a clear indicator that the included boundary conditions disregard the first message (the arguments to the unbounded function).
- A looping structure that tests the condition has not been met and yields False at each cycle.
- The yield True statement outside the loop to indicate the boundary condition has been met.
This pattern is not an accident. The exceptions are whenall() and whenany(), which lack the first standalone yield False because they must not assume all their subordinate predicates will ignore the first message.
Internal API
- class xotl.tools.bound.Bounded(target)
The bounded function.
This is the result of applying a boundary definition to an unbounded function (or generator).
If target is a function, this instance can be called several times. If it’s a generator, then it will be closed after either calling (__call__) this instance, or consuming the generator given by generate().
This class is actually subclassed inside the apply() so that the weaving of the boundary definition with the target unbounded function is not exposed.
- class xotl.tools.bound.BoundaryCondition(definition, name=None, errors=None)
Embodies the boundary protocol.
The definition argument must be a function that implements a boundary definition. This function may take arguments to initialize the state of the boundary condition.
Instances are callables that will return a Bounded subclass specialized with the application of the boundary condition to a given unbounded function (target). For instance, times(6) returns a class that, when instantiated with a target, represents the bounded function that takes the 6th value yielded by target.
If the definition takes no arguments for initialization you may pass the target directly. This means that if __call__() receives arguments they will be used to instantiate the Bounded subclass, i.e. this case allows only a single argument target.
If errors is not None it should be a tuple of exceptions to catch and throw inside the boundary condition definition. Other exceptions, besides GeneratorExit and StopIteration, are not handled (so they bubble up). See until_errors().
An example: time bounded batch processing
We have a project in which we need to send emails inside a cron task (celery is not available). Emails to be sent are placed inside an Outbox but we may only spend about 60 seconds to send as many emails as we can. If our emails are reasonably small (i.e. will be delivered to the SMTP server in a few milliseconds) we could use the timed() predicate to bound the execution of the task:
from xotl.tools.bound import timed

# Outbox and emailbackend belong to the project and are not shown here.
@timed(50)
def send_emails():
    outbox = Outbox.open()
    try:
        for message in outbox:
            emailbackend.send(message)
            outbox.remove(message)
            yield message
    except GeneratorExit:
        # This means the time we were given is over.
        pass
    finally:
        outbox.close()  # commit the changes to the outbox
Notice that you must enclose your batch-processing code in a try statement if you need to somehow commit changes, since we may call the close() method of the generator to signal that it must stop.
A finally clause is not always appropriate, because an error other than GeneratorExit should not commit the data unless you’re sure that the data changes made before the error can safely be kept. In the code above, the only place where an error could happen is the sending of the email, and the data is only touched for each email that is actually sent. So we can safely close our outbox and commit the removal of previous messages from the outbox.
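The interplay between close(), GeneratorExit and the finally clause is plain Python behaviour and can be observed without the library. In the following sketch the lists `outbox`, `sent` and `log`, and the function `send_all`, are hypothetical stand-ins for the project's Outbox and email backend.

```python
def send_all(outbox, sent, log):
    """Hypothetical batch job: move messages from `outbox` to `sent`."""
    try:
        while outbox:
            message = outbox[0]
            sent.append(message)     # stands in for emailbackend.send(message)
            outbox.pop(0)            # stands in for outbox.remove(message)
            yield message
    except GeneratorExit:
        log.append("stopped early")  # close() was called while we were suspended
    finally:
        log.append("outbox closed")  # runs on normal end, close() and errors


outbox, sent, log = ["a", "b", "c"], [], []
job = send_all(outbox, sent, log)
next(job)
next(job)
job.close()   # what happens when the boundary condition is met
```

After close(), only the messages sent before the stop were removed, and the finally clause ran after the GeneratorExit handler.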
Using the Bounded.generate() method
Calling a bounded function simply returns the last value produced by the unbounded generator, but sometimes you need to actually see all the values produced. This is useful if you need to meld several generators with partially overlapping boundary conditions.
Let’s give an example by extending a bit the example given in the previous section. Assume you now need to extend your cron task to also read an Inbox as much as it can and then send as many messages as it can. Both things should be done under a given amount of time, however the accumulated size of sent messages should not surpass a threshold of bytes to avoid congestion.
For this task you may use both timed() and accumulated(). But you must apply accumulated() only to the process of sending the messages and the timed boundary to the overall process.
This can be accomplished like this:
    def communicate(interval, bandwidth):
        from itertools import chain as meld

        def receive():
            for message in Inbox.receive():
                yield message

        @accumulated(bandwidth, 'size')
        def send():
            for message in Outbox.messages():
                yield message

        @timed(interval)
        def execute():
            for _ in meld(receive(), send.generate()):
                yield
        return execute()
Let’s break this into its parts:
- The receive function reads the Inbox and yields each message received.
  It is actually an unbounded function but we don’t want to bound its execution in isolation.
- The send unbounded function sends every message we have in the Outbox and yields each one. In this case we can apply the accumulated boundary to get a Bounded instance.
- Then we define an execute function bounded by timed. This function melds the receive and send processes, but we can’t actually call send because we need to yield after each message has been received or sent. That’s why we need to call the generate() method, so that the time boundary is also applied to the sending process.
Note
The structure of this example is actually taken from a real program, although simplified to serve better for learning. For instance, in our real-world program bandwidth could be None to indicate no size limit should be applied to the sending process. Also in the example we’re not actually saving nor sending messages!