Mailing List Archive

bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)
https://github.com/python/cpython/commit/fdc0e09c3316098b038996c428e88931f0a4fcdb
commit: fdc0e09c3316098b038996c428e88931f0a4fcdb
branch: main
author: Logan Jones <loganasherjones@gmail.com>
committer: pitrou <pitrou@free.fr>
date: 2021-11-20T21:19:41+01:00
summary:

bpo-44733: Add max_tasks_per_child to ProcessPoolExecutor (GH-27373)

Co-authored-by: Antoine Pitrou <antoine@python.org>

files:
A Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
M Doc/library/concurrent.futures.rst
M Lib/concurrent/futures/process.py
M Lib/test/test_concurrent_futures.py

diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst
index 897efc2f54442..b4213b451157e 100644
--- a/Doc/library/concurrent.futures.rst
+++ b/Doc/library/concurrent.futures.rst
@@ -231,7 +231,7 @@ that :class:`ProcessPoolExecutor` will not work in the interactive interpreter.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.

-.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())
+.. class:: ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
@@ -252,6 +252,11 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
pending jobs will raise a :exc:`~concurrent.futures.process.BrokenProcessPool`,
as well as any attempt to submit more jobs to the pool.

+ *max_tasks_per_child* is an optional argument that specifies the maximum
+ number of tasks a single process can execute before it will exit and be
+ replaced with a fresh worker process. The default *max_tasks_per_child* is
+ ``None``, which means worker processes will live as long as the pool.
+
.. versionchanged:: 3.3
When one of the worker processes terminates abruptly, a
:exc:`BrokenProcessPool` error is now raised. Previously, behaviour
@@ -264,6 +269,10 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.

Added the *initializer* and *initargs* arguments.

+ .. versionchanged:: 3.11
+ The *max_tasks_per_child* argument was added to allow users to
+ control the lifetime of workers in the pool.
+

.. _processpoolexecutor-example:

diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py
index 9904db78c5b4c..19e93a608b276 100644
--- a/Lib/concurrent/futures/process.py
+++ b/Lib/concurrent/futures/process.py
@@ -141,10 +141,11 @@ def __init__(self, future, fn, args, kwargs):
self.kwargs = kwargs

class _ResultItem(object):
- def __init__(self, work_id, exception=None, result=None):
+ def __init__(self, work_id, exception=None, result=None, exit_pid=None):
self.work_id = work_id
self.exception = exception
self.result = result
+ self.exit_pid = exit_pid

class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
@@ -201,17 +202,19 @@ def _process_chunk(fn, chunk):
return [fn(*args) for args in chunk]


-def _sendback_result(result_queue, work_id, result=None, exception=None):
+def _sendback_result(result_queue, work_id, result=None, exception=None,
+ exit_pid=None):
"""Safely send back the given result or exception"""
try:
result_queue.put(_ResultItem(work_id, result=result,
- exception=exception))
+ exception=exception, exit_pid=exit_pid))
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
- result_queue.put(_ResultItem(work_id, exception=exc))
+ result_queue.put(_ResultItem(work_id, exception=exc,
+ exit_pid=exit_pid))


-def _process_worker(call_queue, result_queue, initializer, initargs):
+def _process_worker(call_queue, result_queue, initializer, initargs, max_tasks=None):
"""Evaluates calls from call_queue and places the results in result_queue.

This worker is run in a separate process.
@@ -232,25 +235,38 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
# The parent will notice that the process stopped and
# mark the pool broken
return
+ num_tasks = 0
+ exit_pid = None
while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(os.getpid())
return
+
+ if max_tasks is not None:
+ num_tasks += 1
+ if num_tasks >= max_tasks:
+ exit_pid = os.getpid()
+
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
exc = _ExceptionWithTraceback(e, e.__traceback__)
- _sendback_result(result_queue, call_item.work_id, exception=exc)
+ _sendback_result(result_queue, call_item.work_id, exception=exc,
+ exit_pid=exit_pid)
else:
- _sendback_result(result_queue, call_item.work_id, result=r)
+ _sendback_result(result_queue, call_item.work_id, result=r,
+ exit_pid=exit_pid)
del r

# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item

+ if exit_pid is not None:
+ return
+

class _ExecutorManagerThread(threading.Thread):
"""Manages the communication between this process and the worker processes.
@@ -301,6 +317,10 @@ def weakref_cb(_,
# A queue.Queue of work ids e.g. Queue([5, 6, ...]).
self.work_ids_queue = executor._work_ids

+ # Maximum number of tasks a worker process can execute before
+ # exiting safely
+ self.max_tasks_per_child = executor._max_tasks_per_child
+
# A dict mapping work ids to _WorkItems e.g.
# {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
self.pending_work_items = executor._pending_work_items
@@ -320,15 +340,23 @@ def run(self):
return
if result_item is not None:
self.process_result_item(result_item)
+
+ process_exited = result_item.exit_pid is not None
+ if process_exited:
+ p = self.processes.pop(result_item.exit_pid)
+ p.join()
+
# Delete reference to result_item to avoid keeping references
# while waiting on new results.
del result_item

- # attempt to increment idle process count
- executor = self.executor_reference()
- if executor is not None:
- executor._idle_worker_semaphore.release()
- del executor
+ if executor := self.executor_reference():
+ if process_exited:
+ with self.shutdown_lock:
+ executor._adjust_process_count()
+ else:
+ executor._idle_worker_semaphore.release()
+ del executor

if self.is_shutting_down():
self.flag_executor_shutting_down()
@@ -578,7 +606,7 @@ class BrokenProcessPool(_base.BrokenExecutor):

class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None, mp_context=None,
- initializer=None, initargs=()):
+ initializer=None, initargs=(), *, max_tasks_per_child=None):
"""Initializes a new ProcessPoolExecutor instance.

Args:
@@ -589,6 +617,11 @@ def __init__(self, max_workers=None, mp_context=None,
object should provide SimpleQueue, Queue and Process.
initializer: A callable used to initialize worker processes.
initargs: A tuple of arguments to pass to the initializer.
+ max_tasks_per_child: The maximum number of tasks a worker process can
+ complete before it will exit and be replaced with a fresh
+ worker process, to enable unused resources to be freed. The
+ default value is None, which means worker processes will live
+ as long as the executor.
"""
_check_system_limits()

@@ -616,6 +649,13 @@ def __init__(self, max_workers=None, mp_context=None,
self._initializer = initializer
self._initargs = initargs

+ if max_tasks_per_child is not None:
+ if not isinstance(max_tasks_per_child, int):
+ raise TypeError("max_tasks_per_child must be an integer")
+ elif max_tasks_per_child <= 0:
+ raise ValueError("max_tasks_per_child must be >= 1")
+ self._max_tasks_per_child = max_tasks_per_child
+
# Management thread
self._executor_manager_thread = None

@@ -678,7 +718,8 @@ def _adjust_process_count(self):
args=(self._call_queue,
self._result_queue,
self._initializer,
- self._initargs))
+ self._initargs,
+ self._max_tasks_per_child))
p.start()
self._processes[p.pid] = p

diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py
index 84209ca2520b8..bbb6aa1eef81f 100644
--- a/Lib/test/test_concurrent_futures.py
+++ b/Lib/test/test_concurrent_futures.py
@@ -49,7 +49,6 @@ def create_future(state=PENDING, exception=None, result=None):

INITIALIZER_STATUS = 'uninitialized'

-
def mul(x, y):
return x * y

@@ -1038,6 +1037,36 @@ def test_idle_process_reuse_multiple(self):
self.assertLessEqual(len(executor._processes), 2)
executor.shutdown()

+ def test_max_tasks_per_child(self):
+ executor = self.executor_type(1, max_tasks_per_child=3)
+ f1 = executor.submit(os.getpid)
+ original_pid = f1.result()
+ # The worker pid remains the same because the worker is reused
+ f2 = executor.submit(os.getpid)
+ self.assertEqual(f2.result(), original_pid)
+ self.assertEqual(len(executor._processes), 1)
+ f3 = executor.submit(os.getpid)
+ self.assertEqual(f3.result(), original_pid)
+
+ # A new worker is spawned, with a statistically different pid,
+ # while the previous one was reaped.
+ f4 = executor.submit(os.getpid)
+ new_pid = f4.result()
+ self.assertNotEqual(original_pid, new_pid)
+ self.assertEqual(len(executor._processes), 1)
+
+ executor.shutdown()
+
+ def test_max_tasks_early_shutdown(self):
+ executor = self.executor_type(3, max_tasks_per_child=1)
+ futures = []
+ for i in range(6):
+ futures.append(executor.submit(mul, i, i))
+ executor.shutdown()
+ for i, future in enumerate(futures):
+ self.assertEqual(future.result(), mul(i, i))
+
+
create_executor_tests(ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
ProcessPoolForkserverMixin,
diff --git a/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
new file mode 100644
index 0000000000000..666b5f7d0a035
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2021-07-26-13-33-37.bpo-44733.88LrP1.rst
@@ -0,0 +1,3 @@
+Add ``max_tasks_per_child`` to :class:`concurrent.futures.ProcessPoolExecutor`.
+This allows users to specify the maximum number of tasks a single process
+should execute before the process needs to be restarted.

_______________________________________________
Python-checkins mailing list
Python-checkins@python.org
https://mail.python.org/mailman/listinfo/python-checkins