implemented full relay support & updated README

This commit is contained in:
Roman Arutyunyan 2012-05-16 22:29:00 +04:00
parent 806ef97ba3
commit 1f9072bbe6
3 changed files with 73 additions and 29 deletions

21
README
View file

@ -4,6 +4,9 @@ NGINX-based RTMP server
* Live streaming of video/audio * Live streaming of video/audio
* Stream relay support for distributed
streaming: push & pull models
* Recording published streams in FLV file * Recording published streams in FLV file
* H264 support * H264 support
@ -76,6 +79,24 @@ rtmp {
#allow play all; #allow play all;
} }
application mypush {
live on;
# Every stream published here
# is automatically pushed to
# these two machines
push rtmp1.example.com;
push rtmp2.example.com:1934;
}
application mypull {
live on;
# Pull all streams from remote machine
# and play locally
pull rtmp3.example.com;
}
# Many publishers, many subscribers # Many publishers, many subscribers
# no checks, no recording # no checks, no recording
application videochat { application videochat {

View file

@ -157,6 +157,25 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
} }
static ngx_rtmp_relay_ctx_t **
ngx_rtmp_relay_find_ctx(ngx_rtmp_relay_app_conf_t *racf, ngx_str_t *name)
{
ngx_uint_t hash;
ngx_rtmp_relay_ctx_t **cctx;
hash = ngx_hash_key(name->data, name->len);
cctx = &racf->ctx[hash % racf->nbuckets];
for (; *cctx; cctx = &(*cctx)->next) {
if ((*cctx)->name.len == name->len
&& !ngx_memcmp(name->data, (*cctx)->name.data, name->len))
{
break;
}
}
return cctx;
}
static ngx_int_t static ngx_int_t
ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data) ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data)
{ {
@ -172,7 +191,7 @@ ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data,
static void static void
ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s,
ngx_rtmp_relay_target_t *target) ngx_rtmp_relay_target_t *target)
{ {
ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_app_conf_t *racf;
@ -182,7 +201,6 @@ ngx_rtmp_relay_create(ngx_rtmp_session_t *s,
ngx_pool_t *pool; ngx_pool_t *pool;
ngx_int_t rc; ngx_int_t rc;
ngx_rtmp_relay_ctx_t *rctx, **cctx; ngx_rtmp_relay_ctx_t *rctx, **cctx;
ngx_uint_t hash;
ngx_rtmp_addr_conf_t addr_conf; ngx_rtmp_addr_conf_t addr_conf;
ngx_rtmp_conf_ctx_t addr_ctx; ngx_rtmp_conf_ctx_t addr_ctx;
ngx_rtmp_relay_ctx_t *ctx; ngx_rtmp_relay_ctx_t *ctx;
@ -209,16 +227,25 @@ ngx_rtmp_relay_create(ngx_rtmp_session_t *s,
rctx->name.data = ngx_palloc(pool, ctx->name.len); rctx->name.data = ngx_palloc(pool, ctx->name.len);
ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len); ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len);
hash = ngx_hash_key(rctx->name.data, rctx->name.len); cctx = ngx_rtmp_relay_find_ctx(racf, &rctx->name);
cctx = &racf->ctx[hash % racf->nbuckets]; if (*cctx) {
/* add more pushes */
if ((*cctx)->target == NULL) {
rctx->src = (*cctx)->src;
rctx->next = (*cctx)->dst;
(*cctx)->dst = rctx;
}
if (target->push) { } else if (target->push) {
/* the first push */
ctx->next = *cctx; ctx->next = *cctx;
*cctx = ctx; *cctx = ctx;
ctx->src = ctx; ctx->src = ctx;
ctx->dst = rctx; ctx->dst = rctx;
rctx->src = ctx; rctx->src = ctx;
} else { } else {
/* pull */
rctx->next = *cctx; rctx->next = *cctx;
*cctx = rctx; *cctx = rctx;
rctx->src = rctx; rctx->src = rctx;
@ -279,13 +306,12 @@ clear:
static ngx_int_t static ngx_int_t
ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name) ngx_rtmp_relay_init_local(ngx_rtmp_session_t *s, u_char *name)
{ {
size_t n, len; size_t n, len;
ngx_rtmp_relay_target_t *target; ngx_rtmp_relay_target_t *target;
ngx_uint_t hash;
ngx_rtmp_relay_app_conf_t *racf; ngx_rtmp_relay_app_conf_t *racf;
ngx_rtmp_relay_ctx_t *sctx, *ctx, **cctx; ngx_rtmp_relay_ctx_t *ctx, **cctx;
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
if (racf == NULL) { if (racf == NULL) {
@ -314,22 +340,13 @@ ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name)
ngx_memcpy(ctx->name.data, name, len); ngx_memcpy(ctx->name.data, name, len);
/* find relay stream */ /* find relay stream */
hash = ngx_hash_key(name, len); cctx = ngx_rtmp_relay_find_ctx(racf, &ctx->name);
cctx = &racf->ctx[hash % racf->nbuckets]; if (*cctx) {
for (sctx = *cctx; sctx; sctx = sctx->next) {
if (sctx->name.len == len
&& !ngx_memcmp(name, sctx->name.data, len))
{
break;
}
}
if (sctx) {
/* add player to pull stream */ /* add player to pull stream */
if (sctx->target) { if ((*cctx)->target) {
ctx->src = sctx->src; ctx->src = (*cctx)->src;
ctx->next = sctx->dst; ctx->next = (*cctx)->dst;
sctx->dst = ctx; (*cctx)->dst = ctx;
} }
return NGX_OK; return NGX_OK;
} }
@ -345,7 +362,7 @@ ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name)
"relay: create: name='%s' url='%V'", "relay: create: name='%s' url='%V'",
name, &target->url); name, &target->url);
ngx_rtmp_relay_create(s, target); ngx_rtmp_relay_init_remote(s, target);
} }
} }
@ -363,7 +380,7 @@ ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
goto next; goto next;
} }
ngx_rtmp_relay_init(s, v->name); ngx_rtmp_relay_init_local(s, v->name);
next: next:
return next_publish(s, v); return next_publish(s, v);
@ -380,7 +397,7 @@ ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
goto next; goto next;
} }
ngx_rtmp_relay_init(s, v->name); ngx_rtmp_relay_init_local(s, v->name);
next: next:
return next_play(s, v); return next_play(s, v);

View file

@ -23,14 +23,11 @@ rtmp {
chunk_size 128; chunk_size 128;
publish_time_fix off; publish_time_fix off;
#play_time_fix off;
application myapp { application myapp {
live on; live on;
#video_sync on;
record keyframes; record keyframes;
record_path /tmp; record_path /tmp;
@ -44,11 +41,20 @@ rtmp {
on_record_done http://localhost:8080/record_done; on_record_done http://localhost:8080/record_done;
} }
application myapp2 {
live on;
}
application mypull { application mypull {
live on; live on;
pull myapp mystream localhost; pull myapp mystream localhost;
} }
application mypush {
live on;
push myapp mystream localhost;
push myapp2 mystream localhost;
}
} }
} }