added access control

This commit is contained in:
Roman Arutyunyan 2012-03-21 19:08:59 +04:00
parent f2746b6ee1
commit 3980a59237
10 changed files with 640 additions and 132 deletions

10
TODO
View file

@ -1,12 +1,14 @@
- fix time wrapping problem (% 0x00ffffff)
- add support of flv file streaming
- add support of flv file writing & streaming
- add RTMP authorization (SecureToken)
- check compilation with IPv6 enabled
- fix broken data_frame
- add HTTP callbacks for all calls
- remove macros hell from ngx_rtmp_send.c
- add max message size
- shortcuts for big-endian copy

2
config
View file

@ -3,6 +3,7 @@ ngx_addon_name="ngx_rtmp_module"
CORE_MODULES="$CORE_MODULES
ngx_rtmp_module \
ngx_rtmp_core_module \
ngx_rtmp_access_module \
ngx_rtmp_broadcast_module"
NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
@ -13,4 +14,5 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp_amf0.c \
$ngx_addon_dir/ngx_rtmp_send.c \
$ngx_addon_dir/ngx_rtmp_receive.c \
$ngx_addon_dir/ngx_rtmp_access_module.c \
$ngx_addon_dir/ngx_rtmp_broadcast_module.c"

View file

@ -264,27 +264,20 @@ ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
{
size_t n;
for(n = 0; n < NGX_RTMP_MSG_MAX; ++n) {
for(n = 0; n < NGX_RTMP_MAX_EVENT; ++n) {
if (ngx_array_init(&cmcf->events[n], cf->pool, 1,
sizeof(ngx_rtmp_event_handler_pt)) != NGX_OK)
sizeof(ngx_rtmp_handler_pt)) != NGX_OK)
{
return NGX_ERROR;
}
}
if (ngx_array_init(&cmcf->amf0, cf->pool, 1,
sizeof(ngx_hash_key_t)) != NGX_OK)
sizeof(ngx_rtmp_amf0_handler_t)) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_array_init(&cmcf->disconnect, cf->pool, 1,
sizeof(ngx_rtmp_disconnect_handler_pt)) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
@ -293,9 +286,10 @@ static ngx_int_t
ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
{
ngx_hash_init_t calls_hash;
ngx_rtmp_event_handler_pt *eh;
ngx_hash_key_t *h;
size_t n;
ngx_rtmp_handler_pt *eh;
ngx_rtmp_amf0_handler_t *h;
ngx_hash_key_t *ha;
size_t n, m;
static size_t pm_events[] = {
NGX_RTMP_MSG_CHUNK_SIZE,
@ -304,40 +298,66 @@ ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
NGX_RTMP_MSG_ACK_SIZE,
NGX_RTMP_MSG_BANDWIDTH
};
static size_t amf0_events[] = {
NGX_RTMP_MSG_AMF0_META,
NGX_RTMP_MSG_AMF0_SHARED,
NGX_RTMP_MSG_AMF0_CMD
};
/* init events */
/* init standard protocol events */
for(n = 0; n < sizeof(pm_events) / sizeof(pm_events[0]); ++n) {
eh = ngx_array_push(&cmcf->events[pm_events[n]]);
*eh = ngx_rtmp_protocol_message_handler;
}
/* init amf0 events */
for(n = 0; n < sizeof(amf0_events) / sizeof(amf0_events[0]); ++n) {
eh = ngx_array_push(&cmcf->events[amf0_events[n]]);
*eh = ngx_rtmp_amf0_message_handler;
}
/* init user protocol events */
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]);
*eh = ngx_rtmp_user_message_handler;
/* init amf0 callbacks */
ngx_array_init(&cmcf->amf0_arrays, cf->pool, 1, sizeof(ngx_hash_key_t));
h = cmcf->amf0.elts;
for(n = 0; n < cmcf->amf0.nelts; ++n, ++h) {
h->key_hash = ngx_hash_key_lc(h->key.data, h->key.len);
ha = cmcf->amf0_arrays.elts;
for(m = 0; m < cmcf->amf0_arrays.nelts; ++m, ++ha) {
if (h->name.len == ha->key.len
&& !ngx_strncmp(h->name.data, ha->key.data, ha->key.len))
{
break;
}
}
if (m == cmcf->amf0_arrays.nelts) {
ha = ngx_array_push(&cmcf->amf0_arrays);
ha->key = h->name;
ha->key_hash = ngx_hash_key_lc(ha->key.data, ha->key.len);
ha->value = ngx_array_create(cf->pool, 1,
sizeof(ngx_rtmp_handler_pt));
if (ha->value == NULL) {
return NGX_ERROR;
}
}
eh = ngx_array_push((ngx_array_t*)ha->value);
*eh = h->handler;
}
calls_hash.hash = &cmcf->amf0_hash;
calls_hash.key = ngx_hash_key_lc;
calls_hash.max_size = 512;
calls_hash.bucket_size = ngx_cacheline_size;
calls_hash.name = "calls_hash";
calls_hash.name = "amf0_hash";
calls_hash.pool = cf->pool;
calls_hash.temp_pool = NULL;
if (ngx_hash_init(&calls_hash, cmcf->amf0.elts, cmcf->amf0.nelts)
if (ngx_hash_init(&calls_hash, cmcf->amf0_arrays.elts, cmcf->amf0_arrays.nelts)
!= NGX_OK)
{
return NGX_ERROR;

View file

@ -129,7 +129,11 @@ typedef struct {
#define NGX_RTMP_MSG_AMF0_SHARED 19
#define NGX_RTMP_MSG_AMF0_CMD 20
#define NGX_RTMP_MSG_AGGREGATE 22
#define NGX_RTMP_MSG_MAX 23
#define NGX_RTMP_MSG_MAX 22
#define NGX_RTMP_CONNECT NGX_RTMP_MSG_MAX + 1
#define NGX_RTMP_DISCONNECT NGX_RTMP_MSG_MAX + 2
#define NGX_RTMP_MAX_EVENT NGX_RTMP_MSG_MAX + 3
/* RMTP control message types */
@ -201,21 +205,29 @@ typedef struct {
} ngx_rtmp_session_t;
typedef ngx_int_t (*ngx_rtmp_event_handler_pt)(ngx_rtmp_session_t *s,
/* handler result code:
* NGX_ERROR - error
* NGX_OK - success
* NGX_DONE - success, but do not call more handlers on this event */
typedef ngx_int_t (*ngx_rtmp_handler_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
typedef ngx_int_t (*ngx_rtmp_disconnect_handler_pt)(ngx_rtmp_session_t *s);
typedef struct {
ngx_str_t name;
ngx_rtmp_handler_pt handler;
} ngx_rtmp_amf0_handler_t;
typedef struct {
ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */
ngx_array_t listen; /* ngx_rtmp_listen_t */
ngx_array_t events[NGX_RTMP_MSG_MAX];
ngx_array_t events[NGX_RTMP_MAX_EVENT];
ngx_hash_t amf0_hash;
ngx_array_t amf0_arrays;
ngx_array_t amf0;
ngx_array_t disconnect;
} ngx_rtmp_core_main_conf_t;

447
ngx_rtmp_access_module.c Normal file
View file

@ -0,0 +1,447 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include "ngx_rtmp.h"
#define NGX_RTMP_ACCESS_PUBLISH 0x01
#define NGX_RTMP_ACCESS_PLAY 0x02
static ngx_int_t ngx_rtmp_access_publish(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_access_play(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static char * ngx_rtmp_access_rule(ngx_conf_t *cf, ngx_command_t *cmd,
void *conf);
static ngx_int_t ngx_rtmp_access_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_access_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_access_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);
static ngx_rtmp_amf0_handler_t ngx_rtmp_access_map[] = {
{ ngx_string("publish"), ngx_rtmp_access_publish },
{ ngx_string("play"), ngx_rtmp_access_play },
};
typedef struct {
in_addr_t mask;
in_addr_t addr;
ngx_uint_t deny;
ngx_uint_t flags;
} ngx_rtmp_access_rule_t;
#if (NGX_HAVE_INET6)
typedef struct {
struct in6_addr addr;
struct in6_addr mask;
ngx_uint_t deny;
ngx_uint_t flags;
} ngx_rtmp_access_rule6_t;
#endif
typedef struct {
ngx_array_t *rules; /* array of ngx_rtmp_access_rule_t */
#if (NGX_HAVE_INET6)
ngx_array_t *rules6; /* array of ngx_rtmp_access_rule6_t */
#endif
} ngx_rtmp_access_srv_conf_t;
static ngx_command_t ngx_rtmp_access_commands[] = {
{ ngx_string("allow"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1|NGX_CONF_TAKE2,
ngx_rtmp_access_rule,
NGX_RTMP_SRV_CONF_OFFSET,
0,
NULL },
{ ngx_string("deny"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1|NGX_CONF_TAKE2,
ngx_rtmp_access_rule,
NGX_RTMP_SRV_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_rtmp_module_t ngx_rtmp_access_module_ctx = {
NULL, /* preconfiguration */
ngx_rtmp_access_postconfiguration, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
ngx_rtmp_access_create_srv_conf, /* create server configuration */
ngx_rtmp_access_merge_srv_conf /* merge server configuration */
};
ngx_module_t ngx_rtmp_access_module = {
NGX_MODULE_V1,
&ngx_rtmp_access_module_ctx, /* module context */
ngx_rtmp_access_commands, /* module directives */
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
static void *
ngx_rtmp_access_create_srv_conf(ngx_conf_t *cf)
{
ngx_rtmp_access_srv_conf_t *ascf;
ascf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_access_srv_conf_t));
if (ascf == NULL) {
return NULL;
}
return ascf;
}
static char *
ngx_rtmp_access_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
{
return NGX_CONF_OK;
}
static ngx_int_t
ngx_rtmp_access_found(ngx_rtmp_session_t *s, ngx_uint_t deny)
{
if (deny) {
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"access forbidden by rule");
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_access_inet(ngx_rtmp_session_t *s,
ngx_rtmp_access_srv_conf_t *ascf,
in_addr_t addr, ngx_uint_t flag)
{
ngx_uint_t i;
ngx_rtmp_access_rule_t *rule;
rule = ascf->rules->elts;
for (i = 0; i < ascf->rules->nelts; i++) {
ngx_log_debug3(NGX_LOG_DEBUG_HTTP, s->connection->log, 0,
"access: %08XD %08XD %08XD",
addr, rule[i].mask, rule[i].addr);
if ((addr & rule[i].mask) == rule[i].addr
&& flag & rule[i].flags)
{
return ngx_rtmp_access_found(s, rule[i].deny);
}
}
return NGX_OK;
}
#if (NGX_HAVE_INET6)
static ngx_int_t
ngx_rtmp_access_inet6(ngx_rtmp_session_t *s,
ngx_rtmp_srv_conf_t *ascf,
u_char *p, ngx_uint_t flag)
{
ngx_uint_t n;
ngx_uint_t i;
ngx_rtmp_access_rule6_t *rule6;
rule6 = ascf->rules6->elts;
for (i = 0; i < ascf->rules6->nelts; i++) {
#if (NGX_DEBUG)
{
size_t cl, ml, al;
u_char ct[NGX_INET6_ADDRSTRLEN];
u_char mt[NGX_INET6_ADDRSTRLEN];
u_char at[NGX_INET6_ADDRSTRLEN];
cl = ngx_inet6_ntop(p, ct, NGX_INET6_ADDRSTRLEN);
ml = ngx_inet6_ntop(rule6[i].mask.s6_addr, mt, NGX_INET6_ADDRSTRLEN);
al = ngx_inet6_ntop(rule6[i].addr.s6_addr, at, NGX_INET6_ADDRSTRLEN);
ngx_log_debug6(NGX_LOG_DEBUG_HTTP, s->connection->log, 0,
"access: %*s %*s %*s", cl, ct, ml, mt, al, at);
}
#endif
for (n = 0; n < 16; n++) {
if ((p[n] & rule6[i].mask.s6_addr[n]) != rule6[i].addr.s6_addr[n]) {
goto next;
}
}
if (flag & rule6[i].flags) {
return ngx_rtmp_access_found(r, rule6[i].deny);
}
next:
continue;
}
return NGX_OK;
}
#endif
static ngx_int_t
ngx_rtmp_access(ngx_rtmp_session_t *s, ngx_uint_t flag)
{
struct sockaddr_in *sin;
ngx_rtmp_access_srv_conf_t *ascf;
#if (NGX_HAVE_INET6)
u_char *p;
in_addr_t addr;
struct sockaddr_in6 *sin6;
#endif
ascf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_access_module);
switch (s->connection->sockaddr->sa_family) {
case AF_INET:
if (ascf->rules) {
sin = (struct sockaddr_in *) s->connection->sockaddr;
return ngx_rtmp_access_inet(s, ascf,
sin->sin_addr.s_addr, flag);
}
break;
#if (NGX_HAVE_INET6)
case AF_INET6:
sin6 = (struct sockaddr_in6 *) s->connection->sockaddr;
p = sin6->sin6_addr.s6_addr;
if (ascf->rules && IN6_IS_ADDR_V4MAPPED(&sin6->sin6_addr)) {
addr = p[12] << 24;
addr += p[13] << 16;
addr += p[14] << 8;
addr += p[15];
return ngx_rtmp_access_inet(r, ascf, htonl(addr), flag);
}
if (ascf->rules6) {
return ngx_rtmp_access_inet6(r, ascf, p, flag);
}
#endif
}
return NGX_OK;
}
static char *
ngx_rtmp_access_rule(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
{
ngx_rtmp_access_srv_conf_t *ascf = conf;
ngx_int_t rc;
ngx_uint_t all;
ngx_str_t *value;
ngx_cidr_t cidr;
ngx_rtmp_access_rule_t *rule;
#if (NGX_HAVE_INET6)
ngx_rtmp_access_rule6_t *rule6;
#endif
size_t n;
ngx_uint_t flags;
ngx_memzero(&cidr, sizeof(ngx_cidr_t));
value = cf->args->elts;
n = 1;
flags = 0;
if (cf->args->nelts == 2) {
flags = NGX_RTMP_ACCESS_PUBLISH | NGX_RTMP_ACCESS_PLAY;
} else {
for(; n < cf->args->nelts - 1; ++n) {
if (value[n].len == sizeof("publish") - 1
&& ngx_strcmp(value[1].data, "publish") == 0)
{
flags |= NGX_RTMP_ACCESS_PUBLISH;
continue;
}
if (value[n].len == sizeof("play") - 1
&& ngx_strcmp(value[1].data, "play") == 0)
{
flags |= NGX_RTMP_ACCESS_PLAY;
continue;
}
ngx_log_error(NGX_LOG_ERR, cf->log, 0,
"unexpected access specified: '%V'", &value[n]);
return NGX_CONF_ERROR;
}
}
all = (value[n].len == 3 && ngx_strcmp(value[n].data, "all") == 0);
if (!all) {
rc = ngx_ptocidr(&value[n], &cidr);
if (rc == NGX_ERROR) {
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
"invalid parameter \"%V\"", &value[1]);
return NGX_CONF_ERROR;
}
if (rc == NGX_DONE) {
ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
"low address bits of %V are meaningless", &value[1]);
}
}
switch (cidr.family) {
#if (NGX_HAVE_INET6)
case AF_INET6:
case 0: /* all */
if (cscf->rules6 == NULL) {
cscf->rules6 = ngx_array_create(cf->pool, 4,
sizeof(ngx_rtmp_access_rule6_t));
if (cscf->rules6 == NULL) {
return NGX_CONF_ERROR;
}
}
rule6 = ngx_array_push(cscf->rules6);
if (rule6 == NULL) {
return NGX_CONF_ERROR;
}
rule6->mask = cidr.u.in6.mask;
rule6->addr = cidr.u.in6.addr;
rule6->deny = (value[0].data[0] == 'd') ? 1 : 0;
rule6->flags = flags;
if (!all) {
break;
}
/* "all" passes through */
#endif
default: /* AF_INET */
if (ascf->rules == NULL) {
ascf->rules = ngx_array_create(cf->pool, 4,
sizeof(ngx_rtmp_access_rule_t));
if (ascf->rules == NULL) {
return NGX_CONF_ERROR;
}
}
rule = ngx_array_push(ascf->rules);
if (rule == NULL) {
return NGX_CONF_ERROR;
}
rule->mask = cidr.u.in.mask;
rule->addr = cidr.u.in.addr;
rule->deny = (value[0].data[0] == 'd') ? 1 : 0;
rule->flags = flags;
}
return NGX_CONF_OK;
}
static ngx_int_t
ngx_rtmp_access_init(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH) == NGX_OK
|| ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY) == NGX_OK
? NGX_OK
: NGX_ERROR;
}
static ngx_int_t
ngx_rtmp_access_publish(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PUBLISH);
}
static ngx_int_t
ngx_rtmp_access_play(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
return ngx_rtmp_access(s, NGX_RTMP_ACCESS_PLAY);
}
static ngx_int_t
ngx_rtmp_access_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_handler_pt *h;
ngx_rtmp_amf0_handler_t *ch, *bh;
size_t n, ncalls;
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
/* register event handlers */
h = ngx_array_push(&cmcf->events[NGX_RTMP_CONNECT]);
*h = ngx_rtmp_access_init;
/* register AMF0 callbacks */
ncalls = sizeof(ngx_rtmp_access_map)
/ sizeof(ngx_rtmp_access_map[0]);
ch = ngx_array_push_n(&cmcf->amf0, ncalls);
if (h == NULL) {
return NGX_ERROR;
}
bh = ngx_rtmp_access_map;
for(n = 0; n < ncalls; ++n, ++ch, ++bh) {
*ch = *bh;
}
return NGX_OK;
}

View file

@ -15,18 +15,12 @@
#define NGX_RTMP_BROADCAST_CSID_VIDEO 7
/* Frame cutoff */
#define NGX_RTMP_CUTOFF_ALL 0
#define NGX_RTMP_CUTOFF_KEY 1
#define NGX_RTMP_CUTOFF_INTER 2
#define NGX_RTMP_CUTOFF_DISPOSABLE 3
static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);
static ngx_int_t ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static ngx_int_t ngx_rtmp_broadcast_create_stream(ngx_rtmp_session_t *s,
@ -41,18 +35,12 @@ static ngx_int_t ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
typedef struct {
ngx_str_t name;
ngx_rtmp_event_handler_pt handler;
} ngx_rtmp_broadcast_map_t;
static ngx_rtmp_broadcast_map_t ngx_rtmp_broadcast_map[] = {
static ngx_rtmp_amf0_handler_t ngx_rtmp_broadcast_map[] = {
{ ngx_string("connect"), ngx_rtmp_broadcast_connect },
{ ngx_string("createStream"), ngx_rtmp_broadcast_create_stream },
{ ngx_string("publish"), ngx_rtmp_broadcast_publish },
{ ngx_string("play"), ngx_rtmp_broadcast_play },
{ ngx_string("@setDataFrame"), ngx_rtmp_broadcast_set_data_frame },
{ ngx_string("-@setDataFrame"), ngx_rtmp_broadcast_set_data_frame },
{ ngx_string("releaseStream"), ngx_rtmp_broadcast_ok },
{ ngx_string("FCPublish"), ngx_rtmp_broadcast_ok },
{ ngx_string("FCSubscribe"), ngx_rtmp_broadcast_ok },
@ -68,6 +56,7 @@ typedef struct {
static ngx_command_t ngx_rtmp_broadcast_commands[] = {
{ ngx_string("broadcast_buckets"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
@ -105,6 +94,7 @@ ngx_module_t ngx_rtmp_broadcast_module = {
};
/* session flags */
#define NGX_RTMP_BROADCAST_PUBLISHER 0x01
#define NGX_RTMP_BROADCAST_SUBSCRIBER 0x02
#define NGX_RTMP_BROADCAST_KEYFRAME 0x04
@ -222,7 +212,7 @@ ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s)
c = s->connection;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL || !ctx->stream.len) {
if (ctx == NULL) {
return NGX_OK;
}
@ -321,50 +311,52 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
* so save next ptr while it's not too late */
cnext = cctx->next;
if (cctx != ctx
&& cctx->flags & NGX_RTMP_BROADCAST_SUBSCRIBER
&& cctx->stream.len == ctx->stream.len
&& !ngx_strncmp(cctx->stream.data, ctx->stream.data,
if (cctx == ctx
|| !(cctx->flags & NGX_RTMP_BROADCAST_SUBSCRIBER)
|| cctx->stream.len != ctx->stream.len
|| ngx_strncmp(cctx->stream.data, ctx->stream.data,
ctx->stream.len))
{
ss = cctx->session;
continue;
}
/* if we have metadata check if the subscriber
* has already received one */
if (ctx->data_frame && 0
&& !(cctx->flags & NGX_RTMP_BROADCAST_DATA_FRAME))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"sending data_frame");
ss = cctx->session;
rc = ngx_rtmp_send_message(ss, ctx->data_frame, 0);
if (rc == NGX_ERROR) {
continue;
}
/* if we have metadata check if the subscriber
* has already received one */
if (ctx->data_frame
&& !(cctx->flags & NGX_RTMP_BROADCAST_DATA_FRAME))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"sending data_frame");
if (rc == NGX_OK) {
cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME;
}
}
/* waiting for a keyframe? */
if (cscf->wait_key_frame
&& sh.type == NGX_RTMP_MSG_VIDEO
&& !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME)
&& !keyframe)
{
rc = ngx_rtmp_send_message(ss, ctx->data_frame, 0);
if (rc == NGX_ERROR) {
continue;
}
if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK
&& keyframe
&& !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"keyframe sent");
cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME;
if (rc == NGX_OK) {
cctx->flags |= NGX_RTMP_BROADCAST_DATA_FRAME;
}
}
/* waiting for a keyframe? */
if (cscf->wait_key_frame
&& sh.type == NGX_RTMP_MSG_VIDEO
&& !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME)
&& !keyframe)
{
continue;
}
if (ngx_rtmp_send_message(ss, out, priority) == NGX_OK
&& keyframe
&& !(cctx->flags & NGX_RTMP_BROADCAST_KEYFRAME))
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"keyframe sent");
cctx->flags |= NGX_RTMP_BROADCAST_KEYFRAME;
}
}
ngx_rtmp_free_shared_bufs(cscf, out);
@ -373,6 +365,14 @@ ngx_rtmp_broadcast_av(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
static ngx_int_t
ngx_rtmp_broadcast_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
return ngx_rtmp_broadcast_leave(s);
}
static ngx_int_t
ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
@ -712,7 +712,7 @@ ngx_rtmp_broadcast_set_data_frame(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
sh.msid = NGX_RTMP_BROADCAST_MSID;
sh.type = NGX_RTMP_MSG_AMF0_META;
ngx_rtmp_prepare_message(s, h, NULL, ctx->data_frame);
ngx_rtmp_prepare_message(s, &sh, NULL, ctx->data_frame);
return NGX_OK;
}
@ -745,9 +745,9 @@ ngx_rtmp_broadcast_ok(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
}
memset(&sh, 0, sizeof(sh));
sh.csid = h->csid;/*NGX_RTMP_BROADCAST_CSID_AMF0;*/
sh.csid = h->csid;
sh.type = NGX_RTMP_MSG_AMF0_CMD;
sh.msid = 0;//NGX_RTMP_BROADCAST_MSID;
sh.msid = 0;
/* send simple _result */
return ngx_rtmp_send_amf0(s, &sh, out_elts,
@ -759,42 +759,33 @@ static ngx_int_t
ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_hash_key_t *h;
ngx_rtmp_disconnect_handler_pt *dh;
ngx_rtmp_event_handler_pt *avh;
ngx_rtmp_broadcast_map_t *bm;
ngx_rtmp_handler_pt *h;
ngx_rtmp_amf0_handler_t *ch, *bh;
size_t n, ncalls;
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
/* register audio/video broadcast handler */
avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
*avh = ngx_rtmp_broadcast_av;
/* register event handlers */
h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AUDIO]);
*h = ngx_rtmp_broadcast_av;
avh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
*avh = ngx_rtmp_broadcast_av;
h = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_VIDEO]);
*h = ngx_rtmp_broadcast_av;
/* register disconnect handler */
dh = ngx_array_push(&cmcf->disconnect);
h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
*h = ngx_rtmp_broadcast_done;
if (dh == NULL) {
return NGX_ERROR;
}
*dh = ngx_rtmp_broadcast_leave;
/* register AMF0 call handlers */
/* register AMF0 callbacks */
ncalls = sizeof(ngx_rtmp_broadcast_map)
/ sizeof(ngx_rtmp_broadcast_map[0]);
h = ngx_array_push_n(&cmcf->amf0, ncalls);
ch = ngx_array_push_n(&cmcf->amf0, ncalls);
if (h == NULL) {
return NGX_ERROR;
}
bm = ngx_rtmp_broadcast_map;
for(n = 0; n < ncalls; ++n, ++h, ++bm) {
h->key = bm->name;
h->value = bm->handler;
bh = ngx_rtmp_broadcast_map;
for(n = 0; n < ncalls; ++n, ++ch, ++bh) {
*ch = *bh;
}
return NGX_OK;

View file

@ -213,10 +213,13 @@ ngx_rtmp_init_connection(ngx_connection_t *c)
static void
ngx_rtmp_init_session(ngx_connection_t *c)
{
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
size_t size;
ngx_rtmp_session_t *s;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
size_t n, size;
ngx_rtmp_handler_pt *h;
ngx_array_t *ch;
s = c->data;
@ -234,7 +237,6 @@ ngx_rtmp_init_session(ngx_connection_t *c)
ngx_rtmp_close_connection(c);
return;
}
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
s->in_chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
s->in_pool = ngx_create_pool(4096/*2 * size + sizeof(ngx_pool_t)*/, c->log);
@ -253,6 +255,20 @@ ngx_rtmp_init_session(ngx_connection_t *c)
c->write->handler = ngx_rtmp_handshake_send;
c->read->handler = ngx_rtmp_handshake_recv;
/* call connect callbacks */
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
ch = &cmcf->events[NGX_RTMP_CONNECT];
h = ch->elts;
for(n = 0; n < ch->nelts; ++n, ++h) {
if (*h) {
if ((*h)(s, NULL, NULL) != NGX_OK) {
ngx_rtmp_close_connection(c);
return;
}
}
}
ngx_rtmp_handshake_recv(c->read);
}
@ -1061,7 +1077,7 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_core_main_conf_t *cmcf;
ngx_array_t *evhs;
size_t n;
ngx_rtmp_event_handler_pt *evh;
ngx_rtmp_handler_pt *evh;
ngx_connection_t *c;
c = s->connection;
@ -1103,10 +1119,13 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"calling handler %d", n);
if ((*evh)(s, h, in) != NGX_OK) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"handler %d failed", n);
return NGX_ERROR;
switch ((*evh)(s, h, in)) {
case NGX_ERROR:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"handler %d failed", n);
return NGX_ERROR;
case NGX_DONE:
return NGX_OK;
}
}
@ -1121,7 +1140,8 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
ngx_pool_t *pool;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_disconnect_handler_pt *h;
ngx_rtmp_handler_pt *h;
ngx_array_t *dh;
size_t n;
if (c->destroyed) {
@ -1135,10 +1155,12 @@ ngx_rtmp_close_connection(ngx_connection_t *c)
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close connection");
if (s) {
h = cmcf->disconnect.elts;
for(n = 0; n < cmcf->disconnect.nelts; ++n, ++h) {
dh = &cmcf->events[NGX_RTMP_DISCONNECT];
h = dh->elts;
for(n = 0; n < dh->nelts; ++n, ++h) {
if (*h) {
(*h)(s);
(*h)(s, NULL, NULL);
}
}

View file

@ -163,8 +163,9 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_amf0_ctx_t act;
ngx_connection_t *c;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_event_handler_pt ch;
size_t len;
ngx_array_t *ch;
ngx_rtmp_handler_pt *ph;
size_t len, n;
static u_char func[128];
@ -190,22 +191,26 @@ ngx_rtmp_amf0_message_handler(ngx_rtmp_session_t *s,
len = ngx_strlen(func);
/* lookup function handler
* only the first handler is called so far
* because ngx_hash_find only returns one item;
* no good to patch NGINX core ;) */
ch = ngx_hash_find(&cmcf->amf0_hash,
ngx_hash_strlow(func, func, len), func, len);
if (ch) {
if (ch && ch->nelts) {
ph = ch->elts;
for (n = 0; n < ch->nelts; ++n, ++ph) {
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 func '%s' passed to handler %d/%d",
func, n, ch->nelts);
switch ((*ph)(s, h, in)) {
case NGX_ERROR:
return NGX_ERROR;
case NGX_DONE:
return NGX_OK;
}
}
} else {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 func '%s' passed to handler", func);
return ch(s, h, in);
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 cmd '%s' no handler", func);
}
return NGX_OK;
}

View file

@ -18,7 +18,7 @@ rtmp {
server {
listen 1935;
listen 1935;
# wait_key_frame on;
@ -26,6 +26,12 @@ rtmp {
max_buf 1000000;
#allow play all;
allow publish 127.0.0.1;
deny publish all;
}
}
@ -39,6 +45,7 @@ http {
listen 8080;
location / {
# deny 192.168.33.33;
root /home/rarutyunyan/nginx-rtmp-module/test/www;
}
}

View file

@ -1 +1 @@
ffplay -loglevel verbose "rtmp://192.168.0.100/helo/pd"
ffplay -loglevel verbose "rtmp://localhost/helo/pd"