diff --git a/src/listener.c b/src/listener.c
index 117b9c43f..b4e0bc20b 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -233,32 +233,30 @@ int resume_listener(struct listener *l)
 
 /* Marks a ready listener as full so that the stream code tries to re-enable
  * it upon next close() using resume_listener().
- *
- * Note: this function is only called from listener_accept so is already
- * locked.
  */
 static void listener_full(struct listener *l)
 {
+	HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
 	if (l->state >= LI_READY) {
 		if (l->state == LI_LIMITED) {
 			HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 			LIST_DEL(&l->wait_queue);
 			HA_SPIN_UNLOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		}
-
-		fd_stop_recv(l->fd);
-		l->state = LI_FULL;
+		if (l->state != LI_FULL) {
+			fd_stop_recv(l->fd);
+			l->state = LI_FULL;
+		}
 	}
+	HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
 }
 
 /* Marks a ready listener as limited so that we only try to re-enable it when
  * resources are free again. It will be queued into the specified queue.
- *
- * Note: this function is only called from listener_accept so is already
- * locked.
  */
 static void limit_listener(struct listener *l, struct list *list)
 {
+	HA_SPIN_LOCK(LISTENER_LOCK, &l->lock);
 	if (l->state == LI_READY) {
 		HA_SPIN_LOCK(LISTENER_QUEUE_LOCK, &lq_lock);
 		LIST_ADDQ(list, &l->wait_queue);
@@ -266,6 +264,7 @@ static void limit_listener(struct listener *l, struct list *list)
 		fd_stop_recv(l->fd);
 		l->state = LI_LIMITED;
 	}
+	HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
 }
 
 /* This function adds all of the protocol's listener's file descriptors to the
@@ -429,15 +428,14 @@ int create_listeners(struct bind_conf *bc, const struct sockaddr_storage *ss,
  */
 void delete_listener(struct listener *listener)
 {
-	if (listener->state != LI_ASSIGNED)
-		return;
-
 	HA_SPIN_LOCK(LISTENER_LOCK, &listener->lock);
-	listener->state = LI_INIT;
-	LIST_DEL(&listener->proto_list);
-	listener->proto->nb_listeners--;
-	HA_ATOMIC_SUB(&jobs, 1);
-	HA_ATOMIC_SUB(&listeners, 1);
+	if (listener->state == LI_ASSIGNED) {
+		listener->state = LI_INIT;
+		LIST_DEL(&listener->proto_list);
+		listener->proto->nb_listeners--;
+		HA_ATOMIC_SUB(&jobs, 1);
+		HA_ATOMIC_SUB(&listeners, 1);
+	}
 	HA_SPIN_UNLOCK(LISTENER_LOCK, &listener->lock);
 }
 
@@ -450,6 +448,7 @@ void listener_accept(int fd)
 	struct listener *l = fdtab[fd].owner;
 	struct proxy *p;
 	int max_accept;
+	int next_conn = 0;
 	int expire;
 	int cfd;
 	int ret;
@@ -461,8 +460,6 @@ void listener_accept(int fd)
 		return;
 	p = l->bind_conf->frontend;
 	max_accept = l->maxaccept ? l->maxaccept : 1;
-	if (HA_SPIN_TRYLOCK(LISTENER_LOCK, &l->lock))
-		return;
 
 	if (!(l->options & LI_O_UNLIMITED) && global.sps_lim) {
 		int max = freq_ctr_remain(&global.sess_per_sec, global.sps_lim, 0);
@@ -522,11 +519,29 @@ void listener_accept(int fd)
 	 * worst case. If we fail due to system limits or temporary resource
 	 * shortage, we try again 100ms later in the worst case.
 	 */
-	while (l->nbconn < l->maxconn && max_accept--) {
+	for (; max_accept-- > 0; next_conn = 0) {
 		struct sockaddr_storage addr;
 		socklen_t laddr = sizeof(addr);
 		unsigned int count;
 
+		/* pre-increase the number of connections without going too far */
+		do {
+			count = l->nbconn;
+			if (count >= l->maxconn) {
+				/* the listener was marked full or another
+				 * thread is going to do it.
+				 */
+				next_conn = 0;
+				goto end;
+			}
+			next_conn = count + 1;
+		} while (!HA_ATOMIC_CAS(&l->nbconn, &count, next_conn));
+
+		if (next_conn == l->maxconn) {
+			/* we filled it, mark it full */
+			listener_full(l);
+		}
+
 		if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
@@ -578,6 +593,7 @@ void listener_accept(int fd)
 				goto transient_error;
 			case EINTR:
 			case ECONNABORTED:
+				HA_ATOMIC_SUB(&l->nbconn, 1);
 				continue;
 			case ENFILE:
 				if (p)
@@ -608,26 +624,34 @@ void listener_accept(int fd)
 		if (unlikely(master == 1))
 			fcntl(cfd, F_SETFD, FD_CLOEXEC);
 
-		if (unlikely(cfd >= global.maxsock)) {
-			send_log(p, LOG_EMERG,
-				 "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
-				 p->id);
-			close(cfd);
-			limit_listener(l, &global_listener_queue);
-			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			goto end;
-		}
+		/* The connection was accepted, it must be counted as such */
+		if (l->counters)
+			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
 
-		/* increase the per-process number of cumulated connections */
 		if (!(l->options & LI_O_UNLIMITED)) {
 			count = update_freq_ctr(&global.conn_per_sec, 1);
 			HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
 			HA_ATOMIC_ADD(&actconn, 1);
 		}
 
-		count = HA_ATOMIC_ADD(&l->nbconn, 1);
-		if (l->counters)
-			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, count);
+		if (unlikely(cfd >= global.maxsock)) {
+			send_log(p, LOG_EMERG,
+				 "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n",
+				 p->id);
+			if (!(l->options & LI_O_UNLIMITED))
+				HA_ATOMIC_SUB(&actconn, 1);
+			close(cfd);
+			limit_listener(l, &global_listener_queue);
+			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+			goto end;
+		}
+
+		/* past this point, l->accept() will automatically decrement
+		 * l->nbconn and actconn once done. Setting next_conn=0 allows
+		 * the error path not to rollback on nbconn. It's more convenient
+		 * than duplicating all exit labels.
+		 */
+		next_conn = 0;
 
 		ret = l->accept(l, cfd, &addr);
 		if (unlikely(ret <= 0)) {
@@ -642,7 +666,9 @@ void listener_accept(int fd)
 			goto transient_error;
 		}
 
-		/* increase the per-process number of cumulated connections */
+		/* increase the per-process number of cumulated sessions, this
+		 * may only be done once l->accept() has accepted the connection.
+		 */
 		if (!(l->options & LI_O_UNLIMITED)) {
 			count = update_freq_ctr(&global.sess_per_sec, 1);
 			HA_ATOMIC_UPDATE_MAX(&global.sps_max, count);
@@ -654,7 +680,7 @@ void listener_accept(int fd)
 		}
 #endif
 
-	} /* end of while (max_accept--) */
+	} /* end of for (max_accept--) */
 
 	/* we've exhausted max_accept, so there is no need to poll again */
 stop:
@@ -669,10 +695,21 @@ void listener_accept(int fd)
 	limit_listener(l, &global_listener_queue);
 	task_schedule(global_listener_queue_task, tick_first(expire, global_listener_queue_task->expire));
 end:
-	if (l->nbconn >= l->maxconn)
-		listener_full(l);
+	if (next_conn)
+		HA_ATOMIC_SUB(&l->nbconn, 1);
 
-	HA_SPIN_UNLOCK(LISTENER_LOCK, &l->lock);
+	if (l->nbconn < l->maxconn && l->state == LI_FULL) {
+		/* at least one thread has to do this when quitting */
+		resume_listener(l);
+
+		/* Dequeues all of the listeners waiting for a resource */
+		if (!LIST_ISEMPTY(&global_listener_queue))
+			dequeue_all_listeners(&global_listener_queue);
+
+		if (!LIST_ISEMPTY(&p->listener_queue) &&
+		    (!p->fe_sps_lim || freq_ctr_remain(&p->fe_sess_per_sec, p->fe_sps_lim, 0) > 0))
+			dequeue_all_listeners(&p->listener_queue);
+	}
 }
 
 /* Notify the listener that a connection initiated from it was released. This
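The core of the patch is the lockless slot reservation on l->nbconn: each accept iteration reserves a connection slot with a compare-and-swap before accepting, and rolls the counter back on failure paths (EINTR/ECONNABORTED, or via the end: label while next_conn is still set). Below is a minimal standalone sketch of that pattern using C11 atomics instead of HAProxy's HA_ATOMIC_* macros; the names reserve_slot, release_slot and MAX_CONN are illustrative only and do not exist in HAProxy.

/*
 * Standalone sketch (not HAProxy code) of the CAS-based slot reservation
 * used by the patch on l->nbconn.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define MAX_CONN 4 /* stand-in for l->maxconn */

static _Atomic unsigned int nbconn; /* stand-in for l->nbconn */

/* Try to reserve one connection slot; returns false if the limit is
 * already reached (the listener is full or another thread filled it).
 */
static bool reserve_slot(void)
{
	unsigned int count = atomic_load(&nbconn);

	do {
		if (count >= MAX_CONN)
			return false;
	} while (!atomic_compare_exchange_weak(&nbconn, &count, count + 1));

	return true;
}

/* Roll back a reservation, as the patch does on EINTR/ECONNABORTED or
 * when leaving via the end: label with next_conn still set.
 */
static void release_slot(void)
{
	atomic_fetch_sub(&nbconn, 1);
}

int main(void)
{
	for (int i = 0; i < 6; i++) {
		if (reserve_slot())
			printf("conn %d accepted (nbconn=%u)\n", i, atomic_load(&nbconn));
		else
			printf("conn %d rejected: listener full\n", i);
	}

	release_slot();
	printf("one connection closed (nbconn=%u)\n", atomic_load(&nbconn));
	return 0;
}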