Source code for xoutil.threading

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
# ---------------------------------------------------------------------
# xoutil.threading
# ---------------------------------------------------------------------
# Copyright (c) 2015 Merchise and Contributors
# Copyright (c) 2013, 2014 Merchise Autrement and Contributors
# All rights reserved.
#
# This is free software; you can redistribute it and/or modify it under
# the terms of the LICENCE attached in the distribution package.
#
# Created on 2013-05-28

from __future__ import (division as _py3_division,
                        print_function as _py3_print,
                        unicode_literals as _py3_unicode,
                        absolute_import as _py3_abs_imports)

from xoutil.modules import copy_members
threading = copy_members('threading')
del copy_members


[docs]def async_call(func, args=None, kwargs=None, callback=None, onerror=None): '''Executes a function `func` with the given positional and keyword arguments asynchronously. If `callback` is provided, it is called with a single positional argument: the result of calling `func(*args, **kwargs)`. If the called function ends with an exception and `onerror` is provided, it is called with the exception object. :returns: An event object that gets signalled when the function ends its execution whether normally or with an error. :rtype: :class:`threading.Event` ''' event = threading.Event() event.clear() if not args: args = () if not kwargs: kwargs = {} def async(): try: result = func(*args, **kwargs) if callback: callback(result) except Exception as error: if onerror: onerror(error) finally: event.set() thread = threading.Thread(target=async) thread.setDaemon(True) # XXX: Why? thread.start() return event
class _SyncronizedCaller(object): def __init__(self, pooling=0.005): self.lock = threading.RLock() self._not_bailed = True self.pooling = pooling def __call__(self, funcs, callback, timeout=None): def _syncronized_callback(result): with self.lock: if self._not_bailed: callback(result) events, threads = [], [] for which in funcs: event, thread = async_call(which, callback=_syncronized_callback) events.append(event) threads.append(thread) if timeout: def set_all_events(): with self.lock: self._not_bailed = False for e in events: e.set() timer = threading.Timer(timeout, set_all_events) timer.start() while events: terminated = [] for event in events: flag = event.wait(self.pooling) if flag: terminated.append(event) for e in terminated: events.remove(e) if timeout: timer.cancel()
[docs]def sync_call(funcs, callback, timeout=None): '''Calls several functions each in it's own thread, and waits for all to end. Each time a function ends the `callback` is called (wrapped in a lock to avoid race conditions) with the result of the as a single positional argument. If `timeout` is not None it sould be a float number indicading the seconds to wait before aborting. Functions that terminated before the timeout will have called `callback`, but those that are still working will be ignored. .. todo:: Abort the execution of a thread. :param funcs: A sequences of callables that receive no arguments. ''' sync_caller = _SyncronizedCaller() sync_caller(funcs, callback, timeout)