diff --git a/src/server.c b/src/server.c index 6eb34dcf5..8ff084a9d 100644 --- a/src/server.c +++ b/src/server.c @@ -72,6 +72,8 @@ __decl_thread(HA_SPINLOCK_T idle_conn_srv_lock); struct eb_root idle_conn_srv = EB_ROOT; struct task *idle_conn_task __read_mostly = NULL; struct list servers_list = LIST_HEAD_INIT(servers_list); +static struct task *server_atomic_sync_task = NULL; +static event_hdl_async_equeue server_atomic_sync_queue; /* SERVER DELETE(n)->ADD global tracker: * This is meant to provide srv->rid (revision id) value. @@ -168,7 +170,8 @@ int srv_getinter(const struct check *check) /* Update server's addr:svc_port tuple in INET context * - * Must be called with server lock held + * Must be called under thread isolation to ensure consistent readings accross + * all threads (addr:svc_port might be read without srv lock being held). */ void _srv_set_inetaddr(struct server *srv, const struct sockaddr_storage *addr, unsigned int svc_port) { @@ -176,6 +179,163 @@ void _srv_set_inetaddr(struct server *srv, const struct sockaddr_storage *addr, srv->svc_port = svc_port; } +/* + * Function executed by server_atomic_sync_task to perform atomic updates on + * compatible server struct members that are not guarded by any lock since + * they are not supposed to change often and are subject to being used in + * sensitive codepaths + * + * Some updates may require thread isolation: we start without isolation + * but as soon as we encounter an event that requires isolation, we do so. + * Once the event is processed, we keep the isolation until we've processed + * the whole batch of events and leave isolation once we're done, as it would + * be very costly to try to acquire isolation multiple times in a row. + * The task will limit itself to a number of events per run to prevent + * thread contention (see: "tune.events.max-events-at-once"). + * + * TODO: if we find out that enforcing isolation is too costly, we may + * consider adding thread_isolate_try_full(timeout) or equivalent to the + * thread API so that we can do our best not to block harmless threads + * for too long if one or multiple threads are still heavily busy. This + * would mean that the task would be capable of rescheduling itself to + * start again on the current event if it failed to acquire thread + * isolation. This would also imply that the event_hdl API allows us + * to check an event without popping it from the queue first (remove the + * event once it is successfully processed). + */ +static void srv_set_addr_desc(struct server *s, int reattach); +static struct task *server_atomic_sync(struct task *task, void *context, unsigned int state) +{ + unsigned int remain = event_hdl_tune.max_events_at_once; // to limit max number of events per batch + struct event_hdl_async_event *event; + + /* check for new server events that we care about */ + while ((event = event_hdl_async_equeue_pop(&server_atomic_sync_queue))) { + if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_END)) { + /* ending event: no more events to come */ + event_hdl_async_free_event(event); + task_destroy(task); + task = NULL; + break; + } + + if (!remain) { + /* STOP: we've already spent all our budget here, and + * considering we possibly are under isolation, we cannot + * keep blocking other threads any longer. + * + * Reschedule the task to finish where we left off if + * there are remaining events in the queue. + */ + if (!event_hdl_async_equeue_isempty(&server_atomic_sync_queue)) + task_wakeup(task, TASK_WOKEN_OTHER); + break; + } + remain--; + + /* new event to process */ + if (event_hdl_sub_type_equal(event->type, EVENT_HDL_SUB_SERVER_INETADDR)) { + struct sockaddr_storage new_addr; + struct event_hdl_cb_data_server_inetaddr *data = event->data; + struct proxy *px; + struct server *srv; + + /* server ip:port changed, we must atomically update data members + * to prevent invalid reads by other threads. + */ + + /* check if related server still exists */ + px = proxy_find_by_id(data->server.safe.proxy_uuid, PR_CAP_BE, 0); + if (!px) + continue; + srv = findserver_unique_id(px, data->server.safe.puid, data->server.safe.rid); + if (!srv) + continue; + + /* prepare new addr based on event cb data */ + memset(&new_addr, 0, sizeof(new_addr)); + new_addr.ss_family = data->safe.next.family; + switch (new_addr.ss_family) { + case AF_INET: + ((struct sockaddr_in *)&new_addr)->sin_addr.s_addr = + data->safe.next.addr.v4.s_addr; + break; + case AF_INET6: + memcpy(&((struct sockaddr_in6 *)&new_addr)->sin6_addr, + &data->safe.next.addr.v6, + sizeof(struct in6_addr)); + break; + default: + /* should not happen */ + break; + } + /* + * this requires thread isolation, which is safe since we're the only + * task working for the current subscription and we don't hold locks + * or ressources that other threads may depend on to complete a running + * cycle. Note that we do this way because we assume that this event is + * rather rare. + */ + if (!thread_isolated()) + thread_isolate_full(); + + /* apply new addr:port combination */ + _srv_set_inetaddr(srv, &new_addr, data->safe.next.svc_port); + + /* propagate the changes */ + if (data->safe.purge_conn) /* force connection cleanup on the given server? */ + srv_cleanup_connections(srv); + srv_set_dyncookie(srv); + srv_set_addr_desc(srv, 1); + } + event_hdl_async_free_event(event); + } + + /* some events possibly required thread_isolation: + * now that we are done, we must leave thread isolation before + * returning + */ + if (thread_isolated()) + thread_release(); + + return task; +} + +/* Try to start the atomic server sync task. + * + * Returns ERR_NONE on success and a combination of ERR_CODE on failure + */ +static int server_atomic_sync_start() +{ + struct event_hdl_sub_type subscriptions = EVENT_HDL_SUB_NONE; + + if (server_atomic_sync_task) + return ERR_NONE; // nothing to do + server_atomic_sync_task = task_new_anywhere(); + if (!server_atomic_sync_task) + goto fail; + server_atomic_sync_task->process = server_atomic_sync; + event_hdl_async_equeue_init(&server_atomic_sync_queue); + + /* task created, now subscribe to relevant server events in the global list */ + subscriptions = event_hdl_sub_type_add(subscriptions, EVENT_HDL_SUB_SERVER_INETADDR); + if (!event_hdl_subscribe(NULL, subscriptions, + EVENT_HDL_ASYNC_TASK(&server_atomic_sync_queue, + server_atomic_sync_task, + NULL, + NULL))) + goto fail; + + + return ERR_NONE; + + fail: + task_destroy(server_atomic_sync_task); + server_atomic_sync_task = NULL; + return ERR_ALERT | ERR_FATAL; +} +REGISTER_POST_CHECK(server_atomic_sync_start); + /* fill common server event data members struct * must be called with server lock or under thread isolate */ @@ -3601,14 +3761,9 @@ int srv_update_addr(struct server *s, void *ip, int ip_sin_family, const char *u _srv_event_hdl_prepare(&cb_data.common, s, 0); _srv_event_hdl_prepare_inetaddr(&cb_data.addr, &s->addr, s->svc_port, &new_addr, s->svc_port, 0); - /* apply the new IP address */ - _srv_set_inetaddr(s, &new_addr, s->svc_port); - + /* server_atomic_sync_task will apply the changes for us */ _srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_INETADDR, cb_data, s); - srv_set_dyncookie(s); - srv_set_addr_desc(s, 1); - return 0; } @@ -3866,17 +4021,8 @@ out: ((port_change) ? new_port : s->svc_port), 1); - /* apply new ip and port */ - _srv_set_inetaddr(s, - ((ip_change) ? &sa : &s->addr), - ((port_change) ? new_port : s->svc_port)); - + /* server_atomic_sync_task will apply the changes for us */ _srv_event_hdl_publish(EVENT_HDL_SUB_SERVER_INETADDR, cb_data, s); - - /* force connection cleanup on the given server */ - srv_cleanup_connections(s); - srv_set_dyncookie(s); - srv_set_addr_desc(s, 1); } if (updater) chunk_appendf(msg, " by '%s'", updater);