Mailing List Archive

gh-110693: Pending Calls Machinery Cleanups (gh-118296)
https://github.com/python/cpython/commit/09c29475813ff2a763931fc0b45aaaef57cd2ac7
commit: 09c29475813ff2a763931fc0b45aaaef57cd2ac7
branch: main
author: Eric Snow <ericsnowcurrently@gmail.com>
committer: ericsnowcurrently <ericsnowcurrently@gmail.com>
date: 2024-04-26T01:05:51Z
summary:

gh-110693: Pending Calls Machinery Cleanups (gh-118296)

This does some cleanup in preparation for later changes.

files:
M Include/internal/pycore_ceval.h
M Include/internal/pycore_ceval_state.h
M Include/internal/pycore_runtime_init.h
M Lib/test/test_capi/test_misc.py
M Modules/_testcapimodule.c
M Modules/_testinternalcapi.c
M Python/ceval_gil.c

diff --git a/Include/internal/pycore_ceval.h b/Include/internal/pycore_ceval.h
index 8d88b5c1d15cb8..cfb88c3f4c8e15 100644
--- a/Include/internal/pycore_ceval.h
+++ b/Include/internal/pycore_ceval.h
@@ -48,8 +48,12 @@ extern void _PyEval_SignalReceived(void);
#define _Py_PENDING_MAINTHREADONLY 1
#define _Py_PENDING_RAWFREE 2

+typedef int _Py_add_pending_call_result;
+#define _Py_ADD_PENDING_SUCCESS 0
+#define _Py_ADD_PENDING_FULL -1
+
// Export for '_testinternalcapi' shared extension
-PyAPI_FUNC(int) _PyEval_AddPendingCall(
+PyAPI_FUNC(_Py_add_pending_call_result) _PyEval_AddPendingCall(
PyInterpreterState *interp,
_Py_pending_call_func func,
void *arg,
diff --git a/Include/internal/pycore_ceval_state.h b/Include/internal/pycore_ceval_state.h
index 168295534e036c..1831f58899b745 100644
--- a/Include/internal/pycore_ceval_state.h
+++ b/Include/internal/pycore_ceval_state.h
@@ -14,28 +14,56 @@ extern "C" {

typedef int (*_Py_pending_call_func)(void *);

+struct _pending_call {
+ _Py_pending_call_func func;
+ void *arg;
+ int flags;
+};
+
+#define PENDINGCALLSARRAYSIZE 32
+
+#define MAXPENDINGCALLS PENDINGCALLSARRAYSIZE
+/* For interpreter-level pending calls, we want to avoid spending too
+ much time on pending calls in any one thread, so we apply a limit. */
+#if MAXPENDINGCALLS > 100
+# define MAXPENDINGCALLSLOOP 100
+#else
+# define MAXPENDINGCALLSLOOP MAXPENDINGCALLS
+#endif
+
+#define MAXPENDINGCALLS_MAIN PENDINGCALLSARRAYSIZE
+/* For the main thread, we want to make sure all pending calls are
+ run at once, for the sake of prompt signal handling. This is
+ unlikely to cause any problems since there should be very few
+ pending calls for the main thread. */
+#define MAXPENDINGCALLSLOOP_MAIN 0
+
struct _pending_calls {
int busy;
PyMutex mutex;
/* Request for running pending calls. */
- int32_t calls_to_do;
-#define NPENDINGCALLS 32
- struct _pending_call {
- _Py_pending_call_func func;
- void *arg;
- int flags;
- } calls[NPENDINGCALLS];
+ int32_t npending;
+ /* The maximum allowed number of pending calls.
+ If the queue fills up to this point then _PyEval_AddPendingCall()
+ will return _Py_ADD_PENDING_FULL. */
+ int32_t max;
+ /* We don't want a flood of pending calls to interrupt any one thread
+ for too long, so we keep a limit on the number handled per pass.
+ A value of 0 means there is no limit (other than the maximum
+ size of the list of pending calls). */
+ int32_t maxloop;
+ struct _pending_call calls[PENDINGCALLSARRAYSIZE];
int first;
- int last;
+ int next;
};

+
typedef enum {
PERF_STATUS_FAILED = -1, // Perf trampoline is in an invalid state
PERF_STATUS_NO_INIT = 0, // Perf trampoline is not initialized
PERF_STATUS_OK = 1, // Perf trampoline is ready to be executed
} perf_status_t;

-
#ifdef PY_HAVE_PERF_TRAMPOLINE
struct code_arena_st;

@@ -48,6 +76,7 @@ struct trampoline_api_st {
};
#endif

+
struct _ceval_runtime_state {
struct {
#ifdef PY_HAVE_PERF_TRAMPOLINE
@@ -62,10 +91,15 @@ struct _ceval_runtime_state {
#endif
} perf;
/* Pending calls to be made only on the main thread. */
+ // The signal machinery falls back on this
+ // so it must be especially stable and efficient.
+ // For example, we use a preallocated array
+ // for the list of pending calls.
struct _pending_calls pending_mainthread;
PyMutex sys_trace_profile_mutex;
};

+
#ifdef PY_HAVE_PERF_TRAMPOLINE
# define _PyEval_RUNTIME_PERF_INIT \
{ \
diff --git a/Include/internal/pycore_runtime_init.h b/Include/internal/pycore_runtime_init.h
index 33c7a9dadfd2a1..41331df8320a9c 100644
--- a/Include/internal/pycore_runtime_init.h
+++ b/Include/internal/pycore_runtime_init.h
@@ -114,6 +114,10 @@ extern PyTypeObject _PyExc_MemoryError;
.autoTSSkey = Py_tss_NEEDS_INIT, \
.parser = _parser_runtime_state_INIT, \
.ceval = { \
+ .pending_mainthread = { \
+ .max = MAXPENDINGCALLS_MAIN, \
+ .maxloop = MAXPENDINGCALLSLOOP_MAIN, \
+ }, \
.perf = _PyEval_RUNTIME_PERF_INIT, \
}, \
.gilstate = { \
@@ -166,6 +170,10 @@ extern PyTypeObject _PyExc_MemoryError;
.imports = IMPORTS_INIT, \
.ceval = { \
.recursion_limit = Py_DEFAULT_RECURSION_LIMIT, \
+ .pending = { \
+ .max = MAXPENDINGCALLS, \
+ .maxloop = MAXPENDINGCALLSLOOP, \
+ }, \
}, \
.gc = { \
.enabled = 1, \
diff --git a/Lib/test/test_capi/test_misc.py b/Lib/test/test_capi/test_misc.py
index 0701eafb7c36e0..49d1056f050467 100644
--- a/Lib/test/test_capi/test_misc.py
+++ b/Lib/test/test_capi/test_misc.py
@@ -1172,6 +1172,12 @@ class MyType:
self.assertEqual(get_type_fullyqualname(MyType), 'my_qualname')


+ def test_gen_get_code(self):
+ def genf(): yield
+ gen = genf()
+ self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+
+
@requires_limited_api
class TestHeapTypeRelative(unittest.TestCase):
"""Test API for extending opaque types (PEP 697)"""
@@ -1452,7 +1458,7 @@ class TestPendingCalls(unittest.TestCase):
# about when pending calls get run. This is especially relevant
# here for creating deterministic tests.

- def pendingcalls_submit(self, l, n):
+ def main_pendingcalls_submit(self, l, n):
def callback():
#this function can be interrupted by thread switching so let's
#use an atomic operation
@@ -1467,12 +1473,27 @@ def callback():
if _testcapi._pending_threadfunc(callback):
break

- def pendingcalls_wait(self, l, n, context = None):
+ def pendingcalls_submit(self, l, n, *, main=True, ensure=False):
+ def callback():
+ #this function can be interrupted by thread switching so let's
+ #use an atomic operation
+ l.append(None)
+
+ if main:
+ return _testcapi._pending_threadfunc(callback, n,
+ blocking=False,
+ ensure_added=ensure)
+ else:
+ return _testinternalcapi.pending_threadfunc(callback, n,
+ blocking=False,
+ ensure_added=ensure)
+
+ def pendingcalls_wait(self, l, numadded, context = None):
#now, stick around until l[0] has grown to 10
count = 0
- while len(l) != n:
+ while len(l) != numadded:
#this busy loop is where we expect to be interrupted to
- #run our callbacks. Note that callbacks are only run on the
+ #run our callbacks. Note that some callbacks are only run on the
#main thread
if False and support.verbose:
print("(%i)"%(len(l),),)
@@ -1482,12 +1503,12 @@ def pendingcalls_wait(self, l, n, context = None):
continue
count += 1
self.assertTrue(count < 10000,
- "timeout waiting for %i callbacks, got %i"%(n, len(l)))
+ "timeout waiting for %i callbacks, got %i"%(numadded, len(l)))
if False and support.verbose:
print("(%i)"%(len(l),))

@threading_helper.requires_working_threading()
- def test_pendingcalls_threaded(self):
+ def test_main_pendingcalls_threaded(self):

#do every callback on a separate thread
n = 32 #total callbacks
@@ -1501,15 +1522,15 @@ class foo(object):pass
context.lock = threading.Lock()
context.event = threading.Event()

- threads = [threading.Thread(target=self.pendingcalls_thread,
+ threads = [threading.Thread(target=self.main_pendingcalls_thread,
args=(context,))
for i in range(context.nThreads)]
with threading_helper.start_threads(threads):
self.pendingcalls_wait(context.l, n, context)

- def pendingcalls_thread(self, context):
+ def main_pendingcalls_thread(self, context):
try:
- self.pendingcalls_submit(context.l, context.n)
+ self.main_pendingcalls_submit(context.l, context.n)
finally:
with context.lock:
context.nFinished += 1
@@ -1519,20 +1540,54 @@ def pendingcalls_thread(self, context):
if nFinished == context.nThreads:
context.event.set()

- def test_pendingcalls_non_threaded(self):
+ def test_main_pendingcalls_non_threaded(self):
#again, just using the main thread, likely they will all be dispatched at
#once. It is ok to ask for too many, because we loop until we find a slot.
#the loop can be interrupted to dispatch.
#there are only 32 dispatch slots, so we go for twice that!
l = []
n = 64
- self.pendingcalls_submit(l, n)
+ self.main_pendingcalls_submit(l, n)
self.pendingcalls_wait(l, n)

- def test_gen_get_code(self):
- def genf(): yield
- gen = genf()
- self.assertEqual(_testcapi.gen_get_code(gen), gen.gi_code)
+ def test_max_pending(self):
+ with self.subTest('main-only'):
+ maxpending = 32
+
+ l = []
+ added = self.pendingcalls_submit(l, 1, main=True)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, 1)
+
+ l = []
+ added = self.pendingcalls_submit(l, maxpending, main=True)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, maxpending)
+
+ l = []
+ added = self.pendingcalls_submit(l, maxpending+1, main=True)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, maxpending)
+
+ with self.subTest('not main-only'):
+ # Per-interpreter pending calls has the same low limit
+ # on how many may be pending at a time.
+ maxpending = 32
+
+ l = []
+ added = self.pendingcalls_submit(l, 1, main=False)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, 1)
+
+ l = []
+ added = self.pendingcalls_submit(l, maxpending, main=False)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, maxpending)
+
+ l = []
+ added = self.pendingcalls_submit(l, maxpending+1, main=False)
+ self.pendingcalls_wait(l, added)
+ self.assertEqual(added, maxpending)

class PendingTask(types.SimpleNamespace):

diff --git a/Modules/_testcapimodule.c b/Modules/_testcapimodule.c
index 034a30fa47ed30..0bdd252efdabc7 100644
--- a/Modules/_testcapimodule.c
+++ b/Modules/_testcapimodule.c
@@ -819,25 +819,55 @@ static int _pending_callback(void *arg)
* run from any python thread.
*/
static PyObject *
-pending_threadfunc(PyObject *self, PyObject *arg)
+pending_threadfunc(PyObject *self, PyObject *arg, PyObject *kwargs)
{
+ static char *kwlist[] = {"callback", "num",
+ "blocking", "ensure_added", NULL};
PyObject *callable;
- int r;
- if (PyArg_ParseTuple(arg, "O", &callable) == 0)
+ unsigned int num = 1;
+ int blocking = 0;
+ int ensure_added = 0;
+ if (!PyArg_ParseTupleAndKeywords(arg, kwargs,
+ "O|I$pp:_pending_threadfunc", kwlist,
+ &callable, &num, &blocking, &ensure_added))
+ {
return NULL;
+ }

/* create the reference for the callbackwhile we hold the lock */
- Py_INCREF(callable);
+ for (unsigned int i = 0; i < num; i++) {
+ Py_INCREF(callable);
+ }

- Py_BEGIN_ALLOW_THREADS
- r = Py_AddPendingCall(&_pending_callback, callable);
- Py_END_ALLOW_THREADS
+ PyThreadState *save_tstate = NULL;
+ if (!blocking) {
+ save_tstate = PyEval_SaveThread();
+ }
+
+ unsigned int num_added = 0;
+ for (; num_added < num; num_added++) {
+ if (ensure_added) {
+ int r;
+ do {
+ r = Py_AddPendingCall(&_pending_callback, callable);
+ } while (r < 0);
+ }
+ else {
+ if (Py_AddPendingCall(&_pending_callback, callable) < 0) {
+ break;
+ }
+ }
+ }
+
+ if (!blocking) {
+ PyEval_RestoreThread(save_tstate);
+ }

- if (r<0) {
+ for (unsigned int i = num_added; i < num; i++) {
Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
- Py_RETURN_FALSE;
}
- Py_RETURN_TRUE;
+ /* The callable is decref'ed above in each added _pending_callback(). */
+ return PyLong_FromUnsignedLong((unsigned long)num_added);
}

/* Test PyOS_string_to_double. */
@@ -3232,7 +3262,8 @@ static PyMethodDef TestMethods[] = {
{"_spawn_pthread_waiter", spawn_pthread_waiter, METH_NOARGS},
{"_end_spawned_pthread", end_spawned_pthread, METH_NOARGS},
#endif
- {"_pending_threadfunc", pending_threadfunc, METH_VARARGS},
+ {"_pending_threadfunc", _PyCFunction_CAST(pending_threadfunc),
+ METH_VARARGS|METH_KEYWORDS},
#ifdef HAVE_GETTIMEOFDAY
{"profile_int", profile_int, METH_NOARGS},
#endif
diff --git a/Modules/_testinternalcapi.c b/Modules/_testinternalcapi.c
index cc9e1403f87ecd..b0bba3422a50a0 100644
--- a/Modules/_testinternalcapi.c
+++ b/Modules/_testinternalcapi.c
@@ -1062,37 +1062,56 @@ static PyObject *
pending_threadfunc(PyObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *callable;
+ unsigned int num = 1;
+ int blocking = 0;
int ensure_added = 0;
- static char *kwlist[] = {"", "ensure_added", NULL};
+ static char *kwlist[] = {"callback", "num",
+ "blocking", "ensure_added", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs,
- "O|$p:pending_threadfunc", kwlist,
- &callable, &ensure_added))
+ "O|I$pp:pending_threadfunc", kwlist,
+ &callable, &num, &blocking, &ensure_added))
{
return NULL;
}
PyInterpreterState *interp = _PyInterpreterState_GET();

/* create the reference for the callbackwhile we hold the lock */
- Py_INCREF(callable);
+ for (unsigned int i = 0; i < num; i++) {
+ Py_INCREF(callable);
+ }

- int r;
- Py_BEGIN_ALLOW_THREADS
- r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
- Py_END_ALLOW_THREADS
- if (r < 0) {
- /* unsuccessful add */
- if (!ensure_added) {
- Py_DECREF(callable);
- Py_RETURN_FALSE;
+ PyThreadState *save_tstate = NULL;
+ if (!blocking) {
+ save_tstate = PyEval_SaveThread();
+ }
+
+ unsigned int num_added = 0;
+ for (; num_added < num; num_added++) {
+ if (ensure_added) {
+ _Py_add_pending_call_result r;
+ do {
+ r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
+ assert(r == _Py_ADD_PENDING_SUCCESS
+ || r == _Py_ADD_PENDING_FULL);
+ } while (r == _Py_ADD_PENDING_FULL);
+ }
+ else {
+ if (_PyEval_AddPendingCall(interp, &_pending_callback, callable, 0) < 0) {
+ break;
+ }
}
- do {
- Py_BEGIN_ALLOW_THREADS
- r = _PyEval_AddPendingCall(interp, &_pending_callback, callable, 0);
- Py_END_ALLOW_THREADS
- } while (r < 0);
}

- Py_RETURN_TRUE;
+ if (!blocking) {
+ PyEval_RestoreThread(save_tstate);
+ }
+
+ for (unsigned int i = num_added; i < num; i++) {
+ Py_DECREF(callable); /* unsuccessful add, destroy the extra reference */
+ }
+
+ /* The callable is decref'ed in _pending_callback() above. */
+ return PyLong_FromUnsignedLong((unsigned long)num_added);
}


@@ -1135,14 +1154,16 @@ pending_identify(PyObject *self, PyObject *args)
PyThread_acquire_lock(mutex, WAIT_LOCK);
/* It gets released in _pending_identify_callback(). */

- int r;
+ _Py_add_pending_call_result r;
do {
Py_BEGIN_ALLOW_THREADS
r = _PyEval_AddPendingCall(interp,
&_pending_identify_callback, (void *)mutex,
0);
Py_END_ALLOW_THREADS
- } while (r < 0);
+ assert(r == _Py_ADD_PENDING_SUCCESS
+ || r == _Py_ADD_PENDING_FULL);
+ } while (r == _Py_ADD_PENDING_FULL);

/* Wait for the pending call to complete. */
PyThread_acquire_lock(mutex, WAIT_LOCK);
diff --git a/Python/ceval_gil.c b/Python/ceval_gil.c
index c0819d8ab1d8d0..c3c2c54b199c59 100644
--- a/Python/ceval_gil.c
+++ b/Python/ceval_gil.c
@@ -84,15 +84,15 @@ update_eval_breaker_for_thread(PyInterpreterState *interp, PyThreadState *tstate
return;
#endif

- int32_t calls_to_do = _Py_atomic_load_int32_relaxed(
- &interp->ceval.pending.calls_to_do);
- if (calls_to_do) {
+ int32_t npending = _Py_atomic_load_int32_relaxed(
+ &interp->ceval.pending.npending);
+ if (npending) {
_Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
}
else if (_Py_IsMainThread()) {
- calls_to_do = _Py_atomic_load_int32_relaxed(
- &_PyRuntime.ceval.pending_mainthread.calls_to_do);
- if (calls_to_do) {
+ npending = _Py_atomic_load_int32_relaxed(
+ &_PyRuntime.ceval.pending_mainthread.npending);
+ if (npending) {
_Py_set_eval_breaker_bit(tstate, _PY_CALLS_TO_DO_BIT);
}
}
@@ -624,6 +624,34 @@ PyEval_RestoreThread(PyThreadState *tstate)
}


+void
+_PyEval_SignalReceived(void)
+{
+ _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
+}
+
+
+#ifndef Py_GIL_DISABLED
+static void
+signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
+{
+ struct _gil_runtime_state *gil = interp->ceval.gil;
+
+ // If a thread from the targeted interpreter is holding the GIL, signal
+ // that thread. Otherwise, the next thread to run from the targeted
+ // interpreter will have its bit set as part of taking the GIL.
+ MUTEX_LOCK(gil->mutex);
+ if (_Py_atomic_load_int_relaxed(&gil->locked)) {
+ PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
+ if (holder->interp == interp) {
+ _Py_set_eval_breaker_bit(holder, bit);
+ }
+ }
+ MUTEX_UNLOCK(gil->mutex);
+}
+#endif
+
+
/* Mechanism whereby asynchronously executing callbacks (e.g. UNIX
signal handlers or Mac I/O completion routines) can schedule calls
to a function to be called synchronously.
@@ -646,29 +674,31 @@ PyEval_RestoreThread(PyThreadState *tstate)
threadstate.
*/

-void
-_PyEval_SignalReceived(void)
-{
- _Py_set_eval_breaker_bit(_PyRuntime.main_tstate, _PY_SIGNALS_PENDING_BIT);
-}
-
/* Push one item onto the queue while holding the lock. */
static int
_push_pending_call(struct _pending_calls *pending,
_Py_pending_call_func func, void *arg, int flags)
{
- int i = pending->last;
- int j = (i + 1) % NPENDINGCALLS;
- if (j == pending->first) {
- return -1; /* Queue full */
+ if (pending->npending == pending->max) {
+ return _Py_ADD_PENDING_FULL;
}
+ assert(pending->npending < pending->max);
+
+ int i = pending->next;
+ assert(pending->calls[i].func == NULL);
+
pending->calls[i].func = func;
pending->calls[i].arg = arg;
pending->calls[i].flags = flags;
- pending->last = j;
- assert(pending->calls_to_do < NPENDINGCALLS);
- _Py_atomic_add_int32(&pending->calls_to_do, 1);
- return 0;
+
+ assert(pending->npending < PENDINGCALLSARRAYSIZE);
+ _Py_atomic_add_int32(&pending->npending, 1);
+
+ pending->next = (i + 1) % PENDINGCALLSARRAYSIZE;
+ assert(pending->next != pending->first
+ || pending->npending == pending->max);
+
+ return _Py_ADD_PENDING_SUCCESS;
}

static int
@@ -676,8 +706,9 @@ _next_pending_call(struct _pending_calls *pending,
int (**func)(void *), void **arg, int *flags)
{
int i = pending->first;
- if (i == pending->last) {
+ if (pending->npending == 0) {
/* Queue empty */
+ assert(i == pending->next);
assert(pending->calls[i].func == NULL);
return -1;
}
@@ -695,38 +726,18 @@ _pop_pending_call(struct _pending_calls *pending,
int i = _next_pending_call(pending, func, arg, flags);
if (i >= 0) {
pending->calls[i] = (struct _pending_call){0};
- pending->first = (i + 1) % NPENDINGCALLS;
- assert(pending->calls_to_do > 0);
- _Py_atomic_add_int32(&pending->calls_to_do, -1);
+ pending->first = (i + 1) % PENDINGCALLSARRAYSIZE;
+ assert(pending->npending > 0);
+ _Py_atomic_add_int32(&pending->npending, -1);
}
}

-#ifndef Py_GIL_DISABLED
-static void
-signal_active_thread(PyInterpreterState *interp, uintptr_t bit)
-{
- struct _gil_runtime_state *gil = interp->ceval.gil;
-
- // If a thread from the targeted interpreter is holding the GIL, signal
- // that thread. Otherwise, the next thread to run from the targeted
- // interpreter will have its bit set as part of taking the GIL.
- MUTEX_LOCK(gil->mutex);
- if (_Py_atomic_load_int_relaxed(&gil->locked)) {
- PyThreadState *holder = (PyThreadState*)_Py_atomic_load_ptr_relaxed(&gil->last_holder);
- if (holder->interp == interp) {
- _Py_set_eval_breaker_bit(holder, bit);
- }
- }
- MUTEX_UNLOCK(gil->mutex);
-}
-#endif
-
/* This implementation is thread-safe. It allows
scheduling to be made from any thread, and even from an executing
callback.
*/

-int
+_Py_add_pending_call_result
_PyEval_AddPendingCall(PyInterpreterState *interp,
_Py_pending_call_func func, void *arg, int flags)
{
@@ -739,7 +750,8 @@ _PyEval_AddPendingCall(PyInterpreterState *interp,
}

PyMutex_Lock(&pending->mutex);
- int result = _push_pending_call(pending, func, arg, flags);
+ _Py_add_pending_call_result result =
+ _push_pending_call(pending, func, arg, flags);
PyMutex_Unlock(&pending->mutex);

if (main_only) {
@@ -762,7 +774,15 @@ Py_AddPendingCall(_Py_pending_call_func func, void *arg)
/* Legacy users of this API will continue to target the main thread
(of the main interpreter). */
PyInterpreterState *interp = _PyInterpreterState_Main();
- return _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+ _Py_add_pending_call_result r =
+ _PyEval_AddPendingCall(interp, func, arg, _Py_PENDING_MAINTHREADONLY);
+ if (r == _Py_ADD_PENDING_FULL) {
+ return -1;
+ }
+ else {
+ assert(r == _Py_ADD_PENDING_SUCCESS);
+ return 0;
+ }
}

static int
@@ -782,10 +802,21 @@ handle_signals(PyThreadState *tstate)
}

static int
-_make_pending_calls(struct _pending_calls *pending)
+_make_pending_calls(struct _pending_calls *pending, int32_t *p_npending)
{
+ int res = 0;
+ int32_t npending = -1;
+
+ assert(sizeof(pending->max) <= sizeof(size_t)
+ && ((size_t)pending->max) <= Py_ARRAY_LENGTH(pending->calls));
+ int32_t maxloop = pending->maxloop;
+ if (maxloop == 0) {
+ maxloop = pending->max;
+ }
+ assert(maxloop > 0 && maxloop <= pending->max);
+
/* perform a bounded number of calls, in case of recursion */
- for (int i=0; i<NPENDINGCALLS; i++) {
+ for (int i=0; i<maxloop; i++) {
_Py_pending_call_func func = NULL;
void *arg = NULL;
int flags = 0;
@@ -793,21 +824,29 @@ _make_pending_calls(struct _pending_calls *pending)
/* pop one item off the queue while holding the lock */
PyMutex_Lock(&pending->mutex);
_pop_pending_call(pending, &func, &arg, &flags);
+ npending = pending->npending;
PyMutex_Unlock(&pending->mutex);

- /* having released the lock, perform the callback */
+ /* Check if there are any more pending calls. */
if (func == NULL) {
+ assert(npending == 0);
break;
}
- int res = func(arg);
+
+ /* having released the lock, perform the callback */
+ res = func(arg);
if ((flags & _Py_PENDING_RAWFREE) && arg != NULL) {
PyMem_RawFree(arg);
}
if (res != 0) {
- return -1;
+ res = -1;
+ goto finally;
}
}
- return 0;
+
+finally:
+ *p_npending = npending;
+ return res;
}

static void
@@ -861,26 +900,36 @@ make_pending_calls(PyThreadState *tstate)
added in-between re-signals */
unsignal_pending_calls(tstate, interp);

- if (_make_pending_calls(pending) != 0) {
+ int32_t npending;
+ if (_make_pending_calls(pending, &npending) != 0) {
pending->busy = 0;
/* There might not be more calls to make, but we play it safe. */
signal_pending_calls(tstate, interp);
return -1;
}
+ if (npending > 0) {
+ /* We hit pending->maxloop. */
+ signal_pending_calls(tstate, interp);
+ }

if (_Py_IsMainThread() && _Py_IsMainInterpreter(interp)) {
- if (_make_pending_calls(pending_main) != 0) {
+ if (_make_pending_calls(pending_main, &npending) != 0) {
pending->busy = 0;
/* There might not be more calls to make, but we play it safe. */
signal_pending_calls(tstate, interp);
return -1;
}
+ if (npending > 0) {
+ /* We hit pending_main->maxloop. */
+ signal_pending_calls(tstate, interp);
+ }
}

pending->busy = 0;
return 0;
}

+
void
_Py_set_eval_breaker_bit_all(PyInterpreterState *interp, uintptr_t bit)
{

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-leave@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: list-python-checkins@lists.gossamer-threads.com