implemented chunk size change

This commit is contained in:
Roman Arutyunyan 2012-04-05 21:28:41 +04:00
parent 321c4899cc
commit cf1976cd05
4 changed files with 147 additions and 31 deletions

View file

@ -212,6 +212,9 @@ typedef struct {
uint32_t in_bytes;
uint32_t in_last_ack;
ngx_pool_t *in_old_pool;
ngx_int_t in_chunk_size_changing;
ngx_chain_t *out;
ngx_chain_t *out_free_chains;
} ngx_rtmp_session_t;
@ -334,6 +337,7 @@ void ngx_rtmp_init_connection(ngx_connection_t *c);
void ngx_rtmp_close_connection(ngx_connection_t *c);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
uint32_t ngx_rtmp_get_timestamp();
ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size);
/* Bit reverse: we need big-endians in many places */

View file

@ -217,7 +217,7 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->timeout, prev->timeout, 60000);
ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0);
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16);
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 64);
ngx_conf_merge_value(conf->chunk_size, prev->chunk_size, 4096);
ngx_conf_merge_uint_value(conf->ack_window, prev->ack_window, 5000000);
ngx_conf_merge_size_value(conf->max_buf, prev->max_buf, 128 * 1024);

View file

@ -21,7 +21,8 @@ static void ngx_rtmp_handshake_send(ngx_event_t *rev);
static void ngx_rtmp_recv(ngx_event_t *rev);
static void ngx_rtmp_send(ngx_event_t *rev);
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_rtmp_header_t *h, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s);
#ifdef NGX_DEBUG
@ -238,9 +239,12 @@ ngx_rtmp_init_session(ngx_connection_t *c)
return;
}
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(4096/*2 * size + sizeof(ngx_pool_t)*/, c->log);
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
/*
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(4096, c->log);
*/
/* start handshake */
b = &s->hs_in_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
@ -472,6 +476,33 @@ restart:
}
static ngx_chain_t *
ngx_rtmp_alloc_in_buf(ngx_rtmp_session_t *s)
{
ngx_chain_t *cl;
ngx_buf_t *b;
size_t size;
if ((cl = ngx_alloc_chain_link(s->in_pool)) == NULL
|| (cl->buf = ngx_calloc_buf(s->in_pool)) == NULL)
{
return NULL;
}
cl->next = NULL;
b = cl->buf;
size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
b->start = b->last = b->pos = ngx_palloc(s->in_pool, size);
if (b->start == NULL) {
return NULL;
}
b->end = b->start + size;
return cl;
}
void
ngx_rtmp_recv(ngx_event_t *rev)
{
@ -505,32 +536,13 @@ ngx_rtmp_recv(ngx_event_t *rev)
/* allocate new buffer */
if (st->in == NULL) {
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
|| (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL)
{
st->in = ngx_rtmp_alloc_in_buf(s);
if (st->in == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"chain alloc failed");
"in buf alloc failed");
ngx_rtmp_close_connection(c);
return;
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"allocating input buffer %p : %p",
st->in, st->in->buf);
st->in->next = NULL;
b = st->in->buf;
size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
b->start = b->last = b->pos = ngx_palloc(s->in_pool, size);
if (b->start == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"buf alloc failed");
ngx_rtmp_close_connection(c);
return;
}
b->end = b->start + size;
}
h = &st->hdr;
@ -545,6 +557,10 @@ ngx_rtmp_recv(ngx_event_t *rev)
b->pos = b->start;
b->last = ngx_movemem(b->pos, old_pos, old_size);
if (s->in_chunk_size_changing) {
ngx_rtmp_finalize_set_chunk_size(s);
}
} else {
if (old_pos) {
@ -751,11 +767,19 @@ ngx_rtmp_recv(ngx_event_t *rev)
return;
}
/* add used bufs to stream #0 */
st0 = &s->in_streams[0];
st->in->next = st0->in;
st0->in = head;
st->in = NULL;
if (s->in_chunk_size_changing) {
/* copy old data to a new buffer */
if (!old_size) {
ngx_rtmp_finalize_set_chunk_size(s);
}
} else {
/* add used bufs to stream #0 */
st0 = &s->in_streams[0];
st->in->next = st0->in;
st0->in = head;
st->in = NULL;
}
}
s->in_csid = 0;
@ -1150,6 +1174,89 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
}
ngx_int_t
ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_chain_t *li, *fli, *lo, *flo;
ngx_buf_t *bi, *bo;
ngx_int_t n;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"setting chunk_size=%ui", size);
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
s->in_old_pool = s->in_pool;
s->in_chunk_size = size;
s->in_pool = ngx_create_pool(4096, s->connection->log);
/* copy existing chunk data */
if (s->in_old_pool) {
s->in_chunk_size_changing = 1;
s->in_streams[0].in = NULL;
for(n = 1; n < cscf->max_streams; ++n) {
/* stream buffer is circular
* for all streams except for the current one
* (which caused this chunk size change);
* we can simply ignore it */
li = s->in_streams[n].in;
if (li == NULL || li->next == NULL) {
continue;
}
/* move from last to the first */
li = li->next;
fli = li;
lo = ngx_rtmp_alloc_in_buf(s);
if (lo == NULL) {
return NGX_ERROR;
}
flo = lo;
for ( ;; ) {
bi = li->buf;
bo = lo->buf;
if (bo->end - bo->last >= bi->last - bi->pos) {
bo->last = ngx_cpymem(bo->last, bi->pos,
bi->last - bi->pos);
li = li->next;
if (li == fli) {
lo->next = flo;
s->in_streams[n].in = lo;
break;
}
} else {
bo->last = ngx_cpymem(bo->last, bi->pos,
bo->end - bo->last);
bi->pos += (bo->end - bo->last);
}
if (bo->last == bo->end) {
lo->next = ngx_rtmp_alloc_in_buf(s);
if (lo->next == NULL) {
return NGX_ERROR;
}
lo = lo->next;
}
}
}
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_finalize_set_chunk_size(ngx_rtmp_session_t *s)
{
if (s->in_chunk_size_changing && s->in_old_pool) {
ngx_destroy_pool(s->in_old_pool);
s->in_old_pool = NULL;
s->in_chunk_size_changing = 0;
}
return NGX_OK;
}
void
ngx_rtmp_close_connection(ngx_connection_t *c)
{
@ -1183,6 +1290,10 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
}
}
if (s->in_old_pool) {
ngx_destroy_pool(s->in_old_pool);
}
if (s->in_pool) {
ngx_destroy_pool(s->in_pool);
}

View file

@ -37,6 +37,7 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
switch(h->type) {
case NGX_RTMP_MSG_CHUNK_SIZE:
/* set chunk size =val */
ngx_rtmp_set_chunk_size(s, val);
break;
case NGX_RTMP_MSG_ABORT: