diff --git a/include/haproxy/server-t.h b/include/haproxy/server-t.h
index 7fc7b1c9d..1c46de821 100644
--- a/include/haproxy/server-t.h
+++ b/include/haproxy/server-t.h
@@ -361,6 +361,7 @@ struct server {
 	struct queue queue;			/* pending connections */
 	struct mt_list sess_conns;		/* list of private conns managed by a session on this server */
+	unsigned int dequeuing;			/* non-zero = dequeuing in progress (atomic) */
 
 	/* Element below are usd by LB algorithms and must be doable in
 	 * parallel to other threads reusing connections above.
diff --git a/src/queue.c b/src/queue.c
index bf32d4239..a1bc46c57 100644
--- a/src/queue.c
+++ b/src/queue.c
@@ -379,12 +379,20 @@ void process_srv_queue(struct server *s)
 	 * However we still re-enter the loop for one pass if there's no
 	 * more served, otherwise we could end up with no other thread
 	 * trying to dequeue them.
+	 *
+	 * There's one racy part: we don't want to have more than one thread
+	 * in charge of dequeuing, hence the dequeuing flag. We cannot rely
+	 * on a trylock here because it would compete against pendconn_add()
+	 * and would occasionally leave entries in the queue that are never
+	 * dequeued. Nobody else uses the dequeuing flag, so when seeing it
+	 * non-zero, we're certain that another thread is already dequeuing.
 	 */
 	while (!stop && (done < global.tune.maxpollevents || !s->served) &&
 	       s->served < (maxconn = srv_dynamic_maxconn(s))) {
-		if (HA_SPIN_TRYLOCK(QUEUE_LOCK, &s->queue.lock) != 0)
+		if (HA_ATOMIC_XCHG(&s->dequeuing, 1))
 			break;
 
+		HA_SPIN_LOCK(QUEUE_LOCK, &s->queue.lock);
 		while (s->served < maxconn) {
 			stop = !pendconn_process_next_strm(s, p, px_ok);
 			if (stop)
@@ -394,6 +402,7 @@ void process_srv_queue(struct server *s)
 			if (done >= global.tune.maxpollevents)
 				break;
 		}
+		HA_ATOMIC_STORE(&s->dequeuing, 0);
 		HA_SPIN_UNLOCK(QUEUE_LOCK, &s->queue.lock);
 	}
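
For readers who want to see the locking pattern in isolation, below is a minimal standalone sketch of the "single dequeuer" idea this patch introduces. It is not HAProxy code: C11 atomics and a pthread mutex stand in for HA_ATOMIC_XCHG()/HA_ATOMIC_STORE() and the QUEUE_LOCK spinlock, all names (process_queue, enqueue_one, dequeue_one, queue_lock, pending) are illustrative, and the outer retry loop is collapsed to a single pass.

/* Sketch only: C11 atomics + pthread mutex standing in for HAProxy's
 * HA_ATOMIC_* and HA_SPIN_* macros. All identifiers are hypothetical.
 */
#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

static atomic_uint dequeuing;   /* non-zero = a dequeuer is active */
static pthread_mutex_t queue_lock = PTHREAD_MUTEX_INITIALIZER;
static int pending;             /* stand-in for the real queue, guarded by queue_lock */

static void enqueue_one(void)
{
	/* stand-in for pendconn_add(): takes the same lock, which is why
	 * a trylock on the dequeue side could spuriously fail and bail out
	 */
	pthread_mutex_lock(&queue_lock);
	pending++;
	pthread_mutex_unlock(&queue_lock);
}

static bool dequeue_one(void)
{
	/* stand-in for pendconn_process_next_strm(); runs under queue_lock */
	if (pending > 0) {
		pending--;
		return true;
	}
	return false;
}

static void process_queue(void)
{
	/* Claim the dequeuer role with an atomic exchange. If the flag was
	 * already set, another thread is dequeuing (or about to), so we can
	 * leave without stranding anything: that thread will also drain
	 * entries queued after our check.
	 */
	if (atomic_exchange(&dequeuing, 1))
		return;

	/* Unlike the old trylock, block on the lock unconditionally. An
	 * enqueuer may hold it briefly; giving up here is what used to
	 * leave entries in the queue that were never dequeued.
	 */
	pthread_mutex_lock(&queue_lock);
	while (dequeue_one())
		;
	/* Release the role while still holding the lock, mirroring the
	 * order in the patch, so the next caller can take over cleanly.
	 */
	atomic_store(&dequeuing, 0);
	pthread_mutex_unlock(&queue_lock);
}

The design point the patch's comment makes: a trylock can fail merely because an enqueuer (pendconn_add()) momentarily holds the lock, so the dequeuer gives up and the freshly queued entry may never be processed. The atomic flag is touched only by dequeuers, so losing the XCHG race always means another dequeuer will drain the queue, and the winner can then afford to wait on the real lock.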