diff --git a/README b/README index af3178f..466b667 100644 --- a/README +++ b/README @@ -4,6 +4,9 @@ NGINX-based RTMP server * Live streaming of video/audio +* Stream relay support for distributed + streaming: push & pull models + * Recording published streams in FLV file * H264 support @@ -76,6 +79,24 @@ rtmp { #allow play all; } + application mypush { + live on; + + # Every stream published here + # is automatically pushed to + # these two machines + push rtmp1.example.com; + push rtmp2.example.com:1934; + } + + application mypull { + live on; + + # Pull all streams from remote machine + # and play locally + pull rtmp3.example.com; + } + # Many publishers, many subscribers # no checks, no recording application videochat { diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 0388deb..979ce56 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -157,6 +157,25 @@ ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) } +static ngx_rtmp_relay_ctx_t ** +ngx_rtmp_relay_find_ctx(ngx_rtmp_relay_app_conf_t *racf, ngx_str_t *name) +{ + ngx_uint_t hash; + ngx_rtmp_relay_ctx_t **cctx; + + hash = ngx_hash_key(name->data, name->len); + cctx = &racf->ctx[hash % racf->nbuckets]; + for (; *cctx; cctx = &(*cctx)->next) { + if ((*cctx)->name.len == name->len + && !ngx_memcmp(name->data, (*cctx)->name.data, name->len)) + { + break; + } + } + return cctx; +} + + static ngx_int_t ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data) { @@ -172,7 +191,7 @@ ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data, static void -ngx_rtmp_relay_create(ngx_rtmp_session_t *s, +ngx_rtmp_relay_init_remote(ngx_rtmp_session_t *s, ngx_rtmp_relay_target_t *target) { ngx_rtmp_relay_app_conf_t *racf; @@ -182,7 +201,6 @@ ngx_rtmp_relay_create(ngx_rtmp_session_t *s, ngx_pool_t *pool; ngx_int_t rc; ngx_rtmp_relay_ctx_t *rctx, **cctx; - ngx_uint_t hash; ngx_rtmp_addr_conf_t addr_conf; ngx_rtmp_conf_ctx_t addr_ctx; ngx_rtmp_relay_ctx_t *ctx; @@ -209,16 +227,25 @@ ngx_rtmp_relay_create(ngx_rtmp_session_t *s, rctx->name.data = ngx_palloc(pool, ctx->name.len); ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len); - hash = ngx_hash_key(rctx->name.data, rctx->name.len); - cctx = &racf->ctx[hash % racf->nbuckets]; + cctx = ngx_rtmp_relay_find_ctx(racf, &rctx->name); + if (*cctx) { + /* add more pushes */ + if ((*cctx)->target == NULL) { + rctx->src = (*cctx)->src; + rctx->next = (*cctx)->dst; + (*cctx)->dst = rctx; + } - if (target->push) { + } else if (target->push) { + /* the first push */ ctx->next = *cctx; *cctx = ctx; ctx->src = ctx; ctx->dst = rctx; rctx->src = ctx; + } else { + /* pull */ rctx->next = *cctx; *cctx = rctx; rctx->src = rctx; @@ -279,13 +306,12 @@ clear: static ngx_int_t -ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name) +ngx_rtmp_relay_init_local(ngx_rtmp_session_t *s, u_char *name) { size_t n, len; ngx_rtmp_relay_target_t *target; - ngx_uint_t hash; ngx_rtmp_relay_app_conf_t *racf; - ngx_rtmp_relay_ctx_t *sctx, *ctx, **cctx; + ngx_rtmp_relay_ctx_t *ctx, **cctx; racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module); if (racf == NULL) { @@ -314,22 +340,13 @@ ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name) ngx_memcpy(ctx->name.data, name, len); /* find relay stream */ - hash = ngx_hash_key(name, len); - cctx = &racf->ctx[hash % racf->nbuckets]; - for (sctx = *cctx; sctx; sctx = sctx->next) { - if (sctx->name.len == len - && !ngx_memcmp(name, sctx->name.data, len)) - { - break; - } - } - - if (sctx) { + cctx = ngx_rtmp_relay_find_ctx(racf, &ctx->name); + if (*cctx) { /* add player to pull stream */ - if (sctx->target) { - ctx->src = sctx->src; - ctx->next = sctx->dst; - sctx->dst = ctx; + if ((*cctx)->target) { + ctx->src = (*cctx)->src; + ctx->next = (*cctx)->dst; + (*cctx)->dst = ctx; } return NGX_OK; } @@ -345,7 +362,7 @@ ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name) "relay: create: name='%s' url='%V'", name, &target->url); - ngx_rtmp_relay_create(s, target); + ngx_rtmp_relay_init_remote(s, target); } } @@ -363,7 +380,7 @@ ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v) goto next; } - ngx_rtmp_relay_init(s, v->name); + ngx_rtmp_relay_init_local(s, v->name); next: return next_publish(s, v); @@ -380,7 +397,7 @@ ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v) goto next; } - ngx_rtmp_relay_init(s, v->name); + ngx_rtmp_relay_init_local(s, v->name); next: return next_play(s, v); diff --git a/test/nginx.conf b/test/nginx.conf index 3f57d17..06ea958 100644 --- a/test/nginx.conf +++ b/test/nginx.conf @@ -23,14 +23,11 @@ rtmp { chunk_size 128; publish_time_fix off; -#play_time_fix off; application myapp { live on; -#video_sync on; - record keyframes; record_path /tmp; @@ -44,11 +41,20 @@ rtmp { on_record_done http://localhost:8080/record_done; } + application myapp2 { + live on; + } + application mypull { live on; pull myapp mystream localhost; } + application mypush { + live on; + push myapp mystream localhost; + push myapp2 mystream localhost; + } } }