diff --git a/src/listener.c b/src/listener.c
index 8512ed054..3ae20e247 100644
--- a/src/listener.c
+++ b/src/listener.c
@@ -578,6 +578,8 @@ void listener_accept(int fd)
 	struct proxy *p;
 	int max_accept;
 	int next_conn = 0;
+	int next_feconn = 0;
+	int next_actconn = 0;
 	int expire;
 	int cfd;
 	int ret;
@@ -648,12 +650,15 @@ void listener_accept(int fd)
 	 * worst case. If we fail due to system limits or temporary resource
 	 * shortage, we try again 100ms later in the worst case.
 	 */
-	for (; max_accept-- > 0; next_conn = 0) {
+	for (; max_accept-- > 0; next_conn = next_feconn = next_actconn = 0) {
 		struct sockaddr_storage addr;
 		socklen_t laddr = sizeof(addr);
 		unsigned int count;
 
-		/* pre-increase the number of connections without going too far */
+		/* pre-increase the number of connections without going too far.
+		 * We process the listener, then the proxy, then the process.
+		 * We know which ones to unroll based on the next_xxx value.
+		 */
 		do {
 			count = l->nbconn;
 			if (count >= l->maxconn) {
@@ -671,15 +676,42 @@ void listener_accept(int fd)
 			listener_full(l);
 		}
 
-		if (unlikely(actconn >= global.maxconn) && !(l->options & LI_O_UNLIMITED)) {
-			limit_listener(l, &global_listener_queue);
-			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
-			goto end;
+		if (p) {
+			do {
+				count = p->feconn;
+				if (count >= p->maxconn) {
+					/* the frontend was marked full or another
+					 * thread is going to do it.
+					 */
+					next_feconn = 0;
+					goto end;
+				}
+				next_feconn = count + 1;
+			} while (!HA_ATOMIC_CAS(&p->feconn, &count, next_feconn));
+
+			if (unlikely(next_feconn == p->maxconn)) {
+				/* we just filled it */
+				limit_listener(l, &p->listener_queue);
+			}
 		}
 
-		if (unlikely(p && p->feconn >= p->maxconn)) {
-			limit_listener(l, &p->listener_queue);
-			goto end;
+		if (!(l->options & LI_O_UNLIMITED)) {
+			do {
+				count = actconn;
+				if (count >= global.maxconn) {
+					/* the process was marked full or another
+					 * thread is going to do it.
+					 */
+					next_actconn = 0;
+					goto end;
+				}
+				next_actconn = count + 1;
+			} while (!HA_ATOMIC_CAS(&actconn, &count, next_actconn));
+
+			if (unlikely(next_actconn == global.maxconn)) {
+				limit_listener(l, &global_listener_queue);
+				task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
+			}
 		}
 
 		/* with sockpair@ we don't want to do an accept */
@@ -723,6 +755,10 @@ void listener_accept(int fd)
 			case EINTR:
 			case ECONNABORTED:
 				HA_ATOMIC_SUB(&l->nbconn, 1);
+				if (p)
+					HA_ATOMIC_SUB(&p->feconn, 1);
+				if (!(l->options & LI_O_UNLIMITED))
+					HA_ATOMIC_SUB(&actconn, 1);
 				continue;
 			case ENFILE:
 				if (p)
@@ -757,18 +793,20 @@ void listener_accept(int fd)
 		if (l->counters)
 			HA_ATOMIC_UPDATE_MAX(&l->counters->conn_max, next_conn);
 
+		if (p)
+			HA_ATOMIC_UPDATE_MAX(&p->fe_counters.conn_max, next_feconn);
+
+		proxy_inc_fe_conn_ctr(l, p);
+
 		if (!(l->options & LI_O_UNLIMITED)) {
 			count = update_freq_ctr(&global.conn_per_sec, 1);
 			HA_ATOMIC_UPDATE_MAX(&global.cps_max, count);
-			HA_ATOMIC_ADD(&actconn, 1);
 		}
 
 		if (unlikely(cfd >= global.maxsock)) {
 			send_log(p, LOG_EMERG, "Proxy %s reached the configured maximum connection limit. Please check the global 'maxconn' value.\n", p->id);
-			if (!(l->options & LI_O_UNLIMITED))
-				HA_ATOMIC_SUB(&actconn, 1);
 			close(cfd);
 			limit_listener(l, &global_listener_queue);
 			task_schedule(global_listener_queue_task, tick_add(now_ms, 1000)); /* try again in 1 second */
@@ -776,11 +814,13 @@ void listener_accept(int fd)
 		}
 
 		/* past this point, l->accept() will automatically decrement
-		 * l->nbconn and actconn once done. Setting next_conn=0 allows
-		 * the error path not to rollback on nbconn. It's more convenient
-		 * than duplicating all exit labels.
+		 * l->nbconn, feconn and actconn once done. Setting next_*conn=0
+		 * allows the error path not to rollback on nbconn. It's more
+		 * convenient than duplicating all exit labels.
 		 */
 		next_conn = 0;
+		next_feconn = 0;
+		next_actconn = 0;
 
 #if defined(USE_THREAD)
 		count = l->bind_conf->thr_count;
@@ -875,7 +915,14 @@ void listener_accept(int fd)
 	if (next_conn)
 		HA_ATOMIC_SUB(&l->nbconn, 1);
 
-	if (l->nbconn < l->maxconn && l->state == LI_FULL) {
+	if (p && next_feconn)
+		HA_ATOMIC_SUB(&p->feconn, 1);
+
+	if (next_actconn)
+		HA_ATOMIC_SUB(&actconn, 1);
+
+	if ((l->state == LI_FULL && l->nbconn < l->maxconn) ||
+	    (l->state == LI_LIMITED && ((!p || p->feconn < p->maxconn) && (actconn < global.maxconn)))) {
 		/* at least one thread has to this when quitting */
 		resume_listener(l);
@@ -899,9 +946,12 @@ void listener_release(struct listener *l)
 	if (!(l->options & LI_O_UNLIMITED))
 		HA_ATOMIC_SUB(&actconn, 1);
+	if (fe)
+		HA_ATOMIC_SUB(&fe->feconn, 1);
 	HA_ATOMIC_SUB(&l->nbconn, 1);
 	HA_ATOMIC_SUB(&l->thr_conn[tid], 1);
-	if (l->state == LI_FULL)
+
+	if (l->state == LI_FULL || l->state == LI_LIMITED)
 		resume_listener(l);
 
 	/* Dequeues all of the listeners waiting for a resource */
diff --git a/src/session.c b/src/session.c
index 2732dbca8..58317b897 100644
--- a/src/session.c
+++ b/src/session.c
@@ -55,10 +55,6 @@ struct session *session_new(struct proxy *fe, struct listener *li, enum obj_type
 		vars_init(&sess->vars, SCOPE_SESS);
 		sess->task = NULL;
 		sess->t_handshake = -1; /* handshake not done yet */
-		HA_ATOMIC_UPDATE_MAX(&fe->fe_counters.conn_max,
-		                     HA_ATOMIC_ADD(&fe->feconn, 1));
-		if (li)
-			proxy_inc_fe_conn_ctr(li, fe);
 		HA_ATOMIC_ADD(&totalconn, 1);
 		HA_ATOMIC_ADD(&jobs, 1);
 		LIST_INIT(&sess->srv_list);
@@ -72,7 +68,6 @@ void session_free(struct session *sess)
 	struct connection *conn, *conn_back;
 	struct sess_srv_list *srv_list, *srv_list_back;
 
-	HA_ATOMIC_SUB(&sess->fe->feconn, 1);
 	if (sess->listener)
 		listener_release(sess->listener);
 	session_store_counters(sess);
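
Note on the counting scheme: the patch extends the lock-free "pre-increase without going too far" pattern already used for l->nbconn to the per-frontend counter p->feconn and the process-wide actconn. Each accept first reserves a slot through a CAS retry loop, remembers what it reserved in next_feconn/next_actconn, and rolls the counters back on every error path, so concurrent accepting threads can never push a counter past its maxconn. The stand-alone sketch below only illustrates that reserve/rollback pattern; it uses C11 atomics instead of HAProxy's HA_ATOMIC_CAS/HA_ATOMIC_SUB macros, and the names try_reserve_conn, release_conn and MAXCONN are illustrative, not part of HAProxy.

/* Minimal sketch of the CAS-based reserve/rollback used in the patch.
 * Assumptions: C11 stdatomic, one shared counter standing in for
 * l->nbconn / p->feconn / actconn, and a fixed MAXCONN limit.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

#define MAXCONN 4                      /* stands in for the relevant maxconn */

static _Atomic unsigned int nbconn;    /* shared connection counter */

/* Reserve one slot without ever letting the counter exceed MAXCONN, even
 * when several threads race here. On success, stores the post-increment
 * value in *next and returns true; returns false when the limit is reached.
 */
static bool try_reserve_conn(unsigned int *next)
{
	unsigned int count;

	do {
		count = atomic_load(&nbconn);
		if (count >= MAXCONN)
			return false;  /* full, or another thread is marking it full */
		*next = count + 1;
	} while (!atomic_compare_exchange_weak(&nbconn, &count, *next));

	return true;
}

/* Roll back a reservation, e.g. when accept() fails after the counter was
 * already bumped (the EINTR/ECONNABORTED path in the patch).
 */
static void release_conn(void)
{
	atomic_fetch_sub(&nbconn, 1);
}

int main(void)
{
	unsigned int next;

	for (int i = 0; i < 6; i++) {
		if (!try_reserve_conn(&next)) {
			printf("attempt %d: refused, counter stays at %u\n",
			       i, atomic_load(&nbconn));
			continue;
		}
		printf("attempt %d: reserved slot, counter now %u\n", i, next);
		if (next == MAXCONN)
			printf("attempt %d: we just filled it (would limit_listener here)\n", i);
	}
	release_conn();  /* simulate one failed accept being rolled back */
	printf("after rollback: %u\n", atomic_load(&nbconn));
	return 0;
}

The weak CAS may fail spuriously, which is harmless here: the loop simply re-reads the counter and retries, exactly like the do/while loops the patch adds around p->feconn and actconn in listener_accept().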