Source code for geotecha.inputoutput.timeout

Code to timeout with processes.

Code in this module comes from an activestate code recipe [1]_. For an
asynchronous solution see the active activestate code recipe [2]_.

.. _synchronous:

.. _asynchronous:

.. [1] timeout decorator (with multiprocessing) (Python recipe) synchronous_
.. [2] Timeout Any Function (Python recipe) asynchronous_

>>> timed_longcos = timeout(2)(_longcos)
>>> timed_longcos(1, 0)
>>> timed_longcos(1, 2) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
TimeoutException: timed out after 2 seconds

The following examples from the original activestate code recipe
demonstrate how to use timeout as a decorator.  They don't seem to work for
me as the functions must be defined in __main__ to be pickled.  you get the
idea though.

.. code-block:: python

    def sleep(x):
        print "ABOUT TO SLEEP {0} SECONDS".format(x)
        return x

    Traceback (most recent call last):
    TimeoutException: timed out after 0 seconds


    def exc():
        raise Exception('Houston we have problems!')

    Traceback (most recent call last):
    Exception: Houston we have problems!

#Someuseful stuff

import multiprocessing
import time
import logging
#logger = multiprocessing.log_to_stderr()

[docs]class TimeoutException(Exception): pass
[docs]class RunableProcessing(multiprocessing.Process): def __init__(self, func, *args, **kwargs): self.queue = multiprocessing.Queue(maxsize=1) args = (func,) + args multiprocessing.Process.__init__(self, target=self.run_func, args=args, kwargs=kwargs)
[docs] def run_func(self, func, *args, **kwargs): try: result = func(*args, **kwargs) self.queue.put((True, result)) except Exception as e: self.queue.put((False, e))
[docs] def done(self): return self.queue.full()
[docs] def result(self): return self.queue.get()
[docs]def timeout(seconds, force_kill=True): def wrapper(function): def inner(*args, **kwargs): now = time.time() proc = RunableProcessing(function, *args, **kwargs) proc.start() proc.join(seconds) if proc.is_alive(): if force_kill: proc.terminate() runtime = int(time.time() - now) raise TimeoutException('timed out after {0} seconds'.format(runtime)) assert proc.done() success, result = proc.result() if success: return result else: raise result return inner return wrapper
def _longcos(x, wait=0): """calc cos(x) after waiting `wait` seconds. max wait is 5 seconds""" import math wait=min(wait,5) time.sleep(wait) return math.cos(x) if __name__ == '__main__': #note you can't really run these nose tests within spyder import nose nose.runmodule(argv=['nose', '--verbosity=3', '--with-doctest', '--doctest-options=+ELLIPSIS']) # nose.runmodule(argv=['nose', '--verbosity=3'])