diff --git a/TODO b/TODO index 9ce0087..e869c05 100644 --- a/TODO +++ b/TODO @@ -1,12 +1,14 @@ - fix time wrapping problem (% 0x00ffffff) -- add support of flv file streaming +- add support of flv file writing & streaming -- add RTMP authorization (SecureToken) +- check compilation with IPv6 enabled + +- fix broken data_frame + +- add HTTP callbacks for all calls - remove macros hell from ngx_rtmp_send.c - add max message size -- shortcuts for big-endian copy - diff --git a/config b/config index 3559e1c..f7ce9f5 100644 --- a/config +++ b/config @@ -3,6 +3,7 @@ ngx_addon_name="ngx_rtmp_module" CORE_MODULES="$CORE_MODULES ngx_rtmp_module \ ngx_rtmp_core_module \ + ngx_rtmp_access_module \ ngx_rtmp_broadcast_module" NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ @@ -13,4 +14,5 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ $ngx_addon_dir/ngx_rtmp_amf0.c \ $ngx_addon_dir/ngx_rtmp_send.c \ $ngx_addon_dir/ngx_rtmp_receive.c \ + $ngx_addon_dir/ngx_rtmp_access_module.c \ $ngx_addon_dir/ngx_rtmp_broadcast_module.c" diff --git a/ngx_rtmp.c b/ngx_rtmp.c index c1778af..4d0bb89 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -264,27 +264,20 @@ ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) { size_t n; - for(n = 0; n < NGX_RTMP_MSG_MAX; ++n) { + for(n = 0; n < NGX_RTMP_MAX_EVENT; ++n) { if (ngx_array_init(&cmcf->events[n], cf->pool, 1, - sizeof(ngx_rtmp_event_handler_pt)) != NGX_OK) + sizeof(ngx_rtmp_handler_pt)) != NGX_OK) { return NGX_ERROR; } } if (ngx_array_init(&cmcf->amf0, cf->pool, 1, - sizeof(ngx_hash_key_t)) != NGX_OK) + sizeof(ngx_rtmp_amf0_handler_t)) != NGX_OK) { return NGX_ERROR; } - if (ngx_array_init(&cmcf->disconnect, cf->pool, 1, - sizeof(ngx_rtmp_disconnect_handler_pt)) != NGX_OK) - { - return NGX_ERROR; - } - - return NGX_OK; } @@ -293,9 +286,10 @@ static ngx_int_t ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) { ngx_hash_init_t calls_hash; - ngx_rtmp_event_handler_pt *eh; - ngx_hash_key_t *h; - size_t n; + ngx_rtmp_handler_pt *eh; + ngx_rtmp_amf0_handler_t *h; + ngx_hash_key_t *ha; + size_t n, m; static size_t pm_events[] = { NGX_RTMP_MSG_CHUNK_SIZE, @@ -304,40 +298,66 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) NGX_RTMP_MSG_ACK_SIZE, NGX_RTMP_MSG_BANDWIDTH }; + static size_t amf0_events[] = { NGX_RTMP_MSG_AMF0_META, NGX_RTMP_MSG_AMF0_SHARED, NGX_RTMP_MSG_AMF0_CMD }; - /* init events */ + /* init standard protocol events */ for(n = 0; n < sizeof(pm_events) / sizeof(pm_events[0]); ++n) { eh = ngx_array_push(&cmcf->events[pm_events[n]]); *eh = ngx_rtmp_protocol_message_handler; } + /* init amf0 events */ for(n = 0; n < sizeof(amf0_events) / sizeof(amf0_events[0]); ++n) { eh = ngx_array_push(&cmcf->events[amf0_events[n]]); *eh = ngx_rtmp_amf0_message_handler; } + /* init user protocol events */ eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]); *eh = ngx_rtmp_user_message_handler; + /* init amf0 callbacks */ + ngx_array_init(&cmcf->amf0_arrays, cf->pool, 1, sizeof(ngx_hash_key_t)); + h = cmcf->amf0.elts; for(n = 0; n < cmcf->amf0.nelts; ++n, ++h) { - h->key_hash = ngx_hash_key_lc(h->key.data, h->key.len); + ha = cmcf->amf0_arrays.elts; + for(m = 0; m < cmcf->amf0_arrays.nelts; ++m, ++ha) { + if (h->name.len == ha->key.len + && !ngx_strncmp(h->name.data, ha->key.data, ha->key.len)) + { + break; + } + } + if (m == cmcf->amf0_arrays.nelts) { + ha = ngx_array_push(&cmcf->amf0_arrays); + ha->key = h->name; + ha->key_hash = ngx_hash_key_lc(ha->key.data, ha->key.len); + ha->value = ngx_array_create(cf->pool, 1, + sizeof(ngx_rtmp_handler_pt)); + if (ha->value == NULL) { + return NGX_ERROR; + } + } + + eh = ngx_array_push((ngx_array_t*)ha->value); + *eh = h->handler; } calls_hash.hash = &cmcf->amf0_hash; calls_hash.key = ngx_hash_key_lc; calls_hash.max_size = 512; calls_hash.bucket_size = ngx_cacheline_size; - calls_hash.name = "calls_hash"; + calls_hash.name = "amf0_hash"; calls_hash.pool = cf->pool; calls_hash.temp_pool = NULL; - if (ngx_hash_init(&calls_hash, cmcf->amf0.elts, cmcf->amf0.nelts) + if (ngx_hash_init(&calls_hash, cmcf->amf0_arrays.elts, cmcf->amf0_arrays.nelts) != NGX_OK) { return NGX_ERROR; diff --git a/ngx_rtmp.h b/ngx_rtmp.h index f3e21f9..b97ab69 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -129,7 +129,11 @@ typedef struct { #define NGX_RTMP_MSG_AMF0_SHARED 19 #define NGX_RTMP_MSG_AMF0_CMD 20 #define NGX_RTMP_MSG_AGGREGATE 22 -#define NGX_RTMP_MSG_MAX 23 +#define NGX_RTMP_MSG_MAX 22 + +#define NGX_RTMP_CONNECT NGX_RTMP_MSG_MAX + 1 +#define NGX_RTMP_DISCONNECT NGX_RTMP_MSG_MAX + 2 +#define NGX_RTMP_MAX_EVENT NGX_RTMP_MSG_MAX + 3 /* RMTP control message types */ @@ -201,21 +205,29 @@ typedef struct { } ngx_rtmp_session_t; -typedef ngx_int_t (*ngx_rtmp_event_handler_pt)(ngx_rtmp_session_t *s, +/* handler result code: + * NGX_ERROR - error + * NGX_OK - success + * NGX_DONE - success, but do not call more handlers on this event */ +typedef ngx_int_t (*ngx_rtmp_handler_pt)(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -typedef ngx_int_t (*ngx_rtmp_disconnect_handler_pt)(ngx_rtmp_session_t *s); + + +typedef struct { + ngx_str_t name; + ngx_rtmp_handler_pt handler; +} ngx_rtmp_amf0_handler_t; typedef struct { ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */ ngx_array_t listen; /* ngx_rtmp_listen_t */ - ngx_array_t events[NGX_RTMP_MSG_MAX]; + ngx_array_t events[NGX_RTMP_MAX_EVENT]; ngx_hash_t amf0_hash; + ngx_array_t amf0_arrays; ngx_array_t amf0; - - ngx_array_t disconnect; } ngx_rtmp_core_main_conf_t; diff --git a/ngx_rtmp_access_module.c b/ngx_rtmp_access_module.c new file mode 100644 index 0000000..03dc38a --- /dev/null +++ b/ngx_rtmp_access_module.c @@ -0,0 +1,447 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include +#include +#include "ngx_rtmp.h" + + +#define NGX_RTMP_ACCESS_PUBLISH 0x01 +#define NGX_RTMP_ACCESS_PLAY 0x02 + + +static ngx_int_t ngx_rtmp_access_publish(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); +static ngx_int_t ngx_rtmp_access_play(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); +static char * ngx_rtmp_access_rule(ngx_conf_t *cf, ngx_command_t *cmd, + void *conf); +static ngx_int_t ngx_rtmp_access_postconfiguration(ngx_conf_t *cf); +static void * ngx_rtmp_access_create_srv_conf(ngx_conf_t *cf); +static char * ngx_rtmp_access_merge_srv_conf(ngx_conf_t *cf, + void *parent, void *child); + + +static ngx_rtmp_amf0_handler_t ngx_rtmp_access_map[] = { + { ngx_string("publish"), ngx_rtmp_access_publish }, + { ngx_string("play"), ngx_rtmp_access_play }, +}; + + +typedef struct { + in_addr_t mask; + in_addr_t addr; + ngx_uint_t deny; + ngx_uint_t flags; +} ngx_rtmp_access_rule_t; + + +#if (NGX_HAVE_INET6) + +typedef struct { + struct in6_addr addr; + struct in6_addr mask; + ngx_uint_t deny; + ngx_uint_t flags; +} ngx_rtmp_access_rule6_t; + +#endif + + +typedef struct { + ngx_array_t *rules; /* array of ngx_rtmp_access_rule_t */ +#if (NGX_HAVE_INET6) + ngx_array_t *rules6; /* array of ngx_rtmp_access_rule6_t */ +#endif +} ngx_rtmp_access_srv_conf_t; + + +static ngx_command_t ngx_rtmp_access_commands[] = { + + { ngx_string("allow"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1|NGX_CONF_TAKE2, + ngx_rtmp_access_rule, + NGX_RTMP_SRV_CONF_OFFSET, + 0, + NULL }, + + { ngx_string("deny"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1|NGX_CONF_TAKE2, + ngx_rtmp_access_rule, + NGX_RTMP_SRV_CONF_OFFSET, + 0, + NULL }, + + ngx_null_command +}; + + +static ngx_rtmp_module_t ngx_rtmp_access_module_ctx = { + NULL, /* preconfiguration */ + ngx_rtmp_access_postconfiguration, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + ngx_rtmp_access_create_srv_conf, /* create server configuration */ + ngx_rtmp_access_merge_srv_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_rtmp_access_module = { + NGX_MODULE_V1, + &ngx_rtmp_access_module_ctx, /* module context */ + ngx_rtmp_access_commands, /* module directives */ + NGX_RTMP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +static void * +ngx_rtmp_access_create_srv_conf(ngx_conf_t *cf) +{ + ngx_rtmp_access_srv_conf_t *ascf; + + ascf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_access_srv_conf_t)); + if (ascf == NULL) { + return NULL; + } + + return ascf; +} + + +static char * +ngx_rtmp_access_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) +{ + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_rtmp_access_found(ngx_rtmp_session_t *s, ngx_uint_t deny) +{ + if (deny) { + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "access forbidden by rule"); + return NGX_ERROR; + } + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_access_inet(ngx_rtmp_session_t *s, + ngx_rtmp_access_srv_conf_t *ascf, + in_addr_t addr, ngx_uint_t flag) +{ + ngx_uint_t i; + ngx_rtmp_access_rule_t *rule; + + rule = ascf->rules->elts; + for (i = 0; i < ascf->rules->nelts; i++) { + + ngx_log_debug3(NGX_LOG_DEBUG_HTTP, s->connection->log, 0, + "access: %08XD %08XD %08XD", + addr, rule[i].mask, rule[i].addr); + + if ((addr & rule[i].mask) == rule[i].addr + && flag & rule[i].flags) + { + return ngx_rtmp_access_found(s, rule[i].deny); + } + } + + return NGX_OK; +} + + +#if (NGX_HAVE_INET6) + +static ngx_int_t +ngx_rtmp_access_inet6(ngx_rtmp_session_t *s, + ngx_rtmp_srv_conf_t *ascf, + u_char *p, ngx_uint_t flag) +{ + ngx_uint_t n; + ngx_uint_t i; + ngx_rtmp_access_rule6_t *rule6; + + rule6 = ascf->rules6->elts; + for (i = 0; i < ascf->rules6->nelts; i++) { + +#if (NGX_DEBUG) + { + size_t cl, ml, al; + u_char ct[NGX_INET6_ADDRSTRLEN]; + u_char mt[NGX_INET6_ADDRSTRLEN]; + u_char at[NGX_INET6_ADDRSTRLEN]; + + cl = ngx_inet6_ntop(p, ct, NGX_INET6_ADDRSTRLEN); + ml = ngx_inet6_ntop(rule6[i].mask.s6_addr, mt, NGX_INET6_ADDRSTRLEN); + al = ngx_inet6_ntop(rule6[i].addr.s6_addr, at, NGX_INET6_ADDRSTRLEN); + + ngx_log_debug6(NGX_LOG_DEBUG_HTTP, s->connection->log, 0, + "access: %*s %*s %*s", cl, ct, ml, mt, al, at); + } +#endif + + for (n = 0; n < 16; n++) { + if ((p[n] & rule6[i].mask.s6_addr[n]) != rule6[i].addr.s6_addr[n]) { + goto next; + } + } + + if (flag & rule6[i].flags) { + return ngx_rtmp_access_found(r, rule6[i].deny); + } + + next: + continue; + } + + return NGX_OK; +} + +#endif + + +static ngx_int_t +ngx_rtmp_access(ngx_rtmp_session_t *s, ngx_uint_t flag) +{ + struct sockaddr_in *sin; + ngx_rtmp_access_srv_conf_t *ascf; +#if (NGX_HAVE_INET6) + u_char *p; + in_addr_t addr; + struct sockaddr_in6 *sin6; +#endif + + ascf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_access_module); + + switch (s->connection->sockaddr->sa_family) { + + case AF_INET: + if (ascf->rules) { + sin = (struct sockaddr_in *) s->connection->sockaddr; + return ngx_rtmp_access_inet(s, ascf, + sin->sin_addr.s_addr, flag); + } + break; + +#if (NGX_HAVE_INET6) + + case AF_INET6: + sin6 = (struct sockaddr_in6 *) s->connection->sockaddr; + p = sin6->sin6_addr.s6_addr; + + if (ascf->rules && IN6_IS_ADDR_V4MAPPED(&sin6->sin6_addr)) { + addr = p[12] << 24; + addr += p[13] << 16; + addr += p[14] << 8; + addr += p[15]; + return ngx_rtmp_access_inet(r, ascf, htonl(addr), flag); + } + + if (ascf->rules6) { + return ngx_rtmp_access_inet6(r, ascf, p, flag); + } + +#endif + } + + return NGX_OK; + +} + + +static char * +ngx_rtmp_access_rule(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) +{ + ngx_rtmp_access_srv_conf_t *ascf = conf; + + ngx_int_t rc; + ngx_uint_t all; + ngx_str_t *value; + ngx_cidr_t cidr; + ngx_rtmp_access_rule_t *rule; +#if (NGX_HAVE_INET6) + ngx_rtmp_access_rule6_t *rule6; +#endif + size_t n; + ngx_uint_t flags; + + ngx_memzero(&cidr, sizeof(ngx_cidr_t)); + + value = cf->args->elts; + + n = 1; + flags = 0; + + if (cf->args->nelts == 2) { + flags = NGX_RTMP_ACCESS_PUBLISH | NGX_RTMP_ACCESS_PLAY; + + } else { + + for(; n < cf->args->nelts - 1; ++n) { + + if (value[n].len == sizeof("publish") - 1 + && ngx_strcmp(value[1].data, "publish") == 0) + { + flags |= NGX_RTMP_ACCESS_PUBLISH; + continue; + + } + + if (value[n].len == sizeof("play") - 1 + && ngx_strcmp(value[1].data, "play") == 0) + { + flags |= NGX_RTMP_ACCESS_PLAY; + continue; + + } + + ngx_log_error(NGX_LOG_ERR, cf->log, 0, + "unexpected access specified: '%V'", &value[n]); + return NGX_CONF_ERROR; + } + } + + all = (value[n].len == 3 && ngx_strcmp(value[n].data, "all") == 0); + + if (!all) { + + rc = ngx_ptocidr(&value[n], &cidr); + + if (rc == NGX_ERROR) { + ngx_conf_log_error(NGX_LOG_EMERG, cf, 0, + "invalid parameter \"%V\"", &value[1]); + return NGX_CONF_ERROR; + } + + if (rc == NGX_DONE) { + ngx_conf_log_error(NGX_LOG_WARN, cf, 0, + "low address bits of %V are meaningless", &value[1]); + } + } + + switch (cidr.family) { + +#if (NGX_HAVE_INET6) + case AF_INET6: + case 0: /* all */ + + if (cscf->rules6 == NULL) { + cscf->rules6 = ngx_array_create(cf->pool, 4, + sizeof(ngx_rtmp_access_rule6_t)); + if (cscf->rules6 == NULL) { + return NGX_CONF_ERROR; + } + } + + rule6 = ngx_array_push(cscf->rules6); + if (rule6 == NULL) { + return NGX_CONF_ERROR; + } + + rule6->mask = cidr.u.in6.mask; + rule6->addr = cidr.u.in6.addr; + rule6->deny = (value[0].data[0] == 'd') ? 1 : 0; + rule6->flags = flags; + + if (!all) { + break; + } + + /* "all" passes through */ +#endif + + default: /* AF_INET */ + + if (ascf->rules == NULL) { + ascf->rules = ngx_array_create(cf->pool, 4, + sizeof(ngx_rtmp_access_rule_t)); + if (ascf->rules == NULL) { + return NGX_CONF_ERROR; + } + } + + rule = ngx_array_push(ascf->rules); + if (rule == NULL) { + return NGX_CONF_ERROR; + } + + rule->mask = cidr.u.in.mask; + rule->addr = cidr.u.in.addr; + rule->deny = (value[0].data[0] == 'd') ? 1 : 0; + rule->flags = flags; + } + + return NGX_CONF_OK; +} + + +static ngx_int_t +ngx_rtmp_access_init(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH) == NGX_OK + || ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY) == NGX_OK + ? NGX_OK + : NGX_ERROR; +} + + +static ngx_int_t +ngx_rtmp_access_publish(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH); +} + + +static ngx_int_t +ngx_rtmp_access_play(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY); +} + + +static ngx_int_t +ngx_rtmp_access_postconfiguration(ngx_conf_t *cf) +{ + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_handler_pt *h; + ngx_rtmp_amf0_handler_t *ch, *bh; + size_t n, ncalls; + + cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); + + /* register event handlers */ + h = ngx_array_push(&cmcf->events[NGX_RTMP_CONNECT]); + *h = ngx_rtmp_access_init; + + /* register AMF0 callbacks */ + ncalls = sizeof(ngx_rtmp_access_map) + / sizeof(ngx_rtmp_access_map[0]); + ch = ngx_array_push_n(&cmcf->amf0, ncalls); + if (h == NULL) { + return NGX_ERROR; + } + + bh = ngx_rtmp_access_map; + for(n = 0; n < ncalls; ++n, ++ch, ++bh) { + *ch = *bh; + } + + return NGX_OK; +} diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c index d61c1f5..91a8af3 100644 --- a/ngx_rtmp_broadcast_module.c +++ b/ngx_rtmp_broadcast_module.c @@ -15,18 +15,12 @@ #define NGX_RTMP_BROADCAST_CSID_VIDEO 7 -/* Frame cutoff */ -#define NGX_RTMP_CUTOFF_ALL 0 -#define NGX_RTMP_CUTOFF_KEY 1 -#define NGX_RTMP_CUTOFF_INTER 2 -#define NGX_RTMP_CUTOFF_DISPOSABLE 3 - - static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf); static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf); static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child); + static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s, @@ -41,18 +35,12 @@ static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); -typedef struct { - ngx_str_t name; - ngx_rtmp_event_handler_pt handler; -} ngx_rtmp_broadcast_map_t; - - -static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = { +static ngx_rtmp_amf0_handler_t ngx_rtmp_broadcast_map[] = { { ngx_string("connect"), ngx_rtmp_broadcast_connect }, { ngx_string("createStream"), ngx_rtmp_broadcast_create_stream }, { ngx_string("publish"), ngx_rtmp_broadcast_publish }, { ngx_string("play"), ngx_rtmp_broadcast_play }, - { ngx_string("@setDataFrame"), ngx_rtmp_broadcast_set_data_frame }, + { ngx_string("-@setDataFrame"), ngx_rtmp_broadcast_set_data_frame }, { ngx_string("releaseStream"), ngx_rtmp_broadcast_ok }, { ngx_string("FCPublish"), ngx_rtmp_broadcast_ok }, { ngx_string("FCSubscribe"), ngx_rtmp_broadcast_ok }, @@ -68,6 +56,7 @@ typedef struct { static ngx_command_t ngx_rtmp_broadcast_commands[] = { + { ngx_string("broadcast_buckets"), NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, ngx_conf_set_str_slot, @@ -105,6 +94,7 @@ ngx_module_t ngx_rtmp_broadcast_module = { }; +/* session flags */ #define NGX_RTMP_BROADCAST_PUBLISHER 0x01 #define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02 #define NGX_RTMP_BROADCAST_KEYFRAME 0x04 @@ -222,7 +212,7 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) c = s->connection; ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); - if (ctx == NULL || !ctx->stream.len) { + if (ctx == NULL) { return NGX_OK; } @@ -321,50 +311,52 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, * so save next ptr while it's not too late */ cnext = cctx->next; - if (cctx != ctx - && cctx->flags & NGX_RTMP_BROADCAST_SUBSCRIBER - && cctx->stream.len == ctx->stream.len - && !ngx_strncmp(cctx->stream.data, ctx->stream.data, + if (cctx == ctx + || !(cctx->flags & NGX_RTMP_BROADCAST_SUBSCRIBER) + || cctx->stream.len != ctx->stream.len + || ngx_strncmp(cctx->stream.data, ctx->stream.data, ctx->stream.len)) { - ss = cctx->session; + continue; + } - /* if we have metadata check if the subscriber - * has already received one */ - if (ctx->data_frame && 0 - && !(cctx->flags & NGX_RTMP_BROADCAST_DATA_FRAME)) - { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "sending data_frame"); + ss = cctx->session; - rc = ngx_rtmp_send_message(ss, ctx->data_frame, 0); - if (rc == NGX_ERROR) { - continue; - } + /* if we have metadata check if the subscriber + * has already received one */ + if (ctx->data_frame + && !(cctx->flags & NGX_RTMP_BROADCAST_DATA_FRAME)) + { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "sending data_frame"); - if (rc == NGX_OK) { - cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME; - } - } - - /* waiting for a keyframe? */ - if (cscf->wait_key_frame - && sh.type == NGX_RTMP_MSG_VIDEO - && !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME) - && !keyframe) - { + rc = ngx_rtmp_send_message(ss, ctx->data_frame, 0); + if (rc == NGX_ERROR) { continue; } - if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK - && keyframe - && !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME)) - { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "keyframe sent"); - cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME; + if (rc == NGX_OK) { + cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME; } } + + /* waiting for a keyframe? */ + if (cscf->wait_key_frame + && sh.type == NGX_RTMP_MSG_VIDEO + && !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME) + && !keyframe) + { + continue; + } + + if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK + && keyframe + && !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME)) + { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "keyframe sent"); + cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME; + } } ngx_rtmp_free_shared_bufs(cscf, out); @@ -373,6 +365,14 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } +static ngx_int_t +ngx_rtmp_broadcast_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) +{ + return ngx_rtmp_broadcast_leave(s); +} + + static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in) @@ -712,7 +712,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, sh.msid = NGX_RTMP_BROADCAST_MSID; sh.type = NGX_RTMP_MSG_AMF0_META; - ngx_rtmp_prepare_message(s, h, NULL, ctx->data_frame); + ngx_rtmp_prepare_message(s, &sh, NULL, ctx->data_frame); return NGX_OK; } @@ -745,9 +745,9 @@ ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, } memset(&sh, 0, sizeof(sh)); - sh.csid = h->csid;/*NGX_RTMP_BROADCAST_CSID_AMF0;*/ + sh.csid = h->csid; sh.type = NGX_RTMP_MSG_AMF0_CMD; - sh.msid = 0;//NGX_RTMP_BROADCAST_MSID; + sh.msid = 0; /* send simple _result */ return ngx_rtmp_send_amf0(s, &sh, out_elts, @@ -759,42 +759,33 @@ static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) { ngx_rtmp_core_main_conf_t *cmcf; - ngx_hash_key_t *h; - ngx_rtmp_disconnect_handler_pt *dh; - ngx_rtmp_event_handler_pt *avh; - ngx_rtmp_broadcast_map_t *bm; + ngx_rtmp_handler_pt *h; + ngx_rtmp_amf0_handler_t *ch, *bh; size_t n, ncalls; cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module); - /* register audio/video broadcast handler */ - avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]); - *avh = ngx_rtmp_broadcast_av; + /* register event handlers */ + h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]); + *h = ngx_rtmp_broadcast_av; - avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); - *avh = ngx_rtmp_broadcast_av; + h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]); + *h = ngx_rtmp_broadcast_av; - /* register disconnect handler */ - dh = ngx_array_push(&cmcf->disconnect); + h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]); + *h = ngx_rtmp_broadcast_done; - if (dh == NULL) { - return NGX_ERROR; - } - - *dh = ngx_rtmp_broadcast_leave; - - /* register AMF0 call handlers */ + /* register AMF0 callbacks */ ncalls = sizeof(ngx_rtmp_broadcast_map) / sizeof(ngx_rtmp_broadcast_map[0]); - h = ngx_array_push_n(&cmcf->amf0, ncalls); + ch = ngx_array_push_n(&cmcf->amf0, ncalls); if (h == NULL) { return NGX_ERROR; } - bm = ngx_rtmp_broadcast_map; - for(n = 0; n < ncalls; ++n, ++h, ++bm) { - h->key = bm->name; - h->value = bm->handler; + bh = ngx_rtmp_broadcast_map; + for(n = 0; n < ncalls; ++n, ++ch, ++bh) { + *ch = *bh; } return NGX_OK; diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 870adef..0979a5d 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -213,10 +213,13 @@ ngx_rtmp_init_connection(ngx_connection_t *c) static void ngx_rtmp_init_session(ngx_connection_t *c) { - ngx_rtmp_session_t *s; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_buf_t *b; - size_t size; + ngx_rtmp_session_t *s; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_buf_t *b; + size_t n, size; + ngx_rtmp_handler_pt *h; + ngx_array_t *ch; s = c->data; @@ -234,7 +237,6 @@ ngx_rtmp_init_session(ngx_connection_t *c) ngx_rtmp_close_connection(c); return; } - size = NGX_RTMP_HANDSHAKE_SIZE + 1; s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; s->in_pool = ngx_create_pool(4096/*2 * size + sizeof(ngx_pool_t)*/, c->log); @@ -253,6 +255,20 @@ ngx_rtmp_init_session(ngx_connection_t *c) c->write->handler = ngx_rtmp_handshake_send; c->read->handler = ngx_rtmp_handshake_recv; + /* call connect callbacks */ + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + + ch = &cmcf->events[NGX_RTMP_CONNECT]; + h = ch->elts; + for(n = 0; n < ch->nelts; ++n, ++h) { + if (*h) { + if ((*h)(s, NULL, NULL) != NGX_OK) { + ngx_rtmp_close_connection(c); + return; + } + } + } + ngx_rtmp_handshake_recv(c->read); } @@ -1061,7 +1077,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_core_main_conf_t *cmcf; ngx_array_t *evhs; size_t n; - ngx_rtmp_event_handler_pt *evh; + ngx_rtmp_handler_pt *evh; ngx_connection_t *c; c = s->connection; @@ -1103,10 +1119,13 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "calling handler %d", n); - if ((*evh)(s, h, in) != NGX_OK) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "handler %d failed", n); - return NGX_ERROR; + switch ((*evh)(s, h, in)) { + case NGX_ERROR: + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "handler %d failed", n); + return NGX_ERROR; + case NGX_DONE: + return NGX_OK; } } @@ -1121,7 +1140,8 @@ ngx_rtmp_close_connection(ngx_connection_t *c) ngx_pool_t *pool; ngx_rtmp_core_main_conf_t *cmcf; ngx_rtmp_core_srv_conf_t *cscf; - ngx_rtmp_disconnect_handler_pt *h; + ngx_rtmp_handler_pt *h; + ngx_array_t *dh; size_t n; if (c->destroyed) { @@ -1135,10 +1155,12 @@ ngx_rtmp_close_connection(ngx_connection_t *c) ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection"); if (s) { - h = cmcf->disconnect.elts; - for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) { + dh = &cmcf->events[NGX_RTMP_DISCONNECT]; + h = dh->elts; + + for(n = 0; n < dh->nelts; ++n, ++h) { if (*h) { - (*h)(s); + (*h)(s, NULL, NULL); } } diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index 94d9dfa..679736f 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -163,8 +163,9 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_amf0_ctx_t act; ngx_connection_t *c; ngx_rtmp_core_main_conf_t *cmcf; - ngx_rtmp_event_handler_pt ch; - size_t len; + ngx_array_t *ch; + ngx_rtmp_handler_pt *ph; + size_t len, n; static u_char func[128]; @@ -190,22 +191,26 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s, len = ngx_strlen(func); - /* lookup function handler - * only the first handler is called so far - * because ngx_hash_find only returns one item; - * no good to patch NGINX core ;) */ ch = ngx_hash_find(&cmcf->amf0_hash, ngx_hash_strlow(func, func, len), func, len); - if (ch) { + if (ch && ch->nelts) { + ph = ch->elts; + for (n = 0; n < ch->nelts; ++n, ++ph) { + ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0, + "AMF0 func '%s' passed to handler %d/%d", + func, n, ch->nelts); + switch ((*ph)(s, h, in)) { + case NGX_ERROR: + return NGX_ERROR; + case NGX_DONE: + return NGX_OK; + } + } + } else { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "AMF0 func '%s' passed to handler", func); - - return ch(s, h, in); - } - - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "AMF0 cmd '%s' no handler", func); + } return NGX_OK; } diff --git a/test/nginx.conf b/test/nginx.conf index ef31102..ba4b368 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -18,7 +18,7 @@ rtmp { server { - listen 1935; + listen 1935; # wait_key_frame on; @@ -26,6 +26,12 @@ rtmp { max_buf 1000000; +#allow play all; + + allow publish 127.0.0.1; + + deny publish all; + } } @@ -39,6 +45,7 @@ http { listen 8080; location / { +# deny 192.168.33.33; root /home/rarutyunyan/nginx-rtmp-module/test/www; } } diff --git a/test/play.sh b/test/play.sh index 063fffd..e07cd55 100755 --- a/test/play.sh +++ b/test/play.sh @@ -1 +1 @@ -ffplay -loglevel verbose "rtmp://192.168.0.100/helo/pd" +ffplay -loglevel verbose "rtmp://localhost/helo/pd"