From 4602695d5cc0497bcebfc92aa1dfa44e3cb16708 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Tue, 13 Mar 2012 09:41:51 +0400 Subject: [PATCH] implemented broadcast --- TODO | 9 +- ngx_rtmp.c | 24 +++--- ngx_rtmp.h | 22 ++--- ngx_rtmp_amf0.c | 6 +- ngx_rtmp_amf0.h | 4 +- ngx_rtmp_broadcast_module.c | 128 ++++++++++++++++++++++++----- ngx_rtmp_core_module.c | 13 ++- ngx_rtmp_handler.c | 77 ++++++++--------- ngx_rtmp_receive.c | 22 ++--- ngx_rtmp_send.c | 159 +++++++++++++++++------------------- 10 files changed, 277 insertions(+), 187 deletions(-) diff --git a/TODO b/TODO index 761ba50..41f0fbf 100644 --- a/TODO +++ b/TODO @@ -4,12 +4,11 @@ - remove macros hell from ngx_rtmp_send.c +- packet dropping + +- shortcuts for big-endian copy + - implement loc confs (=fms apps) loc options: - - session buckets - broadcast/file - - input buffer size per connection - - output chunk size - - output chunk buffer size - - output header buffer size - HTTP callbacks on invoke calls diff --git a/ngx_rtmp.c b/ngx_rtmp.c index 26e8d7c..2f70208 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -22,7 +22,7 @@ static ngx_int_t ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport, static ngx_int_t ngx_rtmp_cmp_conf_addrs(const void *one, const void *two); static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf); -static ngx_int_t ngx_rtmp_init_phase_handlers(ngx_conf_t *cf, +static ngx_int_t ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf); @@ -262,6 +262,8 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) { + size_t n; + for(n = 0; n < NGX_RTMP_MSG_MAX; ++n) { if (ngx_array_init(&cmcf->events[n], cf->pool, 1, sizeof(ngx_rtmp_event_handler_pt)) != NGX_OK) @@ -270,13 +272,13 @@ ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) } } - if (ngx_init_array(&conf->calls, cf->pool, 1, + if (ngx_array_init(&cmcf->calls, cf->pool, 1, sizeof(ngx_hash_key_t)) != NGX_OK) { return NGX_ERROR; } - if (ngx_init_array(&conf->disconnect, cf->pool, 1, + if (ngx_array_init(&cmcf->disconnect, cf->pool, 1, sizeof(ngx_rtmp_disconnect_handler_pt)) != NGX_OK) { return NGX_ERROR; @@ -290,11 +292,11 @@ ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) static ngx_int_t ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) { - ngx_hash_init_t calls_hash; - ngx_event_handler_pt *eh; - ngx_hash_key_t *h; - size_t n; - static size_t pm_events[] = { + ngx_hash_init_t calls_hash; + ngx_rtmp_event_handler_pt *eh; + ngx_hash_key_t *h; + size_t n; + static size_t pm_events[] = { NGX_RTMP_MSG_CHUNK_SIZE, NGX_RTMP_MSG_ABORT, NGX_RTMP_MSG_ACK, @@ -304,7 +306,7 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) }; /* init events */ - for(n = 0; n < sizeof(pm_events) / sizeof(p_events[0]); ++n) { + for(n = 0; n < sizeof(pm_events) / sizeof(pm_events[0]); ++n) { eh = ngx_array_push(&cmcf->events[pm_events[n]]); *eh = ngx_rtmp_protocol_message_handler; } @@ -317,8 +319,8 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) /* init calls */ - for(n = 0; n < cmcf->nelts; ++n) { - h = &cmcf->calls.elts[n]; + h = cmcf->calls.elts; + for(n = 0; n < cmcf->calls.nelts; ++n, ++h) { h->key_hash = ngx_hash_key_lc(h->key.data, h->key.len); } diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 12788d3..80906ca 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -12,6 +12,8 @@ #include #include +#include "ngx_rtmp_amf0.h" + typedef struct { void **main_conf; @@ -153,7 +155,7 @@ typedef struct ngx_rtmp_stream_t { } ngx_rtmp_stream_t; -typedef struct { +typedef struct ngx_rtmp_session_s { uint32_t signature; /* "RTMP" */ ngx_connection_t *connection; @@ -194,7 +196,7 @@ typedef struct { ngx_array_t events[NGX_RTMP_MSG_MAX]; ngx_hash_t calls_hash; ngx_array_t calls; - ngx_array_t disconect; + ngx_array_t disconnect; } ngx_rtmp_core_main_conf_t; @@ -203,7 +205,7 @@ typedef struct { ngx_flag_t so_keepalive; ngx_int_t max_streams; - ngx_uint_t out_chunk_size; + ngx_int_t out_chunk_size; ngx_pool_t *out_pool; ngx_chain_t *out_free; @@ -260,11 +262,11 @@ u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); /* Receiving messages */ -ngx_int_t ngx_rtmp_protocol_message_handler(ngx_session_t *s, +ngx_int_t ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -ngx_int_t ngx_rtmp_user_message_handler(ngx_session_t *s, +ngx_int_t ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -ngx_int_t ngx_rtmp_amf0_message_handler(ngx_session_t *s, +ngx_int_t ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); /* Sending messages */ @@ -286,7 +288,7 @@ ngx_int_t ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq); ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size); -ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_r *s, +ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, uint8_t limit_type); /* User control messages */ @@ -306,11 +308,11 @@ ngx_int_t ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp); /* AMF0 sender/receiver */ -ngx_int_t ngx_rtmp_send_amf0(ngx_session_t *s, +ngx_int_t ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, uint32_t csid, uint32_t msid, ngx_rtmp_amf0_elt_t *elts, size_t nelts); -ngx_int_t ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in, - ngx_rtmp_amf0_elt_t *elts, size_t nelts) +ngx_int_t ngx_rtmp_receive_amf0(ngx_rtmp_session_t *s, ngx_chain_t *in, + ngx_rtmp_amf0_elt_t *elts, size_t nelts); extern ngx_uint_t ngx_rtmp_max_module; diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index 50f063e..4971fe0 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -57,7 +57,7 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) { ngx_buf_t *b; size_t size; - ngx_chain_t **l; + ngx_chain_t *l; #ifdef NGX_DEBUG void *op = p; #endif @@ -65,9 +65,9 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) if (!n) return NGX_OK; - for(l = ctx->link; *l; l = &(*l)->next) { + for(l = ctx->link; l; l = l->next) { - b = (*l)->buf; + b = l->buf; if (b->last > n + b->pos) { if (p) { diff --git a/ngx_rtmp_amf0.h b/ngx_rtmp_amf0.h index 1db5e43..5d4fb06 100644 --- a/ngx_rtmp_amf0.h +++ b/ngx_rtmp_amf0.h @@ -27,7 +27,9 @@ typedef struct { } ngx_rtmp_amf0_elt_t; -typedef ngx_chain_t * (*ngx_rtmp_amf0_alloc_pt) +struct ngx_rtmp_session_s; + +typedef ngx_chain_t * (*ngx_rtmp_amf0_alloc_pt)(struct ngx_rtmp_session_s *s); typedef struct { ngx_chain_t *link, *first; diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index 88a0b59..a58bb65 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -60,10 +60,8 @@ ngx_module_t ngx_rtmp_broadcast_module = { }; -#define NGX_RTMP_PUBLISHER 0x01 -#define NGX_RTMP_SUBSCRIBER 0x02 - -#define NGX_RTMP_SESSION_HASH_SIZE 16384 +#define NGX_RTMP_BROADCAST_PUBLISHER 0x01 +#define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02 typedef struct ngx_rtmp_broadcast_ctx_s { @@ -107,11 +105,13 @@ ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) static ngx_rtmp_broadcast_ctx_t ** -ngx_rtmp_broadcast_get_head(ngx_rtmp_broadcast_ctx_t *ctx) +ngx_rtmp_broadcast_get_head(ngx_rtmp_session_t *s) { ngx_rtmp_broadcast_srv_conf_t *bscf; + ngx_rtmp_broadcast_ctx_t *ctx; bscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_broadcast_module); + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); return &bscf->contexts[ ngx_hash_key(ctx->stream.data, ctx->stream.len) @@ -143,15 +143,15 @@ ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream, ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "join broadcast stream '%V'", &stream); - s->stream = *stream; - hctx = ngx_rtmp_broadcast_get_head(ctx); + ctx->stream = *stream; + hctx = ngx_rtmp_broadcast_get_head(s); ctx->next = *hctx; ctx->flags = flags; *hctx = ctx; } -static void +static ngx_int_t ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) { ngx_connection_t *c; @@ -161,34 +161,107 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); if (ctx == NULL || !ctx->stream.len) { - return; + return NGX_OK; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "leave broadcast stream '%V'", &s->stream); + "leave broadcast stream '%V'", &ctx->stream); - hctx = ngx_rtmp_broadcast_get_head(ctx); + hctx = ngx_rtmp_broadcast_get_head(s); ngx_str_null(&ctx->stream); for(; *hctx; hctx = &(*hctx)->next) { if (*hctx == ctx) { *hctx = (*hctx)->next; - return; + break; } } + + return NGX_OK; } -static ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, +static ngx_int_t +ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + ngx_connection_t *c; + ngx_rtmp_broadcast_ctx_t *ctx, *cctx; + ngx_chain_t *out, *l; + u_char *p; + + c = s->connection; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + + if (ctx == NULL || !(ctx->flags & NGX_RTMP_BROADCAST_PUBLISHER)) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "received audio/video from non-publisher"); + return NGX_ERROR; + } + + if (in == NULL || in->buf == NULL) { + return NGX_OK; + } + + /* copy data to output stream */ + out = NULL; + p = in->buf->pos; + + for(;;) { + l = ngx_rtmp_alloc_shared_buf(s); + if (l == NULL || l->buf == NULL) { + return NGX_ERROR; + } + + if (out == NULL) { + out = l; + } + + while (l->buf->end - l->buf->last > in->buf->last - p) { + l->buf->last = ngx_cpymem(l->buf->last, p, + in->buf->last - p); + in = in->next; + p = in->buf->pos; + } + + l->buf->last = ngx_cpymem(l->buf->last, p, + l->buf->end - l->buf->last); + p += (l->buf->end - l->buf->last); + } + + ngx_rtmp_prepare_message(h, out, 0/*fmt*/); + + /* broadcast to all subscribers */ + for(cctx = *ngx_rtmp_broadcast_get_head(s); + cctx; cctx = cctx->next) + { + if (cctx != ctx + && cctx->flags & NGX_RTMP_BROADCAST_SUBSCRIBER + && cctx->stream.len == ctx->stream.len + && !ngx_strncmp(cctx->stream.data, ctx->stream.data, + ctx->stream.len)) + { + ngx_rtmp_send_message(s, out); + } + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in) { static double trans; static u_char app[1024]; static u_char url[1024]; + static ngx_str_t app_str; static ngx_rtmp_amf0_elt_t in_cmd[] = { { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, - { NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(utl) }, + { NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(url) }, }; static ngx_rtmp_amf0_elt_t out_inf[] = { @@ -216,15 +289,19 @@ static ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, } trans = in_trans; - ngx_str_set(&inf[0], "NetConnection.Connect.Success"); - ngx_str_set(&inf[1], "status"); - ngx_str_set(&inf[2], "Connection succeeded."); + ngx_str_set(&out_inf[0], "NetConnection.Connect.Success"); + ngx_str_set(&out_inf[1], "status"); + ngx_str_set(&out_inf[2], "Connection succeeded."); - ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, "connect() called; app='%s' url='%s'", app, url); - /*ngx_rtmp_broadcast_join(s, app);*/ + if (0) { + app_str.data = app; + app_str.len = ngx_strlen(app); + ngx_rtmp_broadcast_join(s, &app_str, 0); + } return ngx_rtmp_send_ack_size(s, 65536) || ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT) @@ -242,8 +319,10 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) ngx_rtmp_core_main_conf_t *cmcf; ngx_hash_key_t *h; ngx_rtmp_disconnect_handler_pt *dh; + ngx_rtmp_event_handler_pt *avh; + + cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); - cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_rtmp_module); /* add connect() handler */ h = ngx_array_push(&cmcf->calls); @@ -255,6 +334,15 @@ ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) ngx_str_set(&h->key, "connect"); h->value = ngx_rtmp_broadcast_connect; + + /* register audio/video broadcast handler */ + avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]); + *avh = ngx_rtmp_broadcast_av; + + avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); + *avh = ngx_rtmp_broadcast_av; + + /* add disconnect handler */ dh = ngx_array_push(&cmcf->disconnect); diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index 9d49707..178de04 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -130,20 +130,19 @@ ngx_rtmp_core_create_main_conf(ngx_conf_t *cf) static void * ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf) { - ngx_rtmp_core_srv_conf_t *cscf; - size_t n + ngx_rtmp_core_srv_conf_t *conf; - cscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_core_srv_conf_t)); - if (cscf == NULL) { + conf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_core_srv_conf_t)); + if (conf == NULL) { return NULL; } - cscf->timeout = NGX_CONF_UNSET_MSEC; - cscf->so_keepalive = NGX_CONF_UNSET; + conf->timeout = NGX_CONF_UNSET_MSEC; + conf->so_keepalive = NGX_CONF_UNSET; conf->max_streams = NGX_CONF_UNSET; conf->out_chunk_size = NGX_CONF_UNSET; - return cscf; + return conf; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index de344d4..b15d342 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -20,6 +20,8 @@ static void ngx_rtmp_handshake_send(ngx_event_t *rev); static void ngx_rtmp_recv(ngx_event_t *rev); static void ngx_rtmp_send(ngx_event_t *rev); +static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); static void ngx_rtmp_close_connection(ngx_connection_t *c); @@ -196,7 +198,6 @@ ngx_rtmp_init_session(ngx_connection_t *c) { ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; - ngx_bufs_t bufs; ngx_buf_t *b; size_t size; @@ -211,13 +212,13 @@ ngx_rtmp_init_session(ngx_connection_t *c) } s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) - * cmcf->max_streams); + * cscf->max_streams); if (s->in_streams == NULL) { ngx_rtmp_close_session(s); return; } - s->chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; + s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; s->in_pool = ngx_create_pool(NGX_RTMP_HANDSHAKE_SIZE + 1 + sizeof(ngx_pool_t), c->log); @@ -373,14 +374,13 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; - u_char *p; + u_char *p, *pp; uint32_t timestamp; size_t size; - ngx_chain_t *in; - ngx_rtmp_header_t *h; + ngx_rtmp_header_t *h; uint8_t fmt; uint32_t csid; - ngx_rtmp_stream_t *st, st0; + ngx_rtmp_stream_t *st, *st0; ngx_chain_t *in, *head; ngx_buf_t *b; @@ -391,11 +391,11 @@ ngx_rtmp_recv(ngx_event_t *rev) for(;;) { - st = &s->in_streams[s->csid]; + st = &s->in_streams[s->in_csid]; if (st->in == NULL) { if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL - || (sin->in->buf = ngx_calloc_buf(s->in_pool)) == NULL) + || (st->in->buf = ngx_calloc_buf(s->in_pool)) == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "chain alloc failed"); @@ -411,7 +411,7 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "buf alloc failed"); ngx_rtmp_close_session(s); - return NULL; + return; } st->in->buf->flush = 1; } @@ -421,10 +421,10 @@ ngx_rtmp_recv(ngx_event_t *rev) /* anything remained from last iteration? */ if (b != NULL && b->recycled && b->pos < b->last) { - s->in->buf->last = ngx_movemem(s->in->buf->start, b->pos, + st->in->buf->last = ngx_movemem(st->in->buf->start, b->pos, b->last - b->pos); b->recycled = 0; - s->in->buf->flush = 0; + st->in->buf->flush = 0; } b = in->buf; @@ -485,14 +485,14 @@ ngx_rtmp_recv(ngx_event_t *rev) } /* link orphan */ - if (s->csid == 0) { + if (s->in_csid == 0) { /* unlink from stream #0 */ st->in = st->in->next; /* link to new stream */ - s->csid = csid; - st = s->in_streams[csid]; + s->in_csid = csid; + st = &s->in_streams[csid]; if (st->in == NULL) { in->next = in; } else { @@ -518,7 +518,7 @@ ngx_rtmp_recv(ngx_event_t *rev) pp[0] = *p++; pp[3] = 0; - if (mt <= 1) { + if (fmt <= 1) { if (b->last - p < 4) continue; /* size: @@ -554,7 +554,7 @@ ngx_rtmp_recv(ngx_event_t *rev) pp[2] = *p++; pp[1] = *p++; pp[0] = *p++; - } else if (h->fmt) { + } else if (fmt) { h->timestamp += timestamp; } else { h->timestamp = timestamp; @@ -573,7 +573,7 @@ ngx_rtmp_recv(ngx_event_t *rev) size = b->last - b->pos; - if (size < (ngx_int_t)ngx_min(h->mlen, s->chunk_size)) + if (size < ngx_min(h->mlen, s->in_chunk_size)) continue; /* buffer is ready */ @@ -600,18 +600,18 @@ ngx_rtmp_recv(ngx_event_t *rev) st0->in->next = head; } - s->csid = 0; + s->in_csid = 0; b->recycled = 1; } } #define ngx_rtmp_buf_addref(b) \ - (++(ngx_int_t)(b)->tag) + (++*(int*)&(b)->tag) #define ngx_rtmp_buf_release(b) \ - (--(ngx_int_t)(b)->tag) + (--*(int*)&(b)->tag) static void @@ -666,8 +666,8 @@ ngx_rtmp_send(ngx_event_t *wev) /* return buffer to core */ ln = l->next; - l->next = cscf->free; - cscf->free = l; + l->next = cscf->out_free; + cscf->out_free = l; l = ln; } } @@ -723,7 +723,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt) { ngx_chain_t *l; - u_char *p; + u_char *p, *pp; ngx_int_t hsize, thsize, nbufs; uint32_t mlen, timestamp, ext_timestamp; static uint8_t hdrsize[] = { 12, 8, 4, 1 }; @@ -735,13 +735,13 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, mlen += (out->buf->last - l->buf->pos); ++nbufs; } - +/* ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, "RTMP send %s (%d) csid=%D timestamp=%D " "mlen=%D msid=%D nbufs=%d", ngx_rtmp_packet_type(h->type), (int)h->type, h->csid, h->timestamp, mlen, h->msid, nbufs); - +*/ /* determine initial header size */ hsize = hdrsize[fmt]; @@ -778,7 +778,7 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, *p++ = (uint8_t)((h->csid - 64) >> 8); } - thsize = p - b->pos; + thsize = p - out->buf->pos; /* message header */ if (fmt <= 2) { @@ -813,9 +813,10 @@ ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, /* use the smallest fmt (3) for * trailing fragments */ + p = out->buf->pos; for(out = out->next; out; out = out->next) { - out->pos -= hsize; - ngx_memcpy(out->pos, b->pos, thsize); + out->buf->pos -= hsize; + ngx_memcpy(out->buf->pos, p, thsize); } } @@ -834,7 +835,7 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) for(ll = &s->out; *ll; ll = &(*ll)->next); *ll = out; - ngx_rtmp_send(c->write); + ngx_rtmp_send(s->connection->write); } @@ -845,8 +846,10 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *evhs; size_t n; - ngx_rtmp_event_handler_pt evh; + ngx_rtmp_event_handler_pt *evh; + ngx_connection_t *c; + c = s->connection; cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); #ifdef NGX_DEBUG @@ -854,7 +857,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, int nbufs; ngx_chain_t *ch; - for(nbufs = 1, ch = l; + for(nbufs = 1, ch = in; ch->next; ch = ch->next, ++nbufs); @@ -873,13 +876,13 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, } evhs = &cmcf->events[h->type]; - for(n = 0; n < evhs->nelts; ++n) { - evh = evh->elts[n]; + evh = evhs->elts; + for(n = 0; n < evhs->nelts; ++n, ++evh) { if (!evh) { continue; } - if (evh(s, h, in) != NGX_OK) { + if ((*evh)(s, h, in) != NGX_OK) { return NGX_ERROR; } } @@ -897,8 +900,8 @@ ngx_rtmp_close_session(ngx_rtmp_session_t *s) cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); - for(n = 0; n < cmcf->disconnect.nelts; ++n) { - h = &cmcf->disconnect.elts[n]; + h = cmcf->disconnect.elts; + for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) { if (*h) { (*h)(s); } diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index 50acdab..be90d81 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -9,7 +9,7 @@ ngx_int_t -ngx_rtmp_protocol_message_handler(ngx_session_t *s, +ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_buf_t *b; @@ -28,7 +28,7 @@ ngx_rtmp_protocol_message_handler(ngx_session_t *s, return NGX_OK; } - p = &val; + p = (u_char*)&val; p[0] = b->pos[3]; p[1] = b->pos[2]; p[2] = b->pos[1]; @@ -72,7 +72,7 @@ ngx_rtmp_protocol_message_handler(ngx_session_t *s, ngx_int_t -ngx_rtmp_user_message_handler(ngx_session_t *s, +ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_buf_t *b; @@ -91,11 +91,11 @@ ngx_rtmp_user_message_handler(ngx_session_t *s, return NGX_OK; } - p = &evt; + p = (u_char*)&evt; p[0] = b->pos[1]; p[1] = b->pos[0]; - p = &val; + p = (u_char*)&val; p[0] = b->pos[5]; p[1] = b->pos[4]; p[2] = b->pos[3]; @@ -116,7 +116,7 @@ ngx_rtmp_user_message_handler(ngx_session_t *s, case NGX_RTMP_USER_SET_BUFLEN: if (b->last - b->pos >= 10) { - p = &arg; + p = (u_char*)&arg; p[0] = b->pos[9]; p[1] = b->pos[8]; p[2] = b->pos[7]; @@ -153,7 +153,7 @@ ngx_rtmp_user_message_handler(ngx_session_t *s, ngx_int_t -ngx_rtmp_amf0_message_handler(ngx_session_t *s, +ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) { ngx_rtmp_amf0_ctx_t act; @@ -178,7 +178,7 @@ ngx_rtmp_amf0_message_handler(ngx_session_t *s, act.log = s->connection->log; memset(func, 0, sizeof(func)); - if (ngx_rtmp_amf0_read(&ect, elts, + if (ngx_rtmp_amf0_read(&act, elts, sizeof(elts) / sizeof(elts[0])) != NGX_OK) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, @@ -196,13 +196,13 @@ ngx_rtmp_amf0_message_handler(ngx_session_t *s, ngx_hash_key_lc(func, len), func, len); if (ch) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, "AMF0 func '%s' @%f passed to handler", func, trans); return (*ch)(s, trans, in); } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, "AMF0 cmd '%s' @%f no handler", func, trans); return NGX_OK; @@ -210,7 +210,7 @@ ngx_rtmp_amf0_message_handler(ngx_session_t *s, ngx_int_t -ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in, +ngx_rtmp_receive_amf0(ngx_rtmp_session_t *s, ngx_chain_t *in, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { ngx_rtmp_amf0_ctx_t act; diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index 573d4ae..28d7aba 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -3,108 +3,86 @@ */ -#include "ngx_rtmp.h.h" +#include "ngx_rtmp.h" #include "ngx_rtmp_amf0.h" -#define NGX_RTMP_CTL_START(s, type) \ - ngx_rtmp_packet_hdr_t __h; \ +#define NGX_RTMP_USER_START(s, tp) \ + ngx_rtmp_header_t __h; \ ngx_chain_t *__l; \ - u_char *__p; \ + ngx_buf_t *__b; \ \ - memset(&h, 0, sizeof(__h)); \ - __h.type = type; \ + memset(&__h, 0, sizeof(__h)); \ + __h.type = tp; \ __h.csid = 2; \ __l = ngx_rtmp_alloc_shared_buf(s); \ if (__l == NULL) { \ return NGX_ERROR; \ } \ - __p = __l->buf->pos; + __b = __l->buf; #define NGX_RTMP_UCTL_START(s, type, utype) \ - NGX_RTMP_CTL_START(s, type); \ - *(__p->last++) = (u_char)((utype) >> 8); \ - *(__p->last++) = (u_char)(utype); + NGX_RTMP_USER_START(s, type); \ + *(__b->last++) = (u_char)((utype) >> 8); \ + *(__b->last++) = (u_char)(utype); -#define NGX_RTMP_CTL_OUT1(v) \ - *(__p->last++) = ((u_char*)&v)[0]; +#define NGX_RTMP_USER_OUT1(v) \ + *(__b->last++) = ((u_char*)&v)[0]; -#define NGX_RTMP_CTL_OUT4(v) \ - *(__p->last++) = ((u_char*)&v)[3]; \ - *(__p->last++) = ((u_char*)&v)[2]; \ - *(__p->last++) = ((u_char*)&v)[1]; \ - *(__p->last++) = ((u_char*)&v)[0]; +#define NGX_RTMP_USER_OUT4(v) \ + *(__b->last++) = ((u_char*)&v)[3]; \ + *(__b->last++) = ((u_char*)&v)[2]; \ + *(__b->last++) = ((u_char*)&v)[1]; \ + *(__b->last++) = ((u_char*)&v)[0]; -#define NGX_RTMP_CTL_END(s) \ +#define NGX_RTMP_USER_END(s) \ ngx_rtmp_prepare_message(&__h, __l, 0); \ ngx_rtmp_send_message(s, __l); \ return NGX_OK; -#define NGX_RTMP_AMF0_START(s, cs, ms) \ - ngx_rtmp_packet_hdr_t __h; \ - ngx_rtmp_amf0_ctx_t __act; \ - \ - memset(&__act, 0, sizeof(__act)); \ - __act.arg = s; \ - __act.alloc = ngx_rtmp_alloc_shared_buf; \ - __act.log = (s)->connection->log; \ - \ - memset(&__h, 0, sizeof(__h)); \ - __h.type = NGX_RTMP_MSG_AMF0_CMD; \ - __h.csid = cs; \ - __h.msid = ms; - -#define NGX_RTMP_AMF0_END(s) \ - if (__act.first) { \ - ngx_rtmp_prepare_message(&__h, \ - __act.first, 0); \ - ngx_rtmp_send_message(s, __act.first); \ - } \ - return NGX_OK; - /* Protocol control messages */ ngx_int_t ngx_rtmp_send_chunk_size(ngx_rtmp_session_t *s, uint32_t chunk_size) { - NGX_RTMP_CTL_START(s, NGX_RTMP_MSG_CHUNK_SIZE); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); - NGX_RTMP_CTL_OUT4(chunk_size); + NGX_RTMP_USER_OUT4(chunk_size); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_abort(ngx_rtmp_session_t *s, uint32_t csid) { - NGX_RTMP_CTL_START(s, NGX_RTMP_MSG_CHUNK_SIZE); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_CHUNK_SIZE); - NGX_RTMP_CTL_OUT4(csid); + NGX_RTMP_USER_OUT4(csid); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_ack(ngx_rtmp_session_t *s, uint32_t seq) { - NGX_RTMP_CTL_START(s, NGX_RTMP_MSG_ACK); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK); - NGX_RTMP_CTL_OUT4(seq); + NGX_RTMP_USER_OUT4(seq); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_ack_size(ngx_rtmp_session_t *s, uint32_t ack_size) { - NGX_RTMP_CTL_START(s, NGX_RTMP_MSG_ACK_SIZE); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_ACK_SIZE); - NGX_RTMP_CTL_OUT4(ack_size); + NGX_RTMP_USER_OUT4(ack_size); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } @@ -112,12 +90,12 @@ ngx_int_t ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, uint8_t limit_type) { - NGX_RTMP_CTL_START(s, NGX_RTMP_MSG_BANDWIDTH); + NGX_RTMP_USER_START(s, NGX_RTMP_MSG_BANDWIDTH); - NGX_RTMP_CTL_OUT4(ack_size); - NGX_RTMP_CTL_OUT1(limit_type); + NGX_RTMP_USER_OUT4(ack_size); + NGX_RTMP_USER_OUT1(limit_type); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } @@ -125,33 +103,33 @@ ngx_rtmp_send_bandwidth(ngx_rtmp_session_t *s, uint32_t ack_size, ngx_int_t ngx_rtmp_send_user_stream_begin(ngx_rtmp_session_t *s, uint32_t msid) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_STREAM_BEGIN); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_BEGIN); - NGX_RTMP_CTL_OUT4(msid); + NGX_RTMP_USER_OUT4(msid); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_user_stream_eof(ngx_rtmp_session_t *s, uint32_t msid) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_STREAM_EOF); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_EOF); - NGX_RTMP_CTL_OUT4(msid); + NGX_RTMP_USER_OUT4(msid); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_user_stream_dry(ngx_rtmp_session_t *s, uint32_t msid) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_STREAM_DRY); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_STREAM_DRY); - NGX_RTMP_CTL_OUT4(msid); + NGX_RTMP_USER_OUT4(msid); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } @@ -159,58 +137,75 @@ ngx_int_t ngx_rtmp_send_user_set_buflen(ngx_rtmp_session_t *s, uint32_t msid, uint32_t buflen_msec) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_SET_BUFLEN); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_SET_BUFLEN); - NGX_RTMP_CTL_OUT4(msid); - NGX_RTMP_CTL_OUT4(buflen_msec); + NGX_RTMP_USER_OUT4(msid); + NGX_RTMP_USER_OUT4(buflen_msec); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_user_recorded(ngx_rtmp_session_t *s, uint32_t msid) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_RECORDED); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_RECORDED); - NGX_RTMP_CTL_OUT4(msid); + NGX_RTMP_USER_OUT4(msid); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_user_ping_request(ngx_rtmp_session_t *s, uint32_t timestamp) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_PING_REQUEST); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_REQUEST); - NGX_RTMP_CTL_OUT4(timestamp); + NGX_RTMP_USER_OUT4(timestamp); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } ngx_int_t ngx_rtmp_send_user_ping_response(ngx_rtmp_session_t *s, uint32_t timestamp) { - NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_CTL, NGX_RTMP_CTL_PING_RESPONSE); + NGX_RTMP_UCTL_START(s, NGX_RTMP_MSG_USER, NGX_RTMP_USER_PING_RESPONSE); - NGX_RTMP_CTL_OUT4(timestamp); + NGX_RTMP_USER_OUT4(timestamp); - NGX_RTMP_CTL_END(s); + NGX_RTMP_USER_END(s); } + /* AMF0 sender */ ngx_int_t -ngx_rtmp_send_amf0(ngx_session_t *s, uint32_t csid, uint32_t msid, +ngx_rtmp_send_amf0(ngx_rtmp_session_t *s, uint32_t csid, uint32_t msid, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { - NGX_RTMP_AMF0_START(s, csid, msid); + ngx_rtmp_header_t h; + ngx_rtmp_amf0_ctx_t act; - if (ngx_rtmp_amf0_write(&__act, elts, nelts)) { + memset(&act, 0, sizeof(act)); + act.arg = s; + act.alloc = ngx_rtmp_alloc_shared_buf; + act.log = s->connection->log; + + memset(&h, 0, sizeof(h)); + h.type = NGX_RTMP_MSG_AMF0_CMD; + h.csid = csid; + h.msid = msid; + + if (ngx_rtmp_amf0_write(&act, elts, nelts) != NGX_OK) { return NGX_ERROR; } - NGX_RTMP_AMF0_END(s); + if (act.first) { + ngx_rtmp_prepare_message(&h, act.first, 0); + ngx_rtmp_send_message(s, act.first); + } + + return NGX_OK; }