From 11e14ee9282f2fdc5abbf8cfafa3fdad80b7f371 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Tue, 7 May 2013 17:43:03 +0400 Subject: [PATCH 1/6] implemented push/pull from on_play/on_publish --- ngx_rtmp_notify_module.c | 161 ++++++++++++++++++++++++++++++++------- 1 file changed, 132 insertions(+), 29 deletions(-) diff --git a/ngx_rtmp_notify_module.c b/ngx_rtmp_notify_module.c index d724e74..50f76d3 100644 --- a/ngx_rtmp_notify_module.c +++ b/ngx_rtmp_notify_module.c @@ -10,6 +10,7 @@ #include "ngx_rtmp_cmd_module.h" #include "ngx_rtmp_netcall_module.h" #include "ngx_rtmp_record_module.h" +#include "ngx_rtmp_relay_module.h" static ngx_rtmp_connect_pt next_connect; @@ -70,6 +71,7 @@ typedef struct { ngx_uint_t method; ngx_msec_t update_timeout; ngx_flag_t update_strict; + ngx_flag_t relay_redirect; } ngx_rtmp_notify_app_conf_t; @@ -180,6 +182,13 @@ static ngx_command_t ngx_rtmp_notify_commands[] = { offsetof(ngx_rtmp_notify_app_conf_t, update_strict), NULL }, + { ngx_string("notify_relay_redirect"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1, + ngx_conf_set_flag_slot, + NGX_RTMP_APP_CONF_OFFSET, + offsetof(ngx_rtmp_notify_app_conf_t, relay_redirect), + NULL }, + ngx_null_command }; @@ -230,6 +239,7 @@ ngx_rtmp_notify_create_app_conf(ngx_conf_t *cf) nacf->method = NGX_CONF_UNSET; nacf->update_timeout = NGX_CONF_UNSET; nacf->update_strict = NGX_CONF_UNSET; + nacf->relay_redirect = NGX_CONF_UNSET; return nacf; } @@ -258,6 +268,7 @@ ngx_rtmp_notify_merge_app_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_msec_value(conf->update_timeout, prev->update_timeout, 30000); ngx_conf_merge_value(conf->update_strict, prev->update_strict, 0); + ngx_conf_merge_value(conf->relay_redirect, prev->relay_redirect, 0); return NGX_CONF_OK; } @@ -962,9 +973,13 @@ static ngx_int_t ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s, void *arg, ngx_chain_t *in) { - ngx_rtmp_publish_t *v = arg; - ngx_int_t rc; - u_char name[NGX_RTMP_MAX_NAME]; + ngx_rtmp_publish_t *v = arg; + ngx_int_t rc; + ngx_str_t local_name; + ngx_rtmp_relay_target_t target; + ngx_url_t *u; + ngx_rtmp_notify_app_conf_t *nacf; + u_char name[NGX_RTMP_MAX_NAME]; static ngx_str_t location = ngx_string("location"); @@ -974,19 +989,61 @@ ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s, return NGX_ERROR; } - if (rc == NGX_AGAIN) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "notify: publish redirect received"); - - rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name, - sizeof(name) - 1); - if (rc > 0) { - *ngx_cpymem(v->name, name, rc) = 0; - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "notify: publish redirect to '%s'", v->name); - } + if (rc != NGX_AGAIN) { + goto next; } + /* HTTP 3xx */ + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "notify: publish redirect received"); + + rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name, + sizeof(name) - 1); + if (rc <= 0) { + goto next; + } + + if (ngx_strncmp(name, "rtmp://", 7)) { + *ngx_cpymem(v->name, name, rc) = 0; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: publish redirect to '%s'", v->name); + goto next; + } + + /* push */ + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + if (nacf->relay_redirect) { + *ngx_cpymem(v->name, name, rc) = 0; + } + + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: push '%s' to '%s'", v->name, name); + + local_name.data = v->name; + local_name.len = ngx_strlen(v->name); + + ngx_memzero(&target, sizeof(target)); + + u = &target.url; + u->url = local_name; + u->url.data = name + 7; + u->url.len = rc - 7; + u->default_port = 1935; + u->uri_part = 1; + u->no_resolve = 1; /* want ip here */ + + if (ngx_parse_url(s->connection->pool, u) != NGX_OK) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: push failed '%V'", &local_name); + return NGX_ERROR; + } + + ngx_rtmp_relay_push(s, &local_name, &target); + +next: + return next_publish(s, v); } @@ -995,11 +1052,15 @@ static ngx_int_t ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, void *arg, ngx_chain_t *in) { - ngx_rtmp_play_t *v = arg; - ngx_int_t rc; - u_char name[NGX_RTMP_MAX_NAME]; + ngx_rtmp_play_t *v = arg; + ngx_int_t rc; + ngx_str_t local_name; + ngx_rtmp_relay_target_t target; + ngx_url_t *u; + ngx_rtmp_notify_app_conf_t *nacf; + u_char name[NGX_RTMP_MAX_NAME]; - static ngx_str_t location = ngx_string("location"); + static ngx_str_t location = ngx_string("location"); rc = ngx_rtmp_notify_parse_http_retcode(s, in); if (rc == NGX_ERROR) { @@ -1007,19 +1068,61 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, return NGX_ERROR; } - if (rc == NGX_AGAIN) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "notify: play redirect received"); - - rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name, - sizeof(name) - 1); - if (rc > 0) { - *ngx_cpymem(v->name, name, rc) = 0; - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "notify: play redirect to '%s'", v->name); - } + if (rc != NGX_AGAIN) { + goto next; } + /* HTTP 3xx */ + + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "notify: play redirect received"); + + rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name, + sizeof(name) - 1); + if (rc <= 0) { + goto next; + } + + if (ngx_strncmp(name, "rtmp://", 7)) { + *ngx_cpymem(v->name, name, rc) = 0; + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: play redirect to '%s'", v->name); + goto next; + } + + /* pull */ + + nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module); + if (nacf->relay_redirect) { + *ngx_cpymem(v->name, name, rc) = 0; + } + + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: pull '%s' from '%s'", v->name, name); + + local_name.data = v->name; + local_name.len = ngx_strlen(v->name); + + ngx_memzero(&target, sizeof(target)); + + u = &target.url; + u->url = local_name; + u->url.data = name + 7; + u->url.len = rc - 7; + u->default_port = 1935; + u->uri_part = 1; + u->no_resolve = 1; /* want ip here */ + + if (ngx_parse_url(s->connection->pool, u) != NGX_OK) { + ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, + "notify: pull failed '%V'", &local_name); + return NGX_ERROR; + } + + ngx_rtmp_relay_pull(s, &local_name, &target); + +next: + return next_play(s, v); } From db5d36174379fbcef944732f7a75b6645c6dc127 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 13 May 2013 09:59:34 +0400 Subject: [PATCH 2/6] fixed remote connect in relay module to use the right address --- ngx_rtmp_notify_module.c | 2 +- ngx_rtmp_relay_module.c | 17 ++++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/ngx_rtmp_notify_module.c b/ngx_rtmp_notify_module.c index 50f76d3..27afe5a 100644 --- a/ngx_rtmp_notify_module.c +++ b/ngx_rtmp_notify_module.c @@ -1098,7 +1098,7 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, } ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "notify: pull '%s' from '%s'", v->name, name); + "notify: pull '%s' from '%*s'", v->name, rc, name); local_name.data = v->name; local_name.len = ngx_strlen(v->name); diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 9f6a6cf..9ccbc07 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -342,6 +342,7 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, ngx_rtmp_session_t *rs; ngx_peer_connection_t *pc; ngx_connection_t *c; + ngx_addr_t *addr; ngx_pool_t *pool; ngx_int_t rc; ngx_str_t v, *uri; @@ -438,18 +439,28 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, if (pc == NULL) { goto clear; } + + if (target->url.naddrs == 0) { + ngx_log_error(NGX_LOG_INFO, racf->log, 0, + "relay: no addresses"); + goto clear; + } + + /* use the first address */ + addr = target->url.addrs; + /* copy log to keep shared log unchanged */ rctx->log = *racf->log; pc->log = &rctx->log; pc->get = ngx_rtmp_relay_get_peer; pc->free = ngx_rtmp_relay_free_peer; - pc->name = &target->url.host; - pc->socklen = target->url.socklen; + pc->name = &addr->name; + pc->socklen = addr->socklen; pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen); if (pc->sockaddr == NULL) { goto clear; } - ngx_memcpy(pc->sockaddr, &target->url.sockaddr, pc->socklen); + ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen); rc = ngx_event_connect_peer(pc); if (rc != NGX_OK && rc != NGX_AGAIN ) { From 244961076ff0da4f8f2277e2b911cdf20e63575b Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 13 May 2013 12:13:11 +0400 Subject: [PATCH 3/6] added round-robin relay url balancing --- ngx_rtmp_relay_module.c | 5 +++-- ngx_rtmp_relay_module.h | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index 9ccbc07..d5c13f4 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -446,8 +446,9 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, goto clear; } - /* use the first address */ - addr = target->url.addrs; + /* get address */ + addr = &target->url.addrs[target->counter % target->url.naddrs]; + target->counter++; /* copy log to keep shared log unchanged */ rctx->log = *racf->log; diff --git a/ngx_rtmp_relay_module.h b/ngx_rtmp_relay_module.h index cec70e0..67dbb04 100644 --- a/ngx_rtmp_relay_module.h +++ b/ngx_rtmp_relay_module.h @@ -23,8 +23,9 @@ typedef struct { ngx_int_t start; ngx_int_t stop; - void *tag; /* usually module reference */ - void *data; /* module-specific data */ + void *tag; /* usually module reference */ + void *data; /* module-specific data */ + ngx_uint_t counter; /* mutable connection counter */ } ngx_rtmp_relay_target_t; From 849dbf6b89cf83693d427bf8b69051d6b339b88a Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 13 May 2013 12:16:14 +0400 Subject: [PATCH 4/6] removed dns round robin from todo --- TODO | 1 - 1 file changed, 1 deletion(-) diff --git a/TODO b/TODO index 000296a..99b854e 100644 --- a/TODO +++ b/TODO @@ -2,5 +2,4 @@ - akamai auth - manual recorder custom file name - bandwidth control (per-app & total) -- DNS round-robin url - multiple streams per connection From aeb20d0a247673af3d176b03d6b49dee40f0a9dc Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 13 May 2013 13:35:56 +0400 Subject: [PATCH 5/6] minor logger fixes & more debug logging --- ngx_rtmp_notify_module.c | 14 ++++++++++---- ngx_rtmp_relay_module.c | 4 ++-- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ngx_rtmp_notify_module.c b/ngx_rtmp_notify_module.c index 27afe5a..173318b 100644 --- a/ngx_rtmp_notify_module.c +++ b/ngx_rtmp_notify_module.c @@ -1004,7 +1004,7 @@ ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s, goto next; } - if (ngx_strncmp(name, "rtmp://", 7)) { + if (ngx_strncasecmp(name, (u_char *) "rtmp://", 7)) { *ngx_cpymem(v->name, name, rc) = 0; ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "notify: publish redirect to '%s'", v->name); @@ -1018,8 +1018,8 @@ ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s, *ngx_cpymem(v->name, name, rc) = 0; } - ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, - "notify: push '%s' to '%s'", v->name, name); + ngx_log_error(NGX_LOG_ERR, s->connection->log, 0, + "notify: push '%s' to '%*s'", v->name, rc, name); local_name.data = v->name; local_name.len = ngx_strlen(v->name); @@ -1083,7 +1083,7 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, goto next; } - if (ngx_strncmp(name, "rtmp://", 7)) { + if (ngx_strncasecmp(name, (u_char *) "rtmp://", 7)) { *ngx_cpymem(v->name, name, rc) = 0; ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "notify: play redirect to '%s'", v->name); @@ -1113,12 +1113,18 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, u->uri_part = 1; u->no_resolve = 1; /* want ip here */ + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "notify: parse_url '%V'", &u->url); + if (ngx_parse_url(s->connection->pool, u) != NGX_OK) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "notify: pull failed '%V'", &local_name); return NGX_ERROR; } + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "notify: naddrs=%ui", u->naddrs); + ngx_rtmp_relay_pull(s, &local_name, &target); next: diff --git a/ngx_rtmp_relay_module.c b/ngx_rtmp_relay_module.c index d5c13f4..b4014cc 100644 --- a/ngx_rtmp_relay_module.c +++ b/ngx_rtmp_relay_module.c @@ -441,8 +441,8 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name, } if (target->url.naddrs == 0) { - ngx_log_error(NGX_LOG_INFO, racf->log, 0, - "relay: no addresses"); + ngx_log_error(NGX_LOG_ERR, racf->log, 0, + "relay: no address"); goto clear; } From 58bd6029463c79347d45be4488a3786abe5cebb8 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 13 May 2013 17:25:54 +0400 Subject: [PATCH 6/6] removed extra logging --- ngx_rtmp_notify_module.c | 6 ------ 1 file changed, 6 deletions(-) diff --git a/ngx_rtmp_notify_module.c b/ngx_rtmp_notify_module.c index 173318b..9777c1b 100644 --- a/ngx_rtmp_notify_module.c +++ b/ngx_rtmp_notify_module.c @@ -1113,18 +1113,12 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s, u->uri_part = 1; u->no_resolve = 1; /* want ip here */ - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "notify: parse_url '%V'", &u->url); - if (ngx_parse_url(s->connection->pool, u) != NGX_OK) { ngx_log_error(NGX_LOG_INFO, s->connection->log, 0, "notify: pull failed '%V'", &local_name); return NGX_ERROR; } - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "notify: naddrs=%ui", u->naddrs); - ngx_rtmp_relay_pull(s, &local_name, &target); next: