Merge branch 'notify-relay'

This commit is contained in:
Roman Arutyunyan 2013-05-13 17:26:10 +04:00
commit a47b23204b
4 changed files with 150 additions and 35 deletions

1
TODO
View file

@ -2,5 +2,4 @@
- akamai auth
- manual recorder custom file name
- bandwidth control (per-app & total)
- DNS round-robin url
- multiple streams per connection

View file

@ -10,6 +10,7 @@
#include "ngx_rtmp_cmd_module.h"
#include "ngx_rtmp_netcall_module.h"
#include "ngx_rtmp_record_module.h"
#include "ngx_rtmp_relay_module.h"
static ngx_rtmp_connect_pt next_connect;
@ -70,6 +71,7 @@ typedef struct {
ngx_uint_t method;
ngx_msec_t update_timeout;
ngx_flag_t update_strict;
ngx_flag_t relay_redirect;
} ngx_rtmp_notify_app_conf_t;
@ -181,6 +183,13 @@ static ngx_command_t ngx_rtmp_notify_commands[] = {
offsetof(ngx_rtmp_notify_app_conf_t, update_strict),
NULL },
{ ngx_string("notify_relay_redirect"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_RTMP_APP_CONF|NGX_CONF_TAKE1,
ngx_conf_set_flag_slot,
NGX_RTMP_APP_CONF_OFFSET,
offsetof(ngx_rtmp_notify_app_conf_t, relay_redirect),
NULL },
ngx_null_command
};
@ -231,6 +240,7 @@ ngx_rtmp_notify_create_app_conf(ngx_conf_t *cf)
nacf->method = NGX_CONF_UNSET;
nacf->update_timeout = NGX_CONF_UNSET;
nacf->update_strict = NGX_CONF_UNSET;
nacf->relay_redirect = NGX_CONF_UNSET;
return nacf;
}
@ -259,6 +269,7 @@ ngx_rtmp_notify_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_msec_value(conf->update_timeout, prev->update_timeout,
30000);
ngx_conf_merge_value(conf->update_strict, prev->update_strict, 0);
ngx_conf_merge_value(conf->relay_redirect, prev->relay_redirect, 0);
return NGX_CONF_OK;
}
@ -968,9 +979,13 @@ static ngx_int_t
ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s,
void *arg, ngx_chain_t *in)
{
ngx_rtmp_publish_t *v = arg;
ngx_int_t rc;
u_char name[NGX_RTMP_MAX_NAME];
ngx_rtmp_publish_t *v = arg;
ngx_int_t rc;
ngx_str_t local_name;
ngx_rtmp_relay_target_t target;
ngx_url_t *u;
ngx_rtmp_notify_app_conf_t *nacf;
u_char name[NGX_RTMP_MAX_NAME];
static ngx_str_t location = ngx_string("location");
@ -980,19 +995,61 @@ ngx_rtmp_notify_publish_handle(ngx_rtmp_session_t *s,
return NGX_ERROR;
}
if (rc == NGX_AGAIN) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"notify: publish redirect received");
rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name,
sizeof(name) - 1);
if (rc > 0) {
*ngx_cpymem(v->name, name, rc) = 0;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: publish redirect to '%s'", v->name);
}
if (rc != NGX_AGAIN) {
goto next;
}
/* HTTP 3xx */
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"notify: publish redirect received");
rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name,
sizeof(name) - 1);
if (rc <= 0) {
goto next;
}
if (ngx_strncasecmp(name, (u_char *) "rtmp://", 7)) {
*ngx_cpymem(v->name, name, rc) = 0;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: publish redirect to '%s'", v->name);
goto next;
}
/* push */
nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module);
if (nacf->relay_redirect) {
*ngx_cpymem(v->name, name, rc) = 0;
}
ngx_log_error(NGX_LOG_ERR, s->connection->log, 0,
"notify: push '%s' to '%*s'", v->name, rc, name);
local_name.data = v->name;
local_name.len = ngx_strlen(v->name);
ngx_memzero(&target, sizeof(target));
u = &target.url;
u->url = local_name;
u->url.data = name + 7;
u->url.len = rc - 7;
u->default_port = 1935;
u->uri_part = 1;
u->no_resolve = 1; /* want ip here */
if (ngx_parse_url(s->connection->pool, u) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: push failed '%V'", &local_name);
return NGX_ERROR;
}
ngx_rtmp_relay_push(s, &local_name, &target);
next:
return next_publish(s, v);
}
@ -1001,11 +1058,15 @@ static ngx_int_t
ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s,
void *arg, ngx_chain_t *in)
{
ngx_rtmp_play_t *v = arg;
ngx_int_t rc;
u_char name[NGX_RTMP_MAX_NAME];
ngx_rtmp_play_t *v = arg;
ngx_int_t rc;
ngx_str_t local_name;
ngx_rtmp_relay_target_t target;
ngx_url_t *u;
ngx_rtmp_notify_app_conf_t *nacf;
u_char name[NGX_RTMP_MAX_NAME];
static ngx_str_t location = ngx_string("location");
static ngx_str_t location = ngx_string("location");
rc = ngx_rtmp_notify_parse_http_retcode(s, in);
if (rc == NGX_ERROR) {
@ -1013,19 +1074,61 @@ ngx_rtmp_notify_play_handle(ngx_rtmp_session_t *s,
return NGX_ERROR;
}
if (rc == NGX_AGAIN) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"notify: play redirect received");
rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name,
sizeof(name) - 1);
if (rc > 0) {
*ngx_cpymem(v->name, name, rc) = 0;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: play redirect to '%s'", v->name);
}
if (rc != NGX_AGAIN) {
goto next;
}
/* HTTP 3xx */
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"notify: play redirect received");
rc = ngx_rtmp_notify_parse_http_header(s, in, &location, name,
sizeof(name) - 1);
if (rc <= 0) {
goto next;
}
if (ngx_strncasecmp(name, (u_char *) "rtmp://", 7)) {
*ngx_cpymem(v->name, name, rc) = 0;
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: play redirect to '%s'", v->name);
goto next;
}
/* pull */
nacf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_notify_module);
if (nacf->relay_redirect) {
*ngx_cpymem(v->name, name, rc) = 0;
}
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: pull '%s' from '%*s'", v->name, rc, name);
local_name.data = v->name;
local_name.len = ngx_strlen(v->name);
ngx_memzero(&target, sizeof(target));
u = &target.url;
u->url = local_name;
u->url.data = name + 7;
u->url.len = rc - 7;
u->default_port = 1935;
u->uri_part = 1;
u->no_resolve = 1; /* want ip here */
if (ngx_parse_url(s->connection->pool, u) != NGX_OK) {
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
"notify: pull failed '%V'", &local_name);
return NGX_ERROR;
}
ngx_rtmp_relay_pull(s, &local_name, &target);
next:
return next_play(s, v);
}

View file

@ -342,6 +342,7 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
ngx_rtmp_session_t *rs;
ngx_peer_connection_t *pc;
ngx_connection_t *c;
ngx_addr_t *addr;
ngx_pool_t *pool;
ngx_int_t rc;
ngx_str_t v, *uri;
@ -438,18 +439,29 @@ ngx_rtmp_relay_create_connection(ngx_rtmp_conf_ctx_t *cctx, ngx_str_t* name,
if (pc == NULL) {
goto clear;
}
if (target->url.naddrs == 0) {
ngx_log_error(NGX_LOG_ERR, racf->log, 0,
"relay: no address");
goto clear;
}
/* get address */
addr = &target->url.addrs[target->counter % target->url.naddrs];
target->counter++;
/* copy log to keep shared log unchanged */
rctx->log = *racf->log;
pc->log = &rctx->log;
pc->get = ngx_rtmp_relay_get_peer;
pc->free = ngx_rtmp_relay_free_peer;
pc->name = &target->url.host;
pc->socklen = target->url.socklen;
pc->name = &addr->name;
pc->socklen = addr->socklen;
pc->sockaddr = (struct sockaddr *)ngx_palloc(pool, pc->socklen);
if (pc->sockaddr == NULL) {
goto clear;
}
ngx_memcpy(pc->sockaddr, &target->url.sockaddr, pc->socklen);
ngx_memcpy(pc->sockaddr, addr->sockaddr, pc->socklen);
rc = ngx_event_connect_peer(pc);
if (rc != NGX_OK && rc != NGX_AGAIN ) {

View file

@ -23,8 +23,9 @@ typedef struct {
ngx_int_t start;
ngx_int_t stop;
void *tag; /* usually module reference */
void *data; /* module-specific data */
void *tag; /* usually module reference */
void *data; /* module-specific data */
ngx_uint_t counter; /* mutable connection counter */
} ngx_rtmp_relay_target_t;