Add timeout argument to python’s Queue.join()
I want to be able to join() the Queue class, but time out after some period if the call hasn't returned yet. What is the best way to do it? Is it possible to do it by subclassing Queue or by using a metaclass?
4 Answers
Subclassing Queue is probably the best way. Something like this should work (untested):
    # Mirrors the timeout handling already used by Queue.get(); assumes
    # `from time import time` and an exception class (here called NotFinished)
    # to raise when the timeout elapses.
    def join_with_timeout(self, timeout):
        self.all_tasks_done.acquire()
        try:
            endtime = time() + timeout
            while self.unfinished_tasks:
                remaining = endtime - time()
                if remaining <= 0.0:
                    raise NotFinished
                self.all_tasks_done.wait(remaining)
        finally:
            self.all_tasks_done.release()
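For completeness, here is a hedged sketch of how the method above could be hung on a Queue subclass and used. The class name TimeoutQueue, the NotFinished exception and the worker function are illustrative, not part of the original answer:

    from queue import Queue
    from threading import Thread
    from time import time, sleep

    class NotFinished(Exception):
        """Raised when join_with_timeout() gives up waiting."""

    class TimeoutQueue(Queue):
        # Hypothetical subclass wrapping the method shown above.
        def join_with_timeout(self, timeout):
            self.all_tasks_done.acquire()
            try:
                endtime = time() + timeout
                while self.unfinished_tasks:
                    remaining = endtime - time()
                    if remaining <= 0.0:
                        raise NotFinished
                    self.all_tasks_done.wait(remaining)
            finally:
                self.all_tasks_done.release()

    q = TimeoutQueue()

    def worker():
        while True:
            item = q.get()
            sleep(0.1)          # stand-in for real work
            q.task_done()

    Thread(target=worker, daemon=True).start()
    for item in range(10):
        q.put(item)

    try:
        q.join_with_timeout(5.0)   # wait at most 5 seconds
    except NotFinished:
        print("some tasks were still unfinished after 5 seconds")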
Thanks! Where did you get the info about all_tasks_done? I looked in docs.python.org/library/queue.html#module-Queue but I don't see any mention of that member.
You can read the source code for Queue. It has a timeout parameter implemented for put and get, so it was easy enough to extend join to use a similar approach.
Any idea why all_tasks_done is not documented? That may mean the attribute could be changed or broken in any release.
The join() method is all about waiting for all the tasks to be done. If you don't care whether the tasks have actually finished, you can periodically poll the unfinished task count:
    stop = time() + timeout
    while q.unfinished_tasks and time() < stop:
        sleep(1)
This loop will exit either when the tasks are done or when the timeout period has elapsed.
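A hedged, self-contained sketch of that polling approach (the worker function and the items are invented for illustration; unfinished_tasks is an undocumented attribute of queue.Queue):

    from queue import Queue
    from threading import Thread
    from time import time, sleep

    q = Queue()

    def worker():
        while True:
            item = q.get()
            sleep(0.2)          # stand-in for real work
            q.task_done()

    Thread(target=worker, daemon=True).start()
    for item in range(5):
        q.put(item)

    timeout = 3.0
    stop = time() + timeout
    while q.unfinished_tasks and time() < stop:
        sleep(0.1)              # poll the undocumented counter

    if q.unfinished_tasks:
        print("timed out with", q.unfinished_tasks, "tasks still pending")
    else:
        print("all tasks done")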
First, make sure that each of your worker threads calls task_done() when it finishes an item taken from the queue.

To implement timeout functionality with Queue, you can wrap the queue-handling code in a Thread and add a timeout for that Thread using Thread.join([timeout]).

Untested example to outline what I suggest:
    from queue import Queue
    from threading import Thread

    q = Queue()   # module-level so worker() and queuefunc() share the same queue

    def worker():
        while True:
            item = q.get()
            do_work(item)
            q.task_done()

    def queuefunc():
        for i in range(num_worker_threads):
            t = Thread(target=worker)
            t.daemon = True
            t.start()
        for item in source():
            q.put(item)
        q.join()        # block until all tasks are done

    t = Thread(target=queuefunc)
    t.start()
    t.join(100)         # timeout applies here
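One caveat worth adding (my note, not part of the original answer): t.join(100) returns None whether or not the timeout fired, so you would check t.is_alive() afterwards to tell the two cases apart, for example:

    t.join(100)
    if t.is_alive():
        print("queuefunc() is still running; the 100 s timeout elapsed")
    else:
        print("all queued tasks finished within the timeout")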
Python - Join Multiple Threads With Timeout
I have multiple multiprocessing.Process workers running and I'd like to join all of them with a timeout parameter. I understand that if no timeout were necessary, I'd be able to write:
    for thread in threads:
        thread.join()
One solution I thought of was to use a master process that joins all the others, and then to join that master with a timeout. However, I received the following error in Python:
AssertionError: can only join a child process
    def join_all(threads):
        for thread in threads:
            thread.join()

    if __name__ == '__main__':
        for thread in threads:
            thread.start()

        master = multiprocessing.Process(target=join_all, args=(threads,))
        master.start()
        master.join(timeout=60)
4 Answers
You could loop over each thread repeatedly, doing non-blocking checks to see if the thread is done:
    import time

    def timed_join_all(threads, timeout):
        start = cur_time = time.time()
        while cur_time <= (start + timeout):
            # join only threads that have already finished, so this never blocks
            for thread in threads:
                if not thread.is_alive():
                    thread.join()
            time.sleep(1)
            cur_time = time.time()
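A hedged usage sketch for the function above (the sleepy worker and the timings are invented for illustration):

    import threading, time

    def sleepy(seconds):
        time.sleep(seconds)

    threads = [threading.Thread(target=sleepy, args=(n,)) for n in (1, 2, 30)]
    for t in threads:
        t.daemon = True     # let the process exit even if the 30 s thread is still running
        t.start()

    timed_join_all(threads, timeout=5)
    print([t.is_alive() for t in threads])   # the 30-second thread is still alive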
This answer is initially based on that by dano but has a number of changes.
join_all takes a list of threads and a timeout (in seconds) and attempts to join all of the threads. It does this by making a non-blocking call to Thread.join (by setting the timeout to 0, as join with no arguments never times out).
Once all the threads have finished (by checking is_alive() on each of them) the loop will exit prematurely.
If some threads are still running by the time the timeout occurs, the function raises a RuntimeError with information about the remaining threads.
    import time

    def join_all(threads, timeout):
        """
        Args:
            threads: a list of thread objects to join
            timeout: the maximum time to wait for the threads to finish
        Raises:
            RuntimeError: if not all the threads have finished by the timeout
        """
        start = cur_time = time.time()
        while cur_time <= (start + timeout):
            for thread in threads:
                thread.join(timeout=0)          # non-blocking join
            if all(not t.is_alive() for t in threads):
                break                           # every thread finished early
            time.sleep(0.1)
            cur_time = time.time()
        else:
            # the while loop ran out of time without breaking
            still_running = [t for t in threads if t.is_alive()]
            num = len(still_running)
            names = [t.name for t in still_running]
            raise RuntimeError('Timed out waiting for {} threads: {}'.format(num, names))

    if __name__ == '__main__':
        for thread in threads:
            thread.start()
        join_all(threads, 60)
In my usage of this, it was inside a test suite where the threads were dæmonised versions of ExcThread so that if the threads never finished running, it wouldn't matter.
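For illustration only (ExcThread is not shown in this thread), the daemon part amounts to something like the following, using the join_all above, so that a stuck thread cannot keep the test process alive:

    import threading, time

    def maybe_hangs():
        time.sleep(10000)        # stand-in for a thread that never finishes

    t = threading.Thread(target=maybe_hangs, daemon=True)
    t.start()
    try:
        join_all([t], timeout=2)
    except RuntimeError as exc:
        print(exc)               # reported, but the process can still exit cleanly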
How does threading.join() detect a timeout?
We are running quite a large Python code base to randomly scan the parameter space of some physics models (so it is very difficult to give a minimal example, sorry). Evaluating one parameter point takes about 300 ms, but sometimes (I don't know why) the evaluation suddenly takes several hours, which kills the CPU budget we have on a computing cluster.

My idea was to use threading to give each evaluation of a parameter point a maximal running time. If the evaluation takes longer, I can discard that point as unphysical. Now, this does not seem to work: I start the calculation in a new thread and join it to the main thread with a timeout set to, say, 1 second, but the main thread still keeps waiting for the calculation to terminate (which takes significantly longer than 1 second). How is this possible? How does threading measure how long the new thread has already been running?

I should say that during the evaluation of one parameter point I heavily use nlopt, numpy and scipy. Most of that is, I assume, not written directly in Python; compiled binaries are used to speed up the calculation. Does this affect threading (since those functions are "black boxes" to it)? Thanks!
Did you read the documentation for join()? Quote: "As join() always returns None, you must call is_alive() after join() to decide whether a timeout happened – if the thread is still alive, the join() call timed out."
Also: the standard threading interface does not provide any way of killing a thread. You are better off using multiprocessing; processes are much easier to kill.
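A hedged sketch of the multiprocessing route the comment suggests (evaluate_point and the 1-second budget are stand-ins for the real physics evaluation; this code is not from the original thread):

    import multiprocessing as mp

    def evaluate_point(point, result_queue):
        # stand-in for the real 300 ms (or multi-hour) evaluation
        result_queue.put(point ** 2)

    def evaluate_with_timeout(point, timeout=1.0):
        result_queue = mp.Queue()
        p = mp.Process(target=evaluate_point, args=(point, result_queue))
        p.start()
        p.join(timeout)
        if p.is_alive():          # the evaluation exceeded the budget
            p.terminate()         # unlike threads, processes can be killed
            p.join()
            return None           # treat the point as unphysical
        return result_queue.get()

    if __name__ == '__main__':
        print(evaluate_with_timeout(3))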
1 Answer
I don't think threading.join reports a timeout in any way – it just returns. You have to check for yourself whether it timed out (for example with is_alive()).
In either case, to get a working solution a minimal code snippet would help. This is mostly a guess, but if the main process isn't checking for the timeout itself, it will just keep on waiting.
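A minimal demonstration of that checking pattern (the names are invented for illustration):

    import threading, time

    def slow_evaluation():
        time.sleep(5)        # stands in for a long parameter-point evaluation

    t = threading.Thread(target=slow_evaluation)
    t.start()
    t.join(timeout=1)        # returns None after roughly 1 second
    if t.is_alive():
        print("timed out; the evaluation is still running in the background")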
Let's see where the timeout parameter goes:
self._wait_for_tstate_lock(timeout=max(timeout, 0))
    def _wait_for_tstate_lock(self, block=True, timeout=-1):
        # Issue #18808: wait for the thread state to be gone.
        # At the end of the thread's life, after all knowledge of the thread
        # is removed from C data structures, C code releases our _tstate_lock.
        # This method passes its arguments to _tstate_lock.acquire().
        # If the lock is acquired, the C code is done, and self._stop() is
        # called.  That sets ._is_stopped to True, and ._tstate_lock to None.
        lock = self._tstate_lock
        if lock is None:  # already determined that the C code is done
            assert self._is_stopped
        elif lock.acquire(block, timeout):
            lock.release()
            self._stop()
If there is no lock, make sure the thread is stopped. Otherwise, acquire the lock with the given block and timeout parameters.
    def acquire(self, blocking=True, timeout=-1):
        """Acquire a lock, blocking or non-blocking.

        When invoked without arguments: if this thread already owns the lock,
        increment the recursion level by one, and return immediately. Otherwise,
        if another thread owns the lock, block until the lock is unlocked. Once
        the lock is unlocked (not owned by any thread), then grab ownership, set
        the recursion level to one, and return. If more than one thread is
        blocked waiting until the lock is unlocked, only one at a time will be
        able to grab ownership of the lock. There is no return value in this
        case.

        When invoked with the blocking argument set to true, do the same thing
        as when called without arguments, and return true.

        When invoked with the blocking argument set to false, do not block. If a
        call without an argument would block, return false immediately;
        otherwise, do the same thing as when called without arguments, and
        return true.

        When invoked with the floating-point timeout argument set to a positive
        value, block for at most the number of seconds specified by timeout and
        as long as the lock cannot be acquired. Return true if the lock has
        been acquired, false if the timeout has elapsed.

        """
        me = get_ident()
        if self._owner == me:
            self._count += 1
            return 1
        rc = self._block.acquire(blocking, timeout)
        if rc:
            self._owner = me
            self._count = 1
        return rc
To acquire the lock, the code first gets the current thread's identity; if that thread already owns the lock it just increments the recursion count, otherwise it falls through to self._block.acquire(blocking, timeout), which returns False if the timeout elapses before the lock can be acquired.
self._block = _allocate_lock()
_allocate_lock = _thread.allocate_lock
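To tie the walkthrough together, here is a small, hedged experiment showing the behaviour those internals produce: a primitive lock's acquire() honours the timeout and returns False, which is why Thread.join(timeout) simply returns while the thread may still be running (the worker function is invented for illustration):

    import _thread, threading, time

    # the same kind of primitive lock that backs _tstate_lock
    lock = _thread.allocate_lock()
    lock.acquire()                              # held, so the next acquire must wait
    t0 = time.time()
    got_it = lock.acquire(True, 1.0)            # blocks for at most 1 second
    print(got_it, round(time.time() - t0, 1))   # -> False 1.0

    # the same behaviour seen through Thread.join()
    def worker():
        time.sleep(5)

    t = threading.Thread(target=worker)
    t.start()
    t.join(timeout=1)                           # returns after ~1 s, no exception
    print(t.is_alive())                         # -> True: the timeout elapsed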