From 40684b12b4550dbb8f8d90954383ccb15d0aec96 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Wed, 18 Apr 2012 16:37:18 +0400 Subject: [PATCH] implemented shared output chains --- ngx_rtmp.h | 19 +++-- ngx_rtmp_core_module.c | 12 +-- ngx_rtmp_handler.c | 184 +++++++++++++++-------------------------- ngx_rtmp_live_module.c | 4 +- ngx_rtmp_send.c | 4 +- ngx_rtmp_shared.c | 83 ++++--------------- test/nginx.conf | 2 +- 7 files changed, 105 insertions(+), 203 deletions(-) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index d86a59c..6f978f7 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -216,8 +216,11 @@ typedef struct { ngx_pool_t *in_old_pool; ngx_int_t in_chunk_size_changing; - ngx_chain_t *out; - ngx_chain_t *out_free_chains; + /* circular buffer of RTMP message pointers */ + ngx_chain_t **out_start, **out_end; + ngx_chain_t **out_pos, **out_last; + ngx_chain_t *out_chain; + u_char *out_bpos; } ngx_rtmp_session_t; @@ -260,8 +263,7 @@ typedef struct ngx_rtmp_core_srv_conf_s { ngx_int_t chunk_size; ngx_pool_t *pool; ngx_chain_t *free; - ngx_chain_t *free_chains; - size_t max_buf; + size_t max_queue; size_t max_message; ngx_flag_t play_time_fix; ngx_flag_t publish_time_fix; @@ -361,11 +363,9 @@ ngx_int_t ngx_rtmp_amf_shared_object_handler(ngx_rtmp_session_t *s, /* Shared output buffers */ ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf); -void ngx_rtmp_free_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, - ngx_chain_t *out); -void ngx_rtmp_free_shared_buf(ngx_rtmp_core_srv_conf_t *cscf, - ngx_buf_t *b); -void ngx_rtmp_acquire_shared_buf(ngx_buf_t *b); +void ngx_rtmp_acquire_shared_chain(ngx_chain_t *in); +void ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, + ngx_chain_t *in); ngx_chain_t * ngx_rtmp_append_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *head, ngx_chain_t *in); @@ -380,6 +380,7 @@ ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, * the bigger value the lower the priority. * priority=0 is the highest */ + #define NGX_RTMP_LIMIT_SOFT 0 #define NGX_RTMP_LIMIT_HARD 1 #define NGX_RTMP_LIMIT_DYNAMIC 2 diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index 2a509e8..d16d991 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -89,11 +89,11 @@ static ngx_command_t ngx_rtmp_core_commands[] = { offsetof(ngx_rtmp_core_srv_conf_t, chunk_size), NULL }, - { ngx_string("max_buf"), + { ngx_string("max_queue"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_size_slot, NGX_RTMP_SRV_CONF_OFFSET, - offsetof(ngx_rtmp_core_srv_conf_t, max_buf), + offsetof(ngx_rtmp_core_srv_conf_t, max_queue), NULL }, { ngx_string("max_message"), @@ -199,7 +199,7 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf) conf->max_streams = NGX_CONF_UNSET; conf->chunk_size = NGX_CONF_UNSET; conf->ack_window = NGX_CONF_UNSET; - conf->max_buf = NGX_CONF_UNSET; + conf->max_queue = NGX_CONF_UNSET; conf->max_message = NGX_CONF_UNSET; conf->play_time_fix = NGX_CONF_UNSET; conf->publish_time_fix = NGX_CONF_UNSET; @@ -217,11 +217,11 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 60000); ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0); - ngx_conf_merge_value(conf->max_streams, prev->max_streams, 64); + ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16); ngx_conf_merge_value(conf->chunk_size, prev->chunk_size, 4096); ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000); - ngx_conf_merge_size_value(conf->max_buf, prev->max_buf, 128 * 1024); - ngx_conf_merge_size_value(conf->max_message, prev->max_message, 4 * 1024 * 1024); + ngx_conf_merge_size_value(conf->max_queue, prev->max_queue, 256); + ngx_conf_merge_size_value(conf->max_message, prev->max_message, 1 * 1024 * 1024); ngx_conf_merge_value(conf->play_time_fix, prev->play_time_fix, 1); ngx_conf_merge_value(conf->publish_time_fix, prev->publish_time_fix, 1); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 6b916ee..91928f8 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -48,9 +48,9 @@ ngx_rtmp_message_type(uint8_t type) { "amf3_meta", "amf3_shared", "amd3_cmd", - "amf0_meta", - "amf0_shared", - "amf0_cmd", + "amf_meta", + "amf_shared", + "amf_cmd", "?", "aggregate" }; @@ -233,6 +233,7 @@ ngx_rtmp_init_session(ngx_connection_t *c) return; } + s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cscf->max_streams); if (s->in_streams == NULL) { @@ -241,8 +242,19 @@ ngx_rtmp_init_session(ngx_connection_t *c) } size = NGX_RTMP_HANDSHAKE_SIZE + 1; + + s->out_start = ngx_palloc(c->pool, sizeof(ngx_chain_t *) * cscf->max_queue); + if (s->out_start == NULL) { + ngx_rtmp_close_connection(c); + return; + } + s->out_pos = s->out_last = s->out_start; + s->out_end = s->out_start + cscf->max_queue; + + ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE); + /* start handshake */ b = &s->hs_in_buf; b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); @@ -790,10 +802,7 @@ ngx_rtmp_send(ngx_event_t *wev) ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; - ngx_chain_t *out, *l, *cl; - u_char *p; - off_t limit; - size_t n; + ngx_int_t n; c = wev->data; s = c->data; @@ -815,78 +824,43 @@ ngx_rtmp_send(ngx_event_t *wev) ngx_del_timer(wev); } - while (s->out) { - p = s->out->buf->pos; + while (s->out_chain) { + n = c->send(c, s->out_bpos, s->out_chain->buf->last - s->out_bpos); - /* send_chain calls writev for output. - * It uses mixed allocation model for - * for iovecs passed to writev. Only 64 - * structs fit into stack. When writing more - * memory is allocated from c->pool and - * **NEVER EVER** returned back. - * IOV_MAX=1024 on Linux. - * - * The only way to escape allocation is - * limiting the number of output data blocks - * being written at once with NGX_HEADERS - * (64 by default). - * - * FIXME: NGINX - * Unfortunately NGINX API does not allow - * us to specify max number of such blocks - * but only size limit. We're left with - * limiting by size which leads to extra - * loop here to find size of first 64 - * blocks in output. - * */ - - limit = 0; - n = 0; - cl = s->out; - while (cl && n < 64) { - ++n; - limit += cl->buf->last - cl->buf->pos; - cl = cl->next; - } - - out = c->send_chain(c, s->out, limit); - - if (out == NGX_CHAIN_ERROR) { + if (n == NGX_ERROR) { ngx_rtmp_finalize_session(s); return; } - if (out == s->out && out->buf->pos == p) { - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + if (n == NGX_AGAIN || n == 0) { ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { ngx_rtmp_finalize_session(s); + return; } - return; } - while (s->out) { - l = s->out; - - if (l->buf->pos < l->buf->last) { - break; - } - - s->out = s->out->next; - l->next = NULL; - - l->next = s->out_free_chains; - s->out_free_chains = l; - - ngx_rtmp_free_shared_buf(cscf, l->buf); - - if (s->out == out) { - break; + s->out_bpos += n; + if (s->out_bpos == s->out_chain->buf->last) { + s->out_chain = s->out_chain->next; + if (s->out_chain == NULL) { + ngx_rtmp_free_shared_chain(cscf, *s->out_pos); + ++s->out_pos; + if (s->out_pos == s->out_end) { + s->out_pos = s->out_start; + } + if (s->out_pos == s->out_last) { + break; + } + s->out_chain = *s->out_pos; } + s->out_bpos = s->out_chain->buf->pos; } } - ngx_del_event(wev, NGX_WRITE_EVENT, 0); + if (wev->active) { + ngx_del_event(wev, NGX_WRITE_EVENT, 0); + } } @@ -1038,68 +1012,42 @@ ngx_int_t ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out, ngx_uint_t priority) { - ngx_chain_t *l, **ll; ngx_connection_t *c; - ngx_buf_t *b; ngx_rtmp_core_srv_conf_t *cscf; - size_t nbytes, nbufs, qbytes, qbufs; + size_t nmsg; c = s->connection; cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - qbytes = 0; - qbufs = 0; - nbytes = 0; - nbufs = 0; - for(ll = &s->out; *ll; ll = &(*ll)->next) { - qbytes += (*ll)->buf->last - (*ll)->buf->pos; - ++qbufs; - } + nmsg = (s->out_pos <= s->out_last) + ? s->out_last - s->out_pos + : (s->out_end - s->out_pos) + (s->out_last - s->out_start); + ++nmsg; - /* drop packet? */ - if (qbytes > cscf->max_buf / (priority + 1)) { - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0, - "drop message bytes=%uz, bufs=%uz priority=%ui", - qbytes, qbufs, priority); + /* drop packet? + * Note we always leave 1 slot free */ + if (nmsg >= (s->out_end - s->out_start) / (priority + 1)) { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP drop message bufs=%ui, priority=%ui", + nmsg, priority); return NGX_AGAIN; } - /* append locally-linked chain of shared buffers */ - for(l = out; l; l = l->next) { + ngx_rtmp_acquire_shared_chain(out); - if (s->out_free_chains) { - *ll = s->out_free_chains; - s->out_free_chains = (*ll)->next; - - } else { - *ll = ngx_alloc_chain_link(c->pool); - if (*ll == NULL) { - break; - } - (*ll)->buf = ngx_calloc_buf(c->pool); - if ((*ll)->buf == NULL) { - ngx_free_chain(c->pool, (*ll)); - break; - } - } - - b = (*ll)->buf; - *b = *l->buf; - - ngx_rtmp_acquire_shared_buf(b); - - ll = &(*ll)->next; - - nbytes += (b->last - b->pos); - ++nbufs; + *s->out_last++ = out; + if (s->out_last >= s->out_end) { + s->out_last = s->out_start; } - *ll = NULL; - ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP send bytes=%uz+%uz, bufs=%uz+%uz, priority=%ui, " - "ready=%d, active=%d", - qbytes, nbytes, qbufs, nbufs, priority, - c->write->ready, c->write->active); + if (s->out_chain == NULL) { + s->out_chain = out; + s->out_bpos = out->buf->pos; + } + + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP send nmsg=%ui, priority=%ui, ready=%d, active=%d", + nmsg, priority, c->write->ready, c->write->active); if (!c->write->active) { ngx_rtmp_send(c->write); @@ -1306,10 +1254,12 @@ ngx_rtmp_close_session_handler(ngx_event_t *e) } } - /* release only buffers, links are local - * and will be released as part of pool */ - for (; s->out; s->out = s->out->next) { - ngx_rtmp_free_shared_buf(cscf, s->out->buf); + while (s->out_pos != s->out_last) { + ngx_rtmp_free_shared_chain(cscf, *s->out_pos); + ++s->out_pos; + if (s->out_pos == s->out_end) { + s->out_pos = s->out_start; + } } ngx_rtmp_close_connection(c); diff --git a/ngx_rtmp_live_module.c b/ngx_rtmp_live_module.c index 77db968..350cbe2 100644 --- a/ngx_rtmp_live_module.c +++ b/ngx_rtmp_live_module.c @@ -281,7 +281,7 @@ ngx_rtmp_live_send_abs_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, rc = ngx_rtmp_send_message(s, out, 0); - ngx_rtmp_free_shared_bufs(cscf, out); + ngx_rtmp_free_shared_chain(cscf, out); return rc; } @@ -430,7 +430,7 @@ ngx_rtmp_live_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } } - ngx_rtmp_free_shared_bufs(cscf, out); + ngx_rtmp_free_shared_chain(cscf, out); return NGX_OK; } diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index c71fa8b..ea524b1 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -42,7 +42,7 @@ #define NGX_RTMP_USER_END(s) \ ngx_rtmp_prepare_message(s, &__h, NULL, __l); \ rc = ngx_rtmp_send_message(s, __l, 0); \ - ngx_rtmp_free_shared_bufs(__cscf, __l); \ + ngx_rtmp_free_shared_chain(__cscf, __l); \ return rc; @@ -269,7 +269,7 @@ ngx_int_t ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, rc = ngx_rtmp_send_message(s, first, 0); done: - ngx_rtmp_free_shared_bufs(cscf, first); + ngx_rtmp_free_shared_chain(cscf, first); return rc; } diff --git a/ngx_rtmp_shared.c b/ngx_rtmp_shared.c index 421e7b6..728c6c6 100644 --- a/ngx_rtmp_shared.c +++ b/ngx_rtmp_shared.c @@ -37,21 +37,14 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf) } else { - if (cscf->free_chains) { - out = cscf->free_chains; - cscf->free_chains = out->next; + out = ngx_alloc_chain_link(cscf->pool); + if (out == NULL) { + return NULL; + } - } else { - out = ngx_alloc_chain_link(cscf->pool); - if (out == NULL) { - return NULL; - } - - out->buf = ngx_calloc_buf(cscf->pool); - if (out->buf == NULL) { - ngx_free_chain(cscf->pool, out); - return NULL; - } + out->buf = ngx_calloc_buf(cscf->pool); + if (out->buf == NULL) { + return NULL; } size = cscf->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER @@ -60,8 +53,6 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf) b = out->buf; b->start = ngx_palloc(cscf->pool, size); if (b->start == NULL) { - out->next = cscf->free_chains; - cscf->free_chains = out; return NULL; } @@ -82,68 +73,28 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_core_srv_conf_t *cscf) void -ngx_rtmp_free_shared_bufs(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *out) +ngx_rtmp_acquire_shared_chain(ngx_chain_t *in) { - ngx_chain_t *cl; - - while (out) { - cl = out; - out = out->next; - - if (ngx_rtmp_ref_put(cl->buf->start) == 0) { - /* both chain & buf are free; - * put the whole chain in free list */ - cl->next = cscf->free; - cscf->free = cl; - continue; - } - - /* only chain is free; - * buf is still used by somebody & will - * be freed in ngx_rtmp_free_shared_buf */ - cl->next = cscf->free_chains; - cscf->free_chains = cl; - } + ngx_rtmp_ref_get(in->buf->start); } -void -ngx_rtmp_acquire_shared_buf(ngx_buf_t *b) +void +ngx_rtmp_free_shared_chain(ngx_rtmp_core_srv_conf_t *cscf, ngx_chain_t *in) { - ngx_rtmp_ref_get(b->start); -} + ngx_chain_t *cl; - -void -ngx_rtmp_free_shared_buf(ngx_rtmp_core_srv_conf_t *cscf, ngx_buf_t *b) -{ - ngx_chain_t *cl; - - if (ngx_rtmp_ref_put(b->start)) { + if (ngx_rtmp_ref_put(in->buf->start)) { return; } - if (cscf->free_chains) { - cl = cscf->free_chains; - cscf->free_chains = cl->next; - - } else { - cl = ngx_alloc_chain_link(cscf->pool); - if (cl == NULL) { - return; - } - - cl->buf = ngx_calloc_buf(cscf->pool); - if (cl->buf == NULL) { - ngx_free_chain(cscf->pool, cl); + for (cl = in; ; cl = cl->next) { + if (cl->next == NULL) { + cl->next = cscf->free; + cscf->free = in; return; } } - - cl->buf->start = b->start; - cl->buf->end = b->end; - cl->next = cscf->free; - cscf->free = cl; } diff --git a/test/nginx.conf b/test/nginx.conf index dec66f2..016145b 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -22,7 +22,7 @@ rtmp { chunk_size 128; - max_buf 1000000; + max_queue 256; publish_time_fix off;