diff --git a/include/common/hathreads.h b/include/common/hathreads.h index 30009cc82..2620b7791 100644 --- a/include/common/hathreads.h +++ b/include/common/hathreads.h @@ -292,6 +292,7 @@ enum lock_label { PIPES_LOCK, START_LOCK, TLSKEYS_REF_LOCK, + PENDCONN_LOCK, LOCK_LABELS }; struct lock_stat { @@ -409,6 +410,7 @@ static inline const char *lock_label(enum lock_label label) case PIPES_LOCK: return "PIPES"; case START_LOCK: return "START"; case TLSKEYS_REF_LOCK: return "TLSKEYS_REF"; + case PENDCONN_LOCK: return "PENDCONN"; case LOCK_LABELS: break; /* keep compiler happy */ }; /* only way to come here is consecutive to an internal bug */ diff --git a/include/proto/queue.h b/include/proto/queue.h index f66d809f1..2d4773a09 100644 --- a/include/proto/queue.h +++ b/include/proto/queue.h @@ -38,6 +38,7 @@ extern struct pool_head *pool_head_pendconn; int init_pendconn(); struct pendconn *pendconn_add(struct stream *strm); +int pendconn_dequeue(struct stream *strm); void pendconn_free(struct pendconn *p); void process_srv_queue(struct server *s); unsigned int srv_dynamic_maxconn(const struct server *s); diff --git a/include/types/queue.h b/include/types/queue.h index 4b3545140..42dbbd047 100644 --- a/include/types/queue.h +++ b/include/types/queue.h @@ -24,15 +24,19 @@ #include #include +#include #include struct stream; struct pendconn { - struct list list; /* chaining ... */ - struct stream *strm; /* the stream waiting for a connection */ - struct server *srv; /* the server we are waiting for */ + int strm_flags; /* stream flags */ + struct stream *strm; + struct proxy *px; + struct server *srv; /* the server we are waiting for, may be NULL */ + struct list list; /* next pendconn */ + __decl_hathreads(HA_SPINLOCK_T lock); }; #endif /* _TYPES_QUEUE_H */ diff --git a/include/types/stream.h b/include/types/stream.h index 227b0ffba..0dbc79f44 100644 --- a/include/types/stream.h +++ b/include/types/stream.h @@ -124,7 +124,7 @@ struct stream { struct session *sess; /* the session this stream is attached to */ struct server *srv_conn; /* stream already has a slot on a server and is not in queue */ - struct pendconn *pend_pos; /* if not NULL, points to the position in the pending queue */ + struct pendconn *pend_pos; /* if not NULL, points to the pending position in the pending queue */ struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */ diff --git a/src/proto_http.c b/src/proto_http.c index ae582b3de..80e001d69 100644 --- a/src/proto_http.c +++ b/src/proto_http.c @@ -8253,8 +8253,6 @@ void http_reset_txn(struct stream *s) s->store_count = 0; s->uniq_id = global.req_count++; - s->pend_pos = NULL; - s->req.flags |= CF_READ_DONTWAIT; /* one read is usually enough */ /* We must trim any excess data from the response buffer, because we diff --git a/src/queue.c b/src/queue.c index 1dea7d53b..aa40fba72 100644 --- a/src/queue.c +++ b/src/queue.c @@ -24,8 +24,6 @@ struct pool_head *pool_head_pendconn; -static void __pendconn_free(struct pendconn *p); - /* perform minimal intializations, report 0 in case of error, 1 if OK. */ int init_pendconn() { @@ -63,78 +61,99 @@ unsigned int srv_dynamic_maxconn(const struct server *s) return max; } - -/* Returns the first pending connection for server , which may be NULL if - * nothing is pending. +/* Remove the pendconn from the server/proxy queue. At this stage, the + * connection is not really dequeued. It will be done during the + * process_stream. This function must be called by function owning the locks on + * the pendconn _AND_ the server/proxy. It also decreases the pending count. + * + * The caller must own the lock on the pendconn _AND_ the queue containing the + * pendconn. The pendconn must still be queued. */ -static inline struct pendconn *pendconn_from_srv(const struct server *s) { - if (!s->nbpend) - return NULL; - return LIST_ELEM(s->pendconns.n, struct pendconn *, list); -} - -/* Returns the first pending connection for proxy , which may be NULL if - * nothing is pending. - */ -static inline struct pendconn *pendconn_from_px(const struct proxy *px) { - if (!px->nbpend) - return NULL; - - return LIST_ELEM(px->pendconns.n, struct pendconn *, list); -} - - -/* Detaches the next pending connection from either a server or a proxy, and - * returns its associated stream. If no pending connection is found, NULL is - * returned. Note that neither nor may be NULL. - * Priority is given to the oldest request in the queue if both and - * have pending requests. This ensures that no request will be left unserved. - * The queue is not considered if the server (or a tracked server) is not - * RUNNING, is disabled, or has a null weight (server going down). The - * queue is still considered in this case, because if some connections remain - * there, it means that some requests have been forced there after it was seen - * down (eg: due to option persist). - * The stream is immediately marked as "assigned", and both its and - * are set to , - */ -static struct stream *pendconn_get_next_strm(struct server *srv, struct proxy *px) +static void pendconn_unlink(struct pendconn *p) { - struct pendconn *ps, *pp; - struct stream *strm; - struct server *rsrv; + if (p->srv) + p->srv->nbpend--; + else + p->px->nbpend--; + HA_ATOMIC_SUB(&p->px->totpend, 1); + LIST_DEL(&p->list); + LIST_INIT(&p->list); +} + +/* Process the next pending connection from either a server or a proxy, and + * returns 0 on success. If no pending connection is found, 1 is returned. + * Note that neither nor may be NULL. Priority is given to the + * oldest request in the queue if both and have pending + * requests. This ensures that no request will be left unserved. The queue + * is not considered if the server (or a tracked server) is not RUNNING, is + * disabled, or has a null weight (server going down). The queue is still + * considered in this case, because if some connections remain there, it means + * that some requests have been forced there after it was seen down (eg: due to + * option persist). The stream is immediately marked as "assigned", and both + * its and are set to . + * + * This function must only be called if the server queue _AND_ the proxy queue + * are locked. Today it is only called by process_srv_queue. + */ +static int pendconn_process_next_strm(struct server *srv, struct proxy *px) +{ + struct pendconn *p = NULL; + struct server *rsrv; rsrv = srv->track; if (!rsrv) rsrv = srv; - ps = pendconn_from_srv(srv); - pp = pendconn_from_px(px); - /* we want to get the definitive pendconn in */ - if (!pp || !srv_currently_usable(rsrv)) { - if (!ps) - return NULL; - } else { - /* pendconn exists in the proxy queue */ - if (!ps || tv_islt(&pp->strm->logs.tv_request, &ps->strm->logs.tv_request)) - ps = pp; + if (srv->nbpend) { + list_for_each_entry(p, &srv->pendconns, list) { + if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) + goto ps_found; + } + p = NULL; } - strm = ps->strm; - __pendconn_free(ps); - /* we want to note that the stream has now been assigned a server */ - strm->flags |= SF_ASSIGNED; - strm->target = &srv->obj_type; - __stream_add_srv_conn(strm, srv); + ps_found: + if (srv_currently_usable(rsrv) && px->nbpend) { + struct pendconn *pp; + + list_for_each_entry(pp, &px->pendconns, list) { + /* If the server pendconn is older than the proxy one, + * we process the server one. */ + if (p && !tv_islt(&pp->strm->logs.tv_request, &p->strm->logs.tv_request)) + goto pendconn_found; + + if (!HA_SPIN_TRYLOCK(PENDCONN_LOCK, &pp->lock)) { + /* Let's switch from the server pendconn to the + * proxy pendconn. Don't forget to unlock the + * server pendconn, if any. */ + if (p) + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + p = pp; + goto pendconn_found; + } + } + } + + if (!p) + return 1; + + pendconn_found: + pendconn_unlink(p); + p->strm_flags |= SF_ASSIGNED; + p->srv = srv; + HA_ATOMIC_ADD(&srv->served, 1); HA_ATOMIC_ADD(&srv->proxy->served, 1); if (px->lbprm.server_take_conn) px->lbprm.server_take_conn(srv); + __stream_add_srv_conn(p->strm, srv); - return strm; + task_wakeup(p->strm->task, TASK_WOKEN_RES); + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + return 0; } -/* - * Manages a server's connection queue. This function will try to dequeue as +/* Manages a server's connection queue. This function will try to dequeue as * many pending streams as possible, and wake them up. */ void process_srv_queue(struct server *s) @@ -144,17 +163,10 @@ void process_srv_queue(struct server *s) HA_SPIN_LOCK(PROXY_LOCK, &p->lock); HA_SPIN_LOCK(SERVER_LOCK, &s->lock); - - /* First, check if we can handle some connections queued at the proxy. We - * will take as many as we can handle. - */ maxconn = srv_dynamic_maxconn(s); while (s->served < maxconn) { - struct stream *strm = pendconn_get_next_strm(s, p); - - if (strm == NULL) + if (pendconn_process_next_strm(s, p)) break; - task_wakeup(strm->task, TASK_WOKEN_RES); } HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock); HA_SPIN_UNLOCK(PROXY_LOCK, &p->lock); @@ -165,39 +177,50 @@ void process_srv_queue(struct server *s) * are updated accordingly. Returns NULL if no memory is available, otherwise the * pendconn itself. If the stream was already marked as served, its flag is * cleared. It is illegal to call this function with a non-NULL strm->srv_conn. + * + * This function must be called by the stream itself, so in the context of + * process_stream. */ struct pendconn *pendconn_add(struct stream *strm) { struct pendconn *p; - struct server *srv; - int count; + struct proxy *px; + struct server *srv; p = pool_alloc(pool_head_pendconn); if (!p) return NULL; - strm->pend_pos = p; - p->strm = strm; srv = objt_server(strm->target); + px = strm->be; + + p->srv = NULL; + p->px = px; + p->strm = strm; + p->strm_flags = strm->flags; + HA_SPIN_INIT(&p->lock); if ((strm->flags & SF_ASSIGNED) && srv) { p->srv = srv; HA_SPIN_LOCK(SERVER_LOCK, &srv->lock); + srv->nbpend++; + strm->logs.srv_queue_size += srv->nbpend; + if (srv->nbpend > srv->counters.nbpend_max) + srv->counters.nbpend_max = srv->nbpend; LIST_ADDQ(&srv->pendconns, &p->list); HA_SPIN_UNLOCK(SERVER_LOCK, &srv->lock); - count = HA_ATOMIC_ADD(&srv->nbpend, 1); - strm->logs.srv_queue_size += count; - HA_ATOMIC_UPDATE_MAX(&srv->counters.nbpend_max, count); - } else { - p->srv = NULL; - HA_SPIN_LOCK(PROXY_LOCK, &strm->be->lock); - LIST_ADDQ(&strm->be->pendconns, &p->list); - HA_SPIN_UNLOCK(PROXY_LOCK, &strm->be->lock); - count = HA_ATOMIC_ADD(&strm->be->nbpend, 1); - strm->logs.prx_queue_size += count; - HA_ATOMIC_UPDATE_MAX(&strm->be->be_counters.nbpend_max, count); } - HA_ATOMIC_ADD(&strm->be->totpend, 1); + else { + HA_SPIN_LOCK(PROXY_LOCK, &px->lock); + px->nbpend++; + strm->logs.prx_queue_size += px->nbpend; + if (px->nbpend > px->be_counters.nbpend_max) + px->be_counters.nbpend_max = px->nbpend; + LIST_ADDQ(&px->pendconns, &p->list); + HA_SPIN_UNLOCK(PROXY_LOCK, &px->lock); + } + HA_ATOMIC_ADD(&px->totpend, 1); + strm->pend_pos = p; return p; } @@ -206,26 +229,28 @@ struct pendconn *pendconn_add(struct stream *strm) */ int pendconn_redistribute(struct server *s) { - struct pendconn *pc, *pc_bck; + struct pendconn *p, *pback; int xferred = 0; + /* The REDISP option was specified. We will ignore cookie and force to + * balance or use the dispatcher. */ + if ((s->proxy->options & (PR_O_REDISP|PR_O_PERSIST)) != PR_O_REDISP) + return 0; + HA_SPIN_LOCK(SERVER_LOCK, &s->lock); - list_for_each_entry_safe(pc, pc_bck, &s->pendconns, list) { - struct stream *strm = pc->strm; + list_for_each_entry_safe(p, pback, &s->pendconns, list) { + if (p->strm_flags & SF_FORCE_PRST) + continue; - if ((strm->be->options & (PR_O_REDISP|PR_O_PERSIST)) == PR_O_REDISP && - !(strm->flags & SF_FORCE_PRST)) { - /* The REDISP option was specified. We will ignore - * cookie and force to balance or use the dispatcher. - */ + if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) + continue; - /* it's left to the dispatcher to choose a server */ - strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); + /* it's left to the dispatcher to choose a server */ + pendconn_unlink(p); + p->strm_flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); - __pendconn_free(pc); - task_wakeup(strm->task, TASK_WOKEN_RES); - xferred++; - } + task_wakeup(p->strm->task, TASK_WOKEN_RES); + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); } HA_SPIN_UNLOCK(SERVER_LOCK, &s->lock); return xferred; @@ -238,65 +263,110 @@ int pendconn_redistribute(struct server *s) */ int pendconn_grab_from_px(struct server *s) { - int xferred; + struct pendconn *p, *pback; + int maxconn, xferred = 0; if (!srv_currently_usable(s)) return 0; HA_SPIN_LOCK(PROXY_LOCK, &s->proxy->lock); - for (xferred = 0; !s->maxconn || xferred < srv_dynamic_maxconn(s); xferred++) { - struct stream *strm; - struct pendconn *p; - - p = pendconn_from_px(s->proxy); - if (!p) + maxconn = srv_dynamic_maxconn(s); + list_for_each_entry_safe(p, pback, &s->proxy->pendconns, list) { + if (s->maxconn && s->served + xferred >= maxconn) break; - p->strm->target = &s->obj_type; - strm = p->strm; - __pendconn_free(p); - task_wakeup(strm->task, TASK_WOKEN_RES); + + if (HA_SPIN_TRYLOCK(PENDCONN_LOCK, &p->lock)) + continue; + + pendconn_unlink(p); + p->srv = s; + + task_wakeup(p->strm->task, TASK_WOKEN_RES); + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + xferred++; } HA_SPIN_UNLOCK(PROXY_LOCK, &s->proxy->lock); return xferred; } -/* - * Detaches pending connection

, decreases the pending count, and frees - * the pending connection. The connection might have been queued to a specific - * server as well as to the proxy. The stream also gets marked unqueued. +/* Try to dequeue pending connection attached to the stream . It must + * always exists here. If the pendconn is still linked to the server or the + * proxy queue, nothing is done and the function returns 1. Otherwise, + * ->flags and ->target are updated, the pendconn is released and 0 + * is returned. + * + * This function must be called by the stream itself, so in the context of + * process_stream. + */ +int pendconn_dequeue(struct stream *strm) +{ + struct pendconn *p; + + if (unlikely(!strm->pend_pos)) { + /* unexpected case because it is called by the stream itself and + * only the stream can release a pendconn. So it is only + * possible if a pendconn is released by someone else or if the + * stream is supposed to be queued but without its associated + * pendconn. In both cases it is a bug! */ + abort(); + } + p = strm->pend_pos; + HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock); + + /* the pendconn is still linked to the server/proxy queue, so unlock it + * and go away. */ + if (!LIST_ISEMPTY(&p->list)) { + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + return 1; + } + + /* the pendconn must be dequeued now */ + if (p->srv) + strm->target = &p->srv->obj_type; + + strm->flags &= ~(SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); + strm->flags |= p->strm_flags & (SF_DIRECT | SF_ASSIGNED | SF_ADDR_SET); + strm->pend_pos = NULL; + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); + pool_free(pool_head_pendconn, p); + return 0; +} + +/* Release the pending connection

, and decreases the pending count if + * needed. The connection might have been queued to a specific server as well as + * to the proxy. The stream also gets marked unqueued.

must always be + * defined here. So it is the caller responsibility to check its existance. + * + * This function must be called by the stream itself, so in the context of + * process_stream. */ void pendconn_free(struct pendconn *p) { + struct stream *strm = p->strm; + + HA_SPIN_LOCK(PENDCONN_LOCK, &p->lock); + + /* The pendconn was already unlinked, just release it. */ + if (LIST_ISEMPTY(&p->list)) + goto release; + if (p->srv) { HA_SPIN_LOCK(SERVER_LOCK, &p->srv->lock); + p->srv->nbpend--; LIST_DEL(&p->list); HA_SPIN_UNLOCK(SERVER_LOCK, &p->srv->lock); - HA_ATOMIC_SUB(&p->srv->nbpend, 1); } else { - HA_SPIN_LOCK(SERVER_LOCK, &p->strm->be->lock); + HA_SPIN_LOCK(PROXY_LOCK, &p->px->lock); + p->px->nbpend--; LIST_DEL(&p->list); - HA_SPIN_UNLOCK(SERVER_LOCK, &p->strm->be->lock); - HA_ATOMIC_SUB(&p->strm->be->nbpend, 1); + HA_SPIN_UNLOCK(PROXY_LOCK, &p->px->lock); } - p->strm->pend_pos = NULL; - HA_ATOMIC_SUB(&p->strm->be->totpend, 1); - pool_free(pool_head_pendconn, p); -} + HA_ATOMIC_SUB(&p->px->totpend, 1); -/* Lock-free version of pendconn_free. */ -static void __pendconn_free(struct pendconn *p) -{ - if (p->srv) { - LIST_DEL(&p->list); - HA_ATOMIC_SUB(&p->srv->nbpend, 1); - } - else { - LIST_DEL(&p->list); - HA_ATOMIC_SUB(&p->strm->be->nbpend, 1); - } - p->strm->pend_pos = NULL; - HA_ATOMIC_SUB(&p->strm->be->totpend, 1); + release: + strm->pend_pos = NULL; + HA_SPIN_UNLOCK(PENDCONN_LOCK, &p->lock); pool_free(pool_head_pendconn, p); } diff --git a/src/stream.c b/src/stream.c index afa9f9933..38d724006 100644 --- a/src/stream.c +++ b/src/stream.c @@ -929,7 +929,7 @@ static void sess_update_stream_int(struct stream *s) } else if (si->state == SI_ST_QUE) { /* connection request was queued, check for any update */ - if (!s->pend_pos) { + if (!pendconn_dequeue(s)) { /* The connection is not in the queue anymore. Either * we have a server connection slot available and we * go directly to the assigned state, or we need to