diff --git a/ngx_rtmp.h b/ngx_rtmp.h index d4695ab..d4aa1ec 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -212,6 +212,9 @@ typedef struct { uint32_t in_bytes; uint32_t in_last_ack; + ngx_pool_t *in_old_pool; + ngx_int_t in_chunk_size_changing; + ngx_chain_t *out; ngx_chain_t *out_free_chains; } ngx_rtmp_session_t; @@ -334,6 +337,7 @@ void ngx_rtmp_init_connection(ngx_connection_t *c); void ngx_rtmp_close_connection(ngx_connection_t *c); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); uint32_t ngx_rtmp_get_timestamp(); +ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size); /* Bit reverse: we need big-endians in many places */ diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index 3f444aa..2a509e8 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -217,7 +217,7 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 60000); ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0); - ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16); + ngx_conf_merge_value(conf->max_streams, prev->max_streams, 64); ngx_conf_merge_value(conf->chunk_size, prev->chunk_size, 4096); ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000); ngx_conf_merge_size_value(conf->max_buf, prev->max_buf, 128 * 1024); diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index a539e4d..e0944da 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -21,7 +21,8 @@ static void ngx_rtmp_handshake_send(ngx_event_t *rev); static void ngx_rtmp_recv(ngx_event_t *rev); static void ngx_rtmp_send(ngx_event_t *rev); static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in); + ngx_rtmp_header_t *h, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s); #ifdef NGX_DEBUG @@ -238,9 +239,12 @@ ngx_rtmp_init_session(ngx_connection_t *c) return; } size = NGX_RTMP_HANDSHAKE_SIZE + 1; - s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; - s->in_pool = ngx_create_pool(4096/*2 * size + sizeof(ngx_pool_t)*/, c->log); + ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE); +/* + s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; + s->in_pool = ngx_create_pool(4096, c->log); +*/ /* start handshake */ b = &s->hs_in_buf; b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); @@ -472,6 +476,33 @@ restart: } +static ngx_chain_t * +ngx_rtmp_alloc_in_buf(ngx_rtmp_session_t *s) +{ + ngx_chain_t *cl; + ngx_buf_t *b; + size_t size; + + if ((cl = ngx_alloc_chain_link(s->in_pool)) == NULL + || (cl->buf = ngx_calloc_buf(s->in_pool)) == NULL) + { + return NULL; + } + + cl->next = NULL; + b = cl->buf; + size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER; + + b->start = b->last = b->pos = ngx_palloc(s->in_pool, size); + if (b->start == NULL) { + return NULL; + } + b->end = b->start + size; + + return cl; +} + + void ngx_rtmp_recv(ngx_event_t *rev) { @@ -505,32 +536,13 @@ ngx_rtmp_recv(ngx_event_t *rev) /* allocate new buffer */ if (st->in == NULL) { - - if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL - || (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL) - { + st->in = ngx_rtmp_alloc_in_buf(s); + if (st->in == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, - "chain alloc failed"); + "in buf alloc failed"); ngx_rtmp_close_connection(c); return; } - - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, - "allocating input buffer %p : %p", - st->in, st->in->buf); - - st->in->next = NULL; - b = st->in->buf; - size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER; - - b->start = b->last = b->pos = ngx_palloc(s->in_pool, size); - if (b->start == NULL) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, - "buf alloc failed"); - ngx_rtmp_close_connection(c); - return; - } - b->end = b->start + size; } h = &st->hdr; @@ -545,6 +557,10 @@ ngx_rtmp_recv(ngx_event_t *rev) b->pos = b->start; b->last = ngx_movemem(b->pos, old_pos, old_size); + if (s->in_chunk_size_changing) { + ngx_rtmp_finalize_set_chunk_size(s); + } + } else { if (old_pos) { @@ -751,11 +767,19 @@ ngx_rtmp_recv(ngx_event_t *rev) return; } - /* add used bufs to stream #0 */ - st0 = &s->in_streams[0]; - st->in->next = st0->in; - st0->in = head; - st->in = NULL; + if (s->in_chunk_size_changing) { + /* copy old data to a new buffer */ + if (!old_size) { + ngx_rtmp_finalize_set_chunk_size(s); + } + + } else { + /* add used bufs to stream #0 */ + st0 = &s->in_streams[0]; + st->in->next = st0->in; + st0->in = head; + st->in = NULL; + } } s->in_csid = 0; @@ -1150,6 +1174,89 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, } +ngx_int_t +ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size) +{ + ngx_rtmp_core_srv_conf_t *cscf; + ngx_chain_t *li, *fli, *lo, *flo; + ngx_buf_t *bi, *bo; + ngx_int_t n; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "setting chunk_size=%ui", size); + + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + s->in_old_pool = s->in_pool; + s->in_chunk_size = size; + s->in_pool = ngx_create_pool(4096, s->connection->log); + + /* copy existing chunk data */ + if (s->in_old_pool) { + s->in_chunk_size_changing = 1; + s->in_streams[0].in = NULL; + + for(n = 1; n < cscf->max_streams; ++n) { + /* stream buffer is circular + * for all streams except for the current one + * (which caused this chunk size change); + * we can simply ignore it */ + li = s->in_streams[n].in; + if (li == NULL || li->next == NULL) { + continue; + } + /* move from last to the first */ + li = li->next; + fli = li; + lo = ngx_rtmp_alloc_in_buf(s); + if (lo == NULL) { + return NGX_ERROR; + } + flo = lo; + for ( ;; ) { + bi = li->buf; + bo = lo->buf; + if (bo->end - bo->last >= bi->last - bi->pos) { + bo->last = ngx_cpymem(bo->last, bi->pos, + bi->last - bi->pos); + li = li->next; + if (li == fli) { + lo->next = flo; + s->in_streams[n].in = lo; + break; + } + } else { + bo->last = ngx_cpymem(bo->last, bi->pos, + bo->end - bo->last); + bi->pos += (bo->end - bo->last); + } + if (bo->last == bo->end) { + lo->next = ngx_rtmp_alloc_in_buf(s); + if (lo->next == NULL) { + return NGX_ERROR; + } + lo = lo->next; + } + } + } + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s) +{ + if (s->in_chunk_size_changing && s->in_old_pool) { + ngx_destroy_pool(s->in_old_pool); + s->in_old_pool = NULL; + s->in_chunk_size_changing = 0; + } + return NGX_OK; +} + + void ngx_rtmp_close_connection(ngx_connection_t *c) { @@ -1183,6 +1290,10 @@ ngx_rtmp_close_connection(ngx_connection_t *c) } } + if (s->in_old_pool) { + ngx_destroy_pool(s->in_old_pool); + } + if (s->in_pool) { ngx_destroy_pool(s->in_pool); } diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index 5be23ad..714e864 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -37,6 +37,7 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, switch(h->type) { case NGX_RTMP_MSG_CHUNK_SIZE: /* set chunk size =val */ + ngx_rtmp_set_chunk_size(s, val); break; case NGX_RTMP_MSG_ABORT: