reimplemented input buffering

This commit is contained in:
Roman Arutyunyan 2012-03-11 20:17:17 +04:00
parent 69bbf67118
commit 6769a5ac5d
4 changed files with 202 additions and 124 deletions

28
TODO
View file

@ -1,19 +1,17 @@
- RTMP replies
- Implement modules support.
Move AMF0 handlers to modules.
Move broadcast to module.
- standard-compliance
- improve handshake
- names
- Implement helpers for issuing
protocol control messages.
- Input buffers:
allocate a continuous block &
allocate input buffers
in it. Upon chunk size change
re-split the same buffer for
new chunk size. Use temp
buffer as temporary
storage of overflow data.
Max chunk size if 65K which is
really a lot.
+ Implement multiplexing on
chunk-stream-id. (Message-Streams
may not be multiplexed within
a Chunk Stream):
"Typically, all messages in
the same chunk stream will come from the same message stream"
- Output buffers:
implement shared (per-loc/srv) buffers
@ -21,6 +19,8 @@
We never change output chunk size.
Chunk buffers should be ref-counted.
- 'packet' <-> 'message'
- implement loc confs (=fms apps)
loc options:
- session buckets

View file

@ -108,7 +108,6 @@ typedef struct {
typedef struct {
uint8_t fmt; /* header format */
uint32_t csid; /* chunk stream id */
uint32_t timestamp;
uint32_t mlen; /* message length */
@ -121,6 +120,12 @@ typedef struct {
#define NGX_RTMP_SUBSCRIBER 0x02
typedef struct ngx_rtmp_stream_t {
ngx_rtmp_packet_hdr_t hdr;
ngx_chain_t *in;
} ngx_rtmp_stream_t;
struct ngx_rtmp_session_s {
uint32_t signature; /* "RTMP" */
@ -132,20 +137,19 @@ struct ngx_rtmp_session_s {
ngx_str_t *addr_text;
ngx_uint_t chunk_size;
ngx_chain_t *free;
/* FIXME: there should probably be a better way
* to store handshake buffers & states
*/
/* handshake */
ngx_buf_t buf;
ngx_uint_t hs_stage;
/* input */
ngx_chain_t *in;
ngx_rtmp_packet_hdr_t in_hdr;
/* input
* stream 0 (reserved by RTMP spec)
* used for free chain link (0) */
ngx_rtmp_stream_t *streams;
uint32_t in_csid;
ngx_uint_t in_chunk_size;
ngx_pool_t *in_pool;
/* output */
ngx_chain_t *out;
@ -168,7 +172,8 @@ typedef struct ngx_rtmp_session_s ngx_rtmp_session_t;
typedef struct {
ngx_msec_t timeout;
ngx_flag_t so_keepalive;
ngx_int_t buffers;
/*ngx_int_t buffers;*/
ngx_int_t max_streams;
ngx_msec_t resolver_timeout;
ngx_resolver_t *resolver;
ngx_rtmp_conf_ctx_t *ctx;
@ -277,22 +282,35 @@ void ngx_rtmp_send_packet(ngx_rtmp_session_t *s,
/* NetConnection methods */
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s
double trans_id, , ngx_chain_t *l);
/* NetStream methods */
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
extern ngx_uint_t ngx_rtmp_max_module;

View file

@ -65,6 +65,13 @@ static ngx_command_t ngx_rtmp_core_commands[] = {
offsetof(ngx_rtmp_core_srv_conf_t, buffers),
NULL },
{ ngx_string("max_streams"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_core_srv_conf_t, max_streams),
NULL },
{ ngx_string("resolver"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_1MORE,
ngx_rtmp_core_resolver,
@ -155,6 +162,7 @@ ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf)
cscf->resolver_timeout = NGX_CONF_UNSET_MSEC;
cscf->so_keepalive = NGX_CONF_UNSET;
cscf->buffers = NGX_CONF_UNSET;
conf->max_streams = NGX_CONF_UNSET;
cscf->resolver = NGX_CONF_UNSET_PTR;
@ -174,6 +182,7 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_value(conf->so_keepalive, prev->so_keepalive, 0);
ngx_conf_merge_value(conf->buffers, prev->buffers, 16);
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16);
ngx_conf_merge_ptr_value(conf->resolver, prev->resolver, NULL);

View file

@ -210,16 +210,21 @@ ngx_rtmp_init_session(ngx_connection_t *c)
return;
}
s->streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
* cmcf->max_streams);
if (s->streams == NULL) {
ngx_rtmp_close_session(s);
return;
}
s->chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(NGX_RTMP_HANDSHAKE_SIZE + 1
+ sizeof(ngx_pool_t), c->log);
bufs.size = s->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
bufs.num = cscf->buffers;
s->free = ngx_create_chain_of_bufs(c->pool, &bufs);
/* start handshake */
b = &s->buf;
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
b->start = b->pos = b->last = ngx_pcalloc(c->pool, size);
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
b->end = b->start + size;
b->temporary = 1;
@ -294,9 +299,11 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
}
/* handshake done */
ngx_pfree(c->pool, s->buf.start);
ngx_reset_pool(s->in_pool);
c->read->handler = ngx_rtmp_recv;
c->write->handler = ngx_rtmp_send;
ngx_rtmp_recv(rev);
}
@ -366,38 +373,65 @@ ngx_rtmp_recv(ngx_event_t *rev)
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b, *bb;
u_char *p, *pp;
ngx_chain_t *lin;
ngx_rtmp_packet_hdr_t *h;
u_char *p;
uint32_t timestamp;
size_t size;
ngx_chain_t *in;
ngx_rtmp_packet_hdr_t *h;
uint8_t fmt;
uint32_t csid;
ngx_rtmp_stream_t *st, st0;
ngx_chain_t *in, *head;
ngx_buf_t *b;
c = rev->data;
s = c->data;
b = NULL;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (s->in == NULL) {
if (s->free == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "no free buffers");
ngx_rtmp_close_session(s);
return;
}
s->in = s->free;
s->free = s->free->next;
s->in->next = NULL;
b = s->in->buf;
b->pos = b->last = b->start;
}
for(;;) {
/* find the last buf */
for(lin = s->in; lin->next; lin = lin->next);
b = lin->buf;
st = &s->streams[s->csid];
if (b->last == b->end) {
ngx_rtmp_close_session(s);
return;
if (st->in == NULL) {
if ((st->in = ngx_alloc_chain_link(c->in_pool)) == NULL
|| (sin->in->buf = ngx_calloc_buf(c->in_pool)) == NULL)
{
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"chain alloc failed");
ngx_rtmp_close_session(s);
return;
}
st->in->next = NULL;
size = s->in_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
st->in->buf->start = ngx_palloc(s->in_pool, size);
if (st->in->buf->start == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"buf alloc failed");
ngx_rtmp_close_session(s);
return NULL;
}
st->in->buf->flush = 1;
}
h = &st->hdr;
in = st->in;
/* anything remained from last iteration? */
if (b != NULL && b->recycled && b->pos < b->last) {
s->in->buf->last = ngx_movemem(s->in->buf->start, b->pos,
b->last - b->pos);
b->recycled = 0;
s->in->buf->flush = 0;
}
b = in->buf;
if (b->flush) {
b->pos = b->last = b->start;
b->flush = 0;
}
n = c->recv(c, b->last, b->end - b->last);
@ -415,36 +449,65 @@ ngx_rtmp_recv(ngx_event_t *rev)
}
b->last += n;
h = &s->in_hdr;
/* parse headers */
if (b->pos == b->start) {
p = b->pos;
timestamp = h->timestamp;
/* chunk basic header */
h->fmt = (*p >> 6) & 0x03;
h->csid = *p++ & 0x3f;
fmt = (*p >> 6) & 0x03;
csid = *p++ & 0x3f;
if (h->csid == 0) {
if (csid == 0) {
if (b->last - p < 1)
continue;
h->csid = 64;
h->csid += *(uint8_t*)p++;
csid = 64;
csid += *(uint8_t*)p++;
} else if (h->csid == 1) {
} else if (csid == 1) {
if (b->last - p < 2)
continue;
h->csid = 64;
h->csid += *(uint8_t*)p++;
h->csid += (uint32_t)256 * (*(uint8_t*)p++);
csid = 64;
csid += *(uint8_t*)p++;
csid += (uint32_t)256 * (*(uint8_t*)p++);
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP bheader fmt=%d csid=%D",
(int)h->fmt, h->csid);
(int)fmt, csid);
if (h->fmt <= 2 ) {
if (csid >= cscf->max_streams) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"RTMP chunk stream too big: %D >= %D",
csid, cscf->max_streams);
ngx_rtmp_close_session(s);
return;
}
/* link orphan */
if (s->csid == 0) {
/* unlink from stream #0 */
st->in = st->in->next;
/* link to new stream */
s->csid = csid;
st = s->streams[csid];
if (st->in == NULL) {
in->next = in;
} else {
in->next = st->in->next;
st->in->next = in;
}
st->in = in;
h = &st->hdr;
h->csid = csid;
}
/* get previous header to inherit data from */
timestamp = h->timestamp;
if (fmt <= 2 ) {
if (b->last - p < 3)
continue;
/* timestamp:
@ -455,7 +518,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
pp[0] = *p++;
pp[3] = 0;
if (h->fmt <= 1) {
if (mt <= 1) {
if (b->last - p < 4)
continue;
/* size:
@ -469,7 +532,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
pp[3] = 0;
h->type = *(uint8_t*)p++;
if (h->fmt == 0) {
if (fmt == 0) {
if (b->last - p < 4)
continue;
/* stream:
@ -508,49 +571,37 @@ ngx_rtmp_recv(ngx_event_t *rev)
b->pos = p;
}
/* parse payload */
if (b->last - b->pos < (ngx_int_t)ngx_min(h->mlen, s->chunk_size))
size = b->last - b->pos;
if (size < (ngx_int_t)ngx_min(h->mlen, s->chunk_size))
continue;
/* if fragmented then wait for more fragments */
if (h->mlen > s->chunk_size) {
if (s->free == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log,
NGX_ERROR, "no free buffers");
/* buffer is ready */
b->flush = 1;
if (h->mlen > s->in_chunk_size) {
/* collect fragmented chunks */
h->mlen -= s->in_chunk_size;
b->pos += s->in_chunk_size;
} else {
/* handle! */
head = st->in->next;
st->in->next = NULL;
if (ngx_rtmp_receive_packet(s, h, head) != NGX_OK) {
ngx_rtmp_close_session(s);
return;
}
lin->next = s->free;
s->free = s->free->next;
lin = lin->next;
lin->next = NULL;
h->mlen -= s->chunk_size;
bb = lin->buf;
bb->pos = bb->last = bb->start;
continue;
b->pos += h->mlen;
/* add used bufs to stream #0 */
st0 = &s->streams[0];
st->in->next = st0->in->next;
st0->in->next = head;
}
/* handle packet! */
if (ngx_rtmp_receive_packet(s, h, s->in) != NGX_OK) {
ngx_rtmp_close_session(s);
return;
}
bb = s->in->buf;
bb->pos = bb->last = bb->start;
/* copy remained data to first buffer */
if (h->mlen < b->last - b->pos) {
bb->last = ngx_movemem(bb->start,
b->pos + h->mlen,
b->last - b->pos - h->mlen);
}
/* free all but one input buffer */
if (s->in->next) {
s->in->next->next = s->free;
s->free = s->in->next;
s->in->next = NULL;
}
s->csid = 0;
b->recycled = 1;
}
}
@ -860,11 +911,10 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
} ping;
ngx_rtmp_session_t *ss;
static char invoke_name[64];
static ngx_rtmp_amf0_elt_t invoke_name_elt = {
NGX_RTMP_AMF0_STRING,
NULL,
invoke_name,
sizeof(invoke_name)
static double trans_id;
static ngx_rtmp_amf0_elt_t invoke_name_elt[] = {
{ NGX_RTMP_AMF0_STRING, NULL, invoke_name, sizeof(invoke_name) },
{ NGX_RTMP_AMF0_STRING, NULL, &trans_id, sizeof(trans_id) }
};
ngx_rtmp_amf0_ctx_t amf_ctx;
@ -1007,7 +1057,7 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
#define _CMD_CALL(name) \
if (!strcasecmp(invoke_name, #name)) { \
return ngx_rtmp_##name(s, l); \
return ngx_rtmp_##name(s, trans_id, l); \
}
/* NetConnection calls */
@ -1045,6 +1095,7 @@ void
ngx_rtmp_close_session(ngx_rtmp_session_t *s)
{
ngx_rtmp_leave(s);
ngx_destroy_pool(s->in_pool);
ngx_rtmp_close_connection(s->connection);
}