On 7/2/20 2:14 AM, ylavic@apache.org wrote:
> Author: ylavic
> Date: Thu Jul 2 00:14:26 2020
> New Revision: 1879419
>
> URL: http://svn.apache.org/viewvc?rev=1879419&view=rev
> Log:
> mod_proxy_http: handle async tunneling of Upgrade(d) protocols.
>
> When supported by the MPM (i.e. "event"), provide async callbacks and let
> them be scheduled by ap_mpm_register_poll_callback_timeout(), while the
> handler returns SUSPENDED.
>
> The new ProxyAsyncDelay directive (if positive) enables async handling,
> while ProxyAsyncIdleTimeout determines the timeout applied on both ends
> while tunneling.
>
> Github: closes #126
>
>
> Modified:
> httpd/httpd/trunk/modules/proxy/mod_proxy.c
> httpd/httpd/trunk/modules/proxy/mod_proxy.h
> httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
>
> Modified: httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy_http.c?rev=1879419&r1=1879418&r2=1879419&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/proxy/mod_proxy_http.c (original)
> +++ httpd/httpd/trunk/modules/proxy/mod_proxy_http.c Thu Jul 2 00:14:26 2020
> @@ -229,29 +233,129 @@ typedef enum {
> typedef struct {
> apr_pool_t *p;
> request_rec *r;
> + const char *proto;
> proxy_worker *worker;
> + proxy_dir_conf *dconf;
> proxy_server_conf *sconf;
> -
> char server_portstr[32];
> +
> proxy_conn_rec *backend;
> conn_rec *origin;
>
> apr_bucket_alloc_t *bucket_alloc;
> apr_bucket_brigade *header_brigade;
> apr_bucket_brigade *input_brigade;
> +
> char *old_cl_val, *old_te_val;
> apr_off_t cl_val;
>
> + proxy_http_state state;
> rb_methods rb_method;
>
> - int force10;
> const char *upgrade;
> -
> - int expecting_100;
> - unsigned int do_100_continue:1,
> - prefetch_nonblocking:1;
> + proxy_tunnel_rec *tunnel;
> + apr_array_header_t *pfds;
> + apr_interval_time_t idle_timeout;
> +
> + unsigned int can_go_async :1,
> + expecting_100 :1,
> + do_100_continue :1,
> + prefetch_nonblocking :1,
> + force10 :1;
> } proxy_http_req_t;
>
> +static void proxy_http_async_finish(proxy_http_req_t *req)
> +{
> + conn_rec *c = req->r->connection;
> +
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: finish async", req->proto);
> +
> + proxy_run_detach_backend(req->r, req->backend);
> + ap_proxy_release_connection(req->proto, req->backend, req->r->server);
> +
> + ap_finalize_request_protocol(req->r);
> + ap_process_request_after_handler(req->r);
> + /* don't touch req or req->r from here */
> +
> + c->cs->state = CONN_STATE_LINGER;
> + ap_mpm_resume_suspended(c);
> +}
> +
> +/* If neither socket becomes readable in the specified timeout,
> + * this callback will kill the request.
> + * We do not have to worry about having a cancel and a IO both queued.
> + */
> +static void proxy_http_async_cancel_cb(void *baton)
> +{
> + proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: cancel async", req->proto);
> +
> + req->r->connection->keepalive = AP_CONN_CLOSE;
> + req->backend->close = 1;
> + proxy_http_async_finish(req);
> +}
> +
> +/* Invoked by the event loop when data is ready on either end.
> + * We don't need the invoke_mtx, since we never put multiple callback events
> + * in the queue.
> + */
> +static void proxy_http_async_cb(void *baton)
> +{
> + proxy_http_req_t *req = (proxy_http_req_t *)baton;
> + int status;
> +
> + if (req->pfds) {
> + apr_pool_clear(req->pfds->pool);
> + }
> +
> + switch (req->state) {
> + case PROXY_HTTP_TUNNELING:
> + /* Pump both ends until they'd block and then start over again */
> + status = ap_proxy_tunnel_run(req->tunnel);
> + if (status == HTTP_GATEWAY_TIME_OUT) {
> + if (req->pfds) {
> + apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> + apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> + async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> + async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
What is the purpose of this?
async_pfds and tunnel_pfds are local to this block and cannot be used outside this block.
> + }
> + else {
> + req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> + apr_pool_create(&req->pfds->pool, req->p);
Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?
> + }
> + status = SUSPENDED;
> + }
> + break;
> +
> + default:
> + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
> + "proxy %s: unexpected async state (%i)",
> + req->proto, (int)req->state);
> + status = HTTP_INTERNAL_SERVER_ERROR;
> + break;
> + }
> +
> + if (status == SUSPENDED) {
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: suspended, going async",
> + req->proto);
> +
> + ap_mpm_register_poll_callback_timeout(req->pfds,
> + proxy_http_async_cb,
> + proxy_http_async_cancel_cb,
> + req, req->idle_timeout);
> + }
> + else if (status != OK) {
> + proxy_http_async_cancel_cb(req);
> + }
> + else {
> + proxy_http_async_finish(req);
> + }
> +}
> +
> /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
> * pass a FLUSH bucket to the backend and read again in blocking mode.
> */
Regards
Rüdiger
> Author: ylavic
> Date: Thu Jul 2 00:14:26 2020
> New Revision: 1879419
>
> URL: http://svn.apache.org/viewvc?rev=1879419&view=rev
> Log:
> mod_proxy_http: handle async tunneling of Upgrade(d) protocols.
>
> When supported by the MPM (i.e. "event"), provide async callbacks and let
> them be scheduled by ap_mpm_register_poll_callback_timeout(), while the
> handler returns SUSPENDED.
>
> The new ProxyAsyncDelay directive (if positive) enables async handling,
> while ProxyAsyncIdleTimeout determines the timeout applied on both ends
> while tunneling.
>
> Github: closes #126
>
>
> Modified:
> httpd/httpd/trunk/modules/proxy/mod_proxy.c
> httpd/httpd/trunk/modules/proxy/mod_proxy.h
> httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
>
> Modified: httpd/httpd/trunk/modules/proxy/mod_proxy_http.c
> URL: http://svn.apache.org/viewvc/httpd/httpd/trunk/modules/proxy/mod_proxy_http.c?rev=1879419&r1=1879418&r2=1879419&view=diff
> ==============================================================================
> --- httpd/httpd/trunk/modules/proxy/mod_proxy_http.c (original)
> +++ httpd/httpd/trunk/modules/proxy/mod_proxy_http.c Thu Jul 2 00:14:26 2020
> @@ -229,29 +233,129 @@ typedef enum {
> typedef struct {
> apr_pool_t *p;
> request_rec *r;
> + const char *proto;
> proxy_worker *worker;
> + proxy_dir_conf *dconf;
> proxy_server_conf *sconf;
> -
> char server_portstr[32];
> +
> proxy_conn_rec *backend;
> conn_rec *origin;
>
> apr_bucket_alloc_t *bucket_alloc;
> apr_bucket_brigade *header_brigade;
> apr_bucket_brigade *input_brigade;
> +
> char *old_cl_val, *old_te_val;
> apr_off_t cl_val;
>
> + proxy_http_state state;
> rb_methods rb_method;
>
> - int force10;
> const char *upgrade;
> -
> - int expecting_100;
> - unsigned int do_100_continue:1,
> - prefetch_nonblocking:1;
> + proxy_tunnel_rec *tunnel;
> + apr_array_header_t *pfds;
> + apr_interval_time_t idle_timeout;
> +
> + unsigned int can_go_async :1,
> + expecting_100 :1,
> + do_100_continue :1,
> + prefetch_nonblocking :1,
> + force10 :1;
> } proxy_http_req_t;
>
> +static void proxy_http_async_finish(proxy_http_req_t *req)
> +{
> + conn_rec *c = req->r->connection;
> +
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: finish async", req->proto);
> +
> + proxy_run_detach_backend(req->r, req->backend);
> + ap_proxy_release_connection(req->proto, req->backend, req->r->server);
> +
> + ap_finalize_request_protocol(req->r);
> + ap_process_request_after_handler(req->r);
> + /* don't touch req or req->r from here */
> +
> + c->cs->state = CONN_STATE_LINGER;
> + ap_mpm_resume_suspended(c);
> +}
> +
> +/* If neither socket becomes readable in the specified timeout,
> + * this callback will kill the request.
> + * We do not have to worry about having a cancel and a IO both queued.
> + */
> +static void proxy_http_async_cancel_cb(void *baton)
> +{
> + proxy_http_req_t *req = (proxy_http_req_t *)baton;
> +
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: cancel async", req->proto);
> +
> + req->r->connection->keepalive = AP_CONN_CLOSE;
> + req->backend->close = 1;
> + proxy_http_async_finish(req);
> +}
> +
> +/* Invoked by the event loop when data is ready on either end.
> + * We don't need the invoke_mtx, since we never put multiple callback events
> + * in the queue.
> + */
> +static void proxy_http_async_cb(void *baton)
> +{
> + proxy_http_req_t *req = (proxy_http_req_t *)baton;
> + int status;
> +
> + if (req->pfds) {
> + apr_pool_clear(req->pfds->pool);
> + }
> +
> + switch (req->state) {
> + case PROXY_HTTP_TUNNELING:
> + /* Pump both ends until they'd block and then start over again */
> + status = ap_proxy_tunnel_run(req->tunnel);
> + if (status == HTTP_GATEWAY_TIME_OUT) {
> + if (req->pfds) {
> + apr_pollfd_t *async_pfds = (void *)req->pfds->elts;
> + apr_pollfd_t *tunnel_pfds = (void *)req->tunnel->pfds->elts;
> + async_pfds[0].reqevents = tunnel_pfds[0].reqevents;
> + async_pfds[1].reqevents = tunnel_pfds[1].reqevents;
What is the purpose of this?
async_pfds and tunnel_pfds are local to this block and cannot be used outside this block.
> + }
> + else {
> + req->pfds = apr_array_copy(req->p, req->tunnel->pfds);
> + apr_pool_create(&req->pfds->pool, req->p);
Why first using baton->r->pool to create the copy and then setting the pool of the array to the new pool?
> + }
> + status = SUSPENDED;
> + }
> + break;
> +
> + default:
> + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, req->r,
> + "proxy %s: unexpected async state (%i)",
> + req->proto, (int)req->state);
> + status = HTTP_INTERNAL_SERVER_ERROR;
> + break;
> + }
> +
> + if (status == SUSPENDED) {
> + ap_log_rerror(APLOG_MARK, APLOG_TRACE1, 0, req->r,
> + "proxy %s: suspended, going async",
> + req->proto);
> +
> + ap_mpm_register_poll_callback_timeout(req->pfds,
> + proxy_http_async_cb,
> + proxy_http_async_cancel_cb,
> + req, req->idle_timeout);
> + }
> + else if (status != OK) {
> + proxy_http_async_cancel_cb(req);
> + }
> + else {
> + proxy_http_async_finish(req);
> + }
> +}
> +
> /* Read what's in the client pipe. If nonblocking is set and read is EAGAIN,
> * pass a FLUSH bucket to the backend and read again in blocking mode.
> */
Regards
Rüdiger