diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index 6781249e8..e806a2a6f 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -64,8 +64,8 @@ void stream_free(struct stream *s); int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input); int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto); -/* kill a stream and set the termination flags to (one of SF_ERR_*) */ -void stream_shutdown(struct stream *stream, int why); +/* shutdown the stream from itself */ +void stream_shutdown_self(struct stream *stream, int why); void stream_dump_and_crash(enum obj_type *obj, int rate); void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const char *pfx, uint32_t anon_key); @@ -385,6 +385,22 @@ static inline int stream_check_conn_timeout(struct stream *s) return 0; } +/* Wake a stream up for shutdown by sending it an event. The stream must be + * locked one way or another so that it cannot leave (i.e. when inspecting + * a locked list or under thread isolation). Process_stream() will recognize + * the message and complete the job. only supports SF_ERR_DOWN (mapped + * to UEVT1) and SF_ERR_KILLED (mapped to UEVT2). Other values will just + * trigger TASK_WOKEN_OTHER. The stream handler will first call function + * stream_shutdown_self() on wakeup to complete the notification. + */ +static inline void stream_shutdown(struct stream *s, int why) +{ + task_wakeup(s->task, TASK_WOKEN_OTHER | + ((why == SF_ERR_DOWN) ? TASK_F_UEVT1 : + (why == SF_ERR_KILLED) ? TASK_F_UEVT2 : + 0)); +} + int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout); void stream_retnclose(struct stream *s, const struct buffer *msg); void sess_set_term_flags(struct stream *s); diff --git a/src/stream.c b/src/stream.c index 9b48e7d78..f689edd76 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1722,6 +1722,12 @@ void stream_update_timings(struct task *t, uint64_t lat, uint64_t cpu) * nothing too many times, the request and response buffers flags are monitored * and each function is called only if at least another function has changed at * least one flag it is interested in. + * + * This task handler understands a few wake up reasons: + * - TASK_WOKEN_MSG forces analysers to be re-evaluated + * - TASK_WOKEN_OTHER+TASK_F_UEVT1 shuts the stream down on server down + * - TASK_WOKEN_OTHER+TASK_F_UEVT2 shuts the stream down on active kill + * - TASK_WOKEN_OTHER alone has no effect */ struct task *process_stream(struct task *t, void *context, unsigned int state) { @@ -1742,6 +1748,11 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) activity[tid].stream_calls++; stream_cond_update_cpu_latency(s); + if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2))) { + /* that an instant kill message, the reason is in _UEVT* */ + stream_shutdown_self(s, (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN); + } + req = &s->req; res = &s->res; @@ -2776,8 +2787,12 @@ void default_srv_error(struct stream *s, struct stconn *sc) s->flags |= fin; } -/* kill a stream and set the termination flags to (one of SF_ERR_*) */ -void stream_shutdown(struct stream *stream, int why) +/* shutdown the stream from itself. It's also possible for another one from the + * same thread but then an explicit wakeup will be needed since this function + * does not perform it. is a set of SF_ERR_* flags to pass as the cause + * for shutting down. + */ +void stream_shutdown_self(struct stream *stream, int why) { if (stream->scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) return; @@ -2787,7 +2802,6 @@ void stream_shutdown(struct stream *stream, int why) stream->task->nice = 1024; if (!(stream->flags & SF_ERR_MASK)) stream->flags |= why; - task_wakeup(stream->task, TASK_WOKEN_OTHER); } /* dumps an error message for type at ptr related to stream ,