Source code for xotl.tools.future.threading

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# ---------------------------------------------------------------------
# Copyright (c) Merchise Autrement [~º/~] and Contributors
# All rights reserved.
#
# This is free software; you can do what the LICENCE file allows you to.
#

"""Extensions to Python's `threading` module.

You may use it as a drop-in replacement for ``threading``, although we don't
document all items here.  Refer to the `threading`:mod: documentation.

"""

import threading as _stdlib  # noqa
from threading import *  # noqa
from threading import Event, RLock, Thread, Timer


def async_call(func, args=None, kwargs=None, callback=None, onerror=None):
    """Execute a function asynchronously in a daemon thread.

    The function is called as ``func(*args, **kwargs)``.  Falsy `args` and
    `kwargs` (including the default None) are treated as "no arguments".

    If `callback` is provided, it is called with a single positional
    argument: the result of calling `func(*args, **kwargs)`.

    If the called function ends with an exception and `onerror` is provided,
    it is called with the exception object.  Since `callback` runs inside the
    same ``try`` block, an exception raised by `callback` is reported to
    `onerror` as well.  Only `Exception` subclasses are intercepted.

    :param func: The callable to run in the new thread.

    :param args: Positional arguments for `func`.

    :param kwargs: Keyword arguments for `func`.

    :param callback: Optional callable receiving the result of `func`.

    :param onerror: Optional callable receiving the raised exception.

    :returns: An event object that gets signalled when the function ends its
              execution whether normally or with an error.

    :rtype: `Event`:class:

    """
    # A freshly created Event is already cleared; no need to clear it again.
    event = Event()
    if not args:
        args = ()
    if not kwargs:
        kwargs = {}

    def async_():
        try:
            result = func(*args, **kwargs)
            if callback:
                callback(result)
        except Exception as error:
            if onerror:
                onerror(error)
        finally:
            # Always signal completion, even if `onerror` itself raised.
            event.set()

    # `Thread.setDaemon()` is deprecated since Python 3.10; pass the flag to
    # the constructor instead.
    thread = Thread(target=async_, daemon=True)
    thread.start()
    return event


class _SyncronizedCaller:
    """Protected to be used in `sync_call`:func:

    Runs several callables concurrently through `async_call`:func: and blocks
    until all of them have finished or the timeout expires.

    :param pooling: Seconds to wait on each pending event per polling round.

    An instance that bailed out because of a timeout keeps ignoring results
    afterwards (the flag is never reset), so use a fresh instance per call.

    """

    def __init__(self, pooling=0.005):
        # Serializes the user callback and guards `_not_bailed`.
        self.lock = RLock()
        # Turns False once the timeout fires; late results are then dropped.
        self._not_bailed = True
        self.pooling = pooling

    def __call__(self, funcs, callback, timeout=None):
        """Call every item of `funcs` in its own thread and wait for them.

        :param funcs: An iterable of callables that receive no arguments.

        :param callback: Called, while holding the lock, with the result of
                         each function that ends before the timeout.

        :param timeout: Seconds to wait before giving up.  A falsy value
                        (None or 0) means to wait indefinitely.

        """

        def _syncronized_callback(result):
            with self.lock:
                if self._not_bailed:
                    callback(result)

        # `async_call` returns only the completion event (not a pair), so
        # there is nothing to unpack here.
        events = tuple(
            async_call(which, callback=_syncronized_callback) for which in funcs
        )
        timer = None
        if timeout:

            def set_all_events():
                with self.lock:
                    self._not_bailed = False
                # Release the waiting loop below; the worker threads keep
                # running but their results are ignored from now on.
                for e in events:
                    e.set()

            timer = Timer(timeout, set_all_events)
            timer.start()
        try:
            pending = list(events)
            while pending:
                # Keep only the events that are still unset after waiting at
                # most `pooling` seconds on each.
                pending = [e for e in pending if not e.wait(self.pooling)]
        finally:
            if timer is not None:
                timer.cancel()


def sync_call(funcs, callback, timeout=None):
    """Call several functions, each one in its own thread, and wait for them.

    Each time a function ends, `callback` is called (wrapped in a lock to
    avoid race conditions) with the result of that function as a single
    positional argument.

    If `timeout` is not None it should be a float number indicating the
    seconds to wait before aborting.  Functions that terminated before the
    timeout will have called `callback`, but those that are still working
    will be ignored.

    .. todo:: Abort the execution of a thread.

    :param funcs: A sequence of callables that receive no arguments.

    :param callback: A callable receiving the result of each function.

    :param timeout: Seconds to wait before aborting, or None.

    """
    # A fresh caller per invocation: it carries the lock and the bail-out
    # state for this batch of functions only.
    _SyncronizedCaller()(funcs, callback, timeout)


# Re-export everything the standard `threading` module declares public, plus
# the extensions defined in this module.
from threading import __all__  # noqa

__all__ = [*__all__, "async_call", "sync_call"]