mirror of
https://github.com/zotanmew/nginx-rtmp-module.git
synced 2024-05-11 06:41:08 +02:00
early relay implementation
This commit is contained in:
parent
c3067ced06
commit
83dbef4567
2
config
2
config
|
@ -9,6 +9,7 @@ CORE_MODULES="$CORE_MODULES
|
|||
ngx_rtmp_record_module \
|
||||
ngx_rtmp_netcall_module \
|
||||
ngx_rtmp_notify_module \
|
||||
ngx_rtmp_relay_module \
|
||||
"
|
||||
|
||||
|
||||
|
@ -34,6 +35,7 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
|
|||
$ngx_addon_dir/ngx_rtmp_netcall_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_notify_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_stat_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_relay_module.c \
|
||||
$ngx_addon_dir/ngx_rtmp_bandwidth.c \
|
||||
$ngx_addon_dir/ngx_rtmp_codecs.c \
|
||||
"
|
||||
|
|
24
ngx_rtmp.c
24
ngx_rtmp.c
|
@ -762,8 +762,30 @@ ngx_rtmp_cmp_conf_addrs(const void *one, const void *two)
|
|||
}
|
||||
|
||||
|
||||
ngx_int_t
|
||||
ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt,
|
||||
ngx_rtmp_header_t *h, ngx_chain_t *in)
|
||||
{
|
||||
ngx_rtmp_core_main_conf_t *cmcf;
|
||||
ngx_array_t *ch;
|
||||
ngx_rtmp_handler_pt *hh;
|
||||
size_t n;
|
||||
|
||||
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
ch = &cmcf->events[evt];
|
||||
hh = ch->elts;
|
||||
for(n = 0; n < ch->nelts; ++n, ++hh) {
|
||||
if (*hh && (*hh)(s, h, in) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
}
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
void *
|
||||
ngx_rtmp_rmemcpy(void *dst, void* src, size_t n)
|
||||
ngx_rtmp_rmemcpy(void *dst, const void* src, size_t n)
|
||||
{
|
||||
u_char *d, *s;
|
||||
|
||||
|
|
14
ngx_rtmp.h
14
ngx_rtmp.h
|
@ -126,7 +126,8 @@ typedef struct {
|
|||
|
||||
#define NGX_RTMP_CONNECT NGX_RTMP_MSG_MAX + 1
|
||||
#define NGX_RTMP_DISCONNECT NGX_RTMP_MSG_MAX + 2
|
||||
#define NGX_RTMP_MAX_EVENT NGX_RTMP_MSG_MAX + 3
|
||||
#define NGX_RTMP_HANDSHAKE_DONE NGX_RTMP_MSG_MAX + 3
|
||||
#define NGX_RTMP_MAX_EVENT NGX_RTMP_MSG_MAX + 4
|
||||
|
||||
|
||||
/* RMTP control message types */
|
||||
|
@ -193,8 +194,7 @@ typedef struct {
|
|||
ngx_str_t page_url;
|
||||
|
||||
/* handshake data */
|
||||
ngx_buf_t *hs_in;
|
||||
ngx_buf_t *hs_out1, *hs_out2;
|
||||
ngx_buf_t *hs_buf, *hs_bufs[3];
|
||||
ngx_uint_t hs_stage;
|
||||
|
||||
/* connection timestamps */
|
||||
|
@ -343,19 +343,23 @@ char* ngx_rtmp_message_type(uint8_t type);
|
|||
char* ngx_rtmp_user_message_type(uint16_t evt);
|
||||
#endif
|
||||
|
||||
|
||||
void ngx_rtmp_init_connection(ngx_connection_t *c);
|
||||
ngx_rtmp_session_t * ngx_rtmp_init_session(ngx_connection_t *c,
|
||||
ngx_rtmp_addr_conf_t *addr_conf);
|
||||
void ngx_rtmp_finalize_session(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_handshake(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_client_handshake(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s);
|
||||
void ngx_rtmp_cycle(ngx_rtmp_session_t *s);
|
||||
ngx_int_t ngx_rtmp_fire_event(ngx_rtmp_session_t *s, ngx_uint_t evt,
|
||||
ngx_rtmp_header_t *h, ngx_chain_t *in);
|
||||
|
||||
|
||||
ngx_int_t ngx_rtmp_set_chunk_size(ngx_rtmp_session_t *s, ngx_uint_t size);
|
||||
|
||||
|
||||
/* Bit reverse: we need big-endians in many places */
|
||||
void * ngx_rtmp_rmemcpy(void *dst, void* src, size_t n);
|
||||
void * ngx_rtmp_rmemcpy(void *dst, const void* src, size_t n);
|
||||
|
||||
#define ngx_rtmp_rcpymem(dst, src, n) \
|
||||
(((u_char*)ngx_rtmp_rmemcpy(dst, src, n)) + (n))
|
||||
|
|
|
@ -46,8 +46,28 @@ ngx_rtmp_server_version[4] = {
|
|||
};
|
||||
|
||||
|
||||
#define NGX_RTMP_KEYLEN SHA256_DIGEST_LENGTH
|
||||
#define NGX_RTMP_HANDSHAKE_BUFSIZE 1537
|
||||
static const u_char
|
||||
ngx_rtmp_client_version[4] = {
|
||||
0x0C, 0x00, 0x0D, 0x0E
|
||||
};
|
||||
|
||||
|
||||
#define NGX_RTMP_HANDSHAKE_KEYLEN SHA256_DIGEST_LENGTH
|
||||
#define NGX_RTMP_HANDSHAKE_BUFSIZE 1537
|
||||
|
||||
|
||||
#define NGX_RTMP_HANDSHAKE_SERVER_RECV_CHALLENGE 1
|
||||
#define NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE 2
|
||||
#define NGX_RTMP_HANDSHAKE_SERVER_SEND_RESPONSE 3
|
||||
#define NGX_RTMP_HANDSHAKE_SERVER_RECV_RESPONSE 4
|
||||
#define NGX_RTMP_HANDSHAKE_SERVER_DONE 5
|
||||
|
||||
|
||||
#define NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE 6
|
||||
#define NGX_RTMP_HANDSHAKE_CLIENT_RECV_CHALLENGE 7
|
||||
#define NGX_RTMP_HANDSHAKE_CLIENT_RECV_RESPONSE 8
|
||||
#define NGX_RTMP_HANDSHAKE_CLIENT_SEND_RESPONSE 9
|
||||
#define NGX_RTMP_HANDSHAKE_CLIENT_DONE 10
|
||||
|
||||
|
||||
static ngx_str_t ngx_rtmp_server_full_key
|
||||
|
@ -65,56 +85,34 @@ ngx_rtmp_make_digest(ngx_str_t *key, ngx_buf_t *src,
|
|||
{
|
||||
HMAC_CTX hmac;
|
||||
unsigned int len;
|
||||
ngx_int_t rc;
|
||||
|
||||
rc = NGX_ERROR;
|
||||
HMAC_CTX_init(&hmac);
|
||||
|
||||
if (HMAC_Init_ex(&hmac, key->data, key->len,
|
||||
EVP_sha256(), NULL) == 0)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Init_ex error");
|
||||
goto out;
|
||||
}
|
||||
HMAC_Init_ex(&hmac, key->data, key->len, EVP_sha256(), NULL);
|
||||
|
||||
if (skip && src->pos <= skip && skip <= src->last) {
|
||||
if (skip != src->pos
|
||||
&& HMAC_Update(&hmac, src->pos, skip - src->pos) == 0)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error");
|
||||
goto out;
|
||||
if (skip != src->pos) {
|
||||
HMAC_Update(&hmac, src->pos, skip - src->pos);
|
||||
}
|
||||
if (src->last != skip + NGX_RTMP_KEYLEN
|
||||
&& HMAC_Update(&hmac, skip + NGX_RTMP_KEYLEN,
|
||||
src->last - skip - NGX_RTMP_KEYLEN) == 0)
|
||||
{
|
||||
ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error");
|
||||
goto out;
|
||||
if (src->last != skip + NGX_RTMP_HANDSHAKE_KEYLEN) {
|
||||
HMAC_Update(&hmac, skip + NGX_RTMP_HANDSHAKE_KEYLEN,
|
||||
src->last - skip - NGX_RTMP_HANDSHAKE_KEYLEN);
|
||||
}
|
||||
} else if (HMAC_Update(&hmac, src->pos, src->last - src->pos) == 0) {
|
||||
ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Update error");
|
||||
goto out;
|
||||
} else {
|
||||
HMAC_Update(&hmac, src->pos, src->last - src->pos);
|
||||
}
|
||||
|
||||
if (HMAC_Final(&hmac, dst, &len) == 0) {
|
||||
ngx_log_error(NGX_LOG_INFO, log, 0, "HMAC_Final error");
|
||||
goto out;
|
||||
}
|
||||
|
||||
rc = NGX_OK;
|
||||
|
||||
out:
|
||||
HMAC_Final(&hmac, dst, &len);
|
||||
HMAC_CTX_cleanup(&hmac);
|
||||
|
||||
return rc;
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_find_digest(ngx_buf_t *b, size_t base, ngx_log_t *log)
|
||||
ngx_rtmp_find_digest(ngx_buf_t *b, ngx_str_t *key, size_t base, ngx_log_t *log)
|
||||
{
|
||||
size_t n, offs;
|
||||
u_char digest[NGX_RTMP_KEYLEN];
|
||||
u_char digest[NGX_RTMP_HANDSHAKE_KEYLEN];
|
||||
u_char *p;
|
||||
|
||||
offs = 0;
|
||||
|
@ -124,13 +122,11 @@ ngx_rtmp_find_digest(ngx_buf_t *b, size_t base, ngx_log_t *log)
|
|||
offs = (offs % 728) + base + 4;
|
||||
p = b->pos + offs;
|
||||
|
||||
if (ngx_rtmp_make_digest(&ngx_rtmp_client_partial_key, b,
|
||||
p, digest, log) != NGX_OK)
|
||||
{
|
||||
if (ngx_rtmp_make_digest(key, b, p, digest, log) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ngx_memcmp(digest, p, NGX_RTMP_KEYLEN) == 0) {
|
||||
if (ngx_memcmp(digest, p, NGX_RTMP_HANDSHAKE_KEYLEN) == 0) {
|
||||
return offs;
|
||||
}
|
||||
|
||||
|
@ -139,7 +135,8 @@ ngx_rtmp_find_digest(ngx_buf_t *b, size_t base, ngx_log_t *log)
|
|||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_write_digest(ngx_buf_t *b, size_t base, ngx_log_t *log)
|
||||
ngx_rtmp_write_digest(ngx_buf_t *b, ngx_str_t *key, size_t base,
|
||||
ngx_log_t *log)
|
||||
{
|
||||
size_t n, offs;
|
||||
u_char *p;
|
||||
|
@ -151,9 +148,7 @@ ngx_rtmp_write_digest(ngx_buf_t *b, size_t base, ngx_log_t *log)
|
|||
offs = (offs % 728) + base + 12;
|
||||
p = b->pos + offs;
|
||||
|
||||
if (ngx_rtmp_make_digest(&ngx_rtmp_server_partial_key,
|
||||
b, p, p, log) != NGX_OK)
|
||||
{
|
||||
if (ngx_rtmp_make_digest(key, b, p, p, log) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
|
@ -229,19 +224,13 @@ ngx_rtmp_free_handshake_buffer(ngx_rtmp_session_t *s, ngx_buf_t *b)
|
|||
void
|
||||
ngx_rtmp_free_handshake_buffers(ngx_rtmp_session_t *s)
|
||||
{
|
||||
if (s->hs_in) {
|
||||
ngx_rtmp_free_handshake_buffer(s, s->hs_in);
|
||||
s->hs_in = NULL;
|
||||
}
|
||||
size_t n;
|
||||
|
||||
if (s->hs_out1) {
|
||||
ngx_rtmp_free_handshake_buffer(s, s->hs_out1);
|
||||
s->hs_out1 = NULL;
|
||||
}
|
||||
|
||||
if (s->hs_out2) {
|
||||
ngx_rtmp_free_handshake_buffer(s, s->hs_out2);
|
||||
s->hs_out2 = NULL;
|
||||
for (n = 0; n < sizeof(s->hs_bufs) / sizeof(s->hs_bufs[0]); ++n) {
|
||||
if (s->hs_bufs[n]) {
|
||||
ngx_rtmp_free_handshake_buffer(s, s->hs_bufs[n]);
|
||||
s->hs_bufs[n] = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -256,16 +245,16 @@ ngx_rtmp_old_handshake_response(ngx_rtmp_session_t *s)
|
|||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"RTMP old-style handshake");
|
||||
|
||||
src = s->hs_in->pos + 8;
|
||||
len = s->hs_in->last - src;
|
||||
src = s->hs_bufs[0]->pos + 8;
|
||||
len = s->hs_bufs[0]->last - src;
|
||||
|
||||
b = s->hs_out1;
|
||||
b = s->hs_bufs[1];
|
||||
*b->last++ = '\x03';
|
||||
b->last = ngx_rtmp_rcpymem(b->last, &s->epoch, 4);
|
||||
ngx_memzero(b->last, 4);
|
||||
b->last = ngx_cpymem(b->last + 4, src, len);
|
||||
|
||||
b = s->hs_out2;
|
||||
b = s->hs_bufs[2];
|
||||
b->last = ngx_rtmp_rcpymem(b->last, &s->peer_epoch, 4);
|
||||
ngx_memzero(b->last, 4);
|
||||
b->last = ngx_cpymem(b->last + 4, src, len);
|
||||
|
@ -280,11 +269,11 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s)
|
|||
u_char *p;
|
||||
ngx_buf_t *b;
|
||||
ngx_int_t offs;
|
||||
u_char digest[NGX_RTMP_KEYLEN];
|
||||
u_char digest[NGX_RTMP_HANDSHAKE_KEYLEN];
|
||||
ngx_str_t key;
|
||||
|
||||
/* read input buffer */
|
||||
b = s->hs_in;
|
||||
b = s->hs_bufs[0];
|
||||
if (*b->pos != '\x03') {
|
||||
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
|
||||
"Unexpected RTMP version: %i", (ngx_int_t)*b->pos);
|
||||
|
@ -303,9 +292,11 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s)
|
|||
return ngx_rtmp_old_handshake_response(s);
|
||||
}
|
||||
|
||||
offs = ngx_rtmp_find_digest(b, 772, s->connection->log);
|
||||
offs = ngx_rtmp_find_digest(b, &ngx_rtmp_client_partial_key,
|
||||
772, s->connection->log);
|
||||
if (offs == NGX_ERROR) {
|
||||
offs = ngx_rtmp_find_digest(b, 8, s->connection->log);
|
||||
offs = ngx_rtmp_find_digest(b, &ngx_rtmp_client_partial_key,
|
||||
8, s->connection->log);
|
||||
}
|
||||
if (offs == NGX_ERROR) {
|
||||
ngx_log_error(NGX_LOG_INFO, s->connection->log, 0,
|
||||
|
@ -313,7 +304,7 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s)
|
|||
return ngx_rtmp_old_handshake_response(s);
|
||||
}
|
||||
b->pos += offs;
|
||||
b->last = b->pos + NGX_RTMP_KEYLEN;
|
||||
b->last = b->pos + NGX_RTMP_HANDSHAKE_KEYLEN;
|
||||
if (ngx_rtmp_make_digest(&ngx_rtmp_server_full_key, b,
|
||||
NULL, digest, s->connection->log) != NGX_OK)
|
||||
{
|
||||
|
@ -323,19 +314,21 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s)
|
|||
"RTMP digest found at pos=%i", offs);
|
||||
|
||||
/* create first output buffer */
|
||||
b = s->hs_out1;
|
||||
b = s->hs_bufs[1];
|
||||
*b->last++ = '\x03';
|
||||
b->last = ngx_rtmp_rcpymem(b->last, &s->epoch, 4);
|
||||
b->last = ngx_cpymem(b->last, ngx_rtmp_server_version, 4);
|
||||
ngx_rtmp_fill_random_buffer(b);
|
||||
++b->pos;
|
||||
if (ngx_rtmp_write_digest(b, 0, s->connection->log) != NGX_OK) {
|
||||
if (ngx_rtmp_write_digest(b, &ngx_rtmp_server_partial_key,
|
||||
0, s->connection->log) != NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
--b->pos;
|
||||
|
||||
/* create second output buffer */
|
||||
b = s->hs_out2;
|
||||
b = s->hs_bufs[2];
|
||||
ngx_rtmp_fill_random_buffer(b);
|
||||
key.data = digest;
|
||||
key.len = sizeof(digest);
|
||||
|
@ -348,6 +341,38 @@ ngx_rtmp_handshake_response(ngx_rtmp_session_t *s)
|
|||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_handshake_client_response(ngx_rtmp_session_t *s)
|
||||
{
|
||||
/*TODO: implement good client response generation
|
||||
* to make it possible relaying data from/to FMS.
|
||||
*
|
||||
* This module as server ignores the last response
|
||||
* from client. */
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_handshake_make_client_request(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_buf_t *b;
|
||||
|
||||
b = s->hs_bufs[0];
|
||||
*b->last++ = '\x03';
|
||||
b->last = ngx_rtmp_rcpymem(b->last, &s->epoch, 4);
|
||||
b->last = ngx_rtmp_rcpymem(b->last, ngx_rtmp_client_version, 4);
|
||||
ngx_rtmp_fill_random_buffer(b);
|
||||
if (ngx_rtmp_write_digest(b, &ngx_rtmp_client_partial_key,
|
||||
0, s->connection->log) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
|
||||
{
|
||||
|
@ -356,6 +381,13 @@ ngx_rtmp_handshake_done(ngx_rtmp_session_t *s)
|
|||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"RTMP handshake done");
|
||||
|
||||
if (ngx_rtmp_fire_event(s, NGX_RTMP_HANDSHAKE_DONE,
|
||||
NULL, NULL) != NGX_OK)
|
||||
{
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
|
||||
ngx_rtmp_cycle(s);
|
||||
}
|
||||
|
||||
|
@ -386,7 +418,7 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
|||
ngx_del_timer(rev);
|
||||
}
|
||||
|
||||
b = s->hs_in;
|
||||
b = s->hs_buf;
|
||||
|
||||
while (b->last != b->end) {
|
||||
n = c->recv(c, b->last, b->end - b->last);
|
||||
|
@ -411,20 +443,41 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
|
|||
ngx_del_event(c->read, NGX_READ_EVENT, 0);
|
||||
}
|
||||
|
||||
if (++s->hs_stage == 1) {
|
||||
s->hs_out1 = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_out2 = ngx_rtmp_alloc_handshake_buffer(s, 1);
|
||||
if (ngx_rtmp_handshake_response(s) != NGX_OK) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"RTMP handshake error");
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
ngx_rtmp_handshake_send(c->write);
|
||||
return;
|
||||
}
|
||||
switch (++s->hs_stage) {
|
||||
case NGX_RTMP_HANDSHAKE_SERVER_SEND_CHALLENGE:
|
||||
s->hs_bufs[1] = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_bufs[2] = ngx_rtmp_alloc_handshake_buffer(s, 1);
|
||||
if (ngx_rtmp_handshake_response(s) != NGX_OK) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"RTMP handshake error");
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
s->hs_buf = s->hs_bufs[0];
|
||||
ngx_rtmp_handshake_send(c->write);
|
||||
break;
|
||||
|
||||
ngx_rtmp_handshake_done(s);
|
||||
case NGX_RTMP_HANDSHAKE_SERVER_DONE:
|
||||
ngx_rtmp_handshake_done(s);
|
||||
break;
|
||||
|
||||
case NGX_RTMP_HANDSHAKE_CLIENT_RECV_RESPONSE:
|
||||
s->hs_bufs[2] = ngx_rtmp_alloc_handshake_buffer(s, 1);
|
||||
s->hs_buf = s->hs_bufs[2];
|
||||
ngx_rtmp_handshake_recv(c->read);
|
||||
break;
|
||||
|
||||
case NGX_RTMP_HANDSHAKE_CLIENT_SEND_RESPONSE:
|
||||
if (ngx_rtmp_handshake_client_response(s) != NGX_OK) {
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0,
|
||||
"RTMP client handshake error");
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
s->hs_buf = s->hs_bufs[2];
|
||||
ngx_rtmp_handshake_send(c->write);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -455,9 +508,7 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
|
|||
ngx_del_timer(wev);
|
||||
}
|
||||
|
||||
restart:
|
||||
|
||||
b = (s->hs_stage == 1 ? s->hs_out1 : s->hs_out2);
|
||||
b = s->hs_buf;
|
||||
|
||||
while(b->pos != b->last) {
|
||||
n = c->send(c, b->pos, b->last - b->pos);
|
||||
|
@ -478,12 +529,28 @@ restart:
|
|||
b->pos += n;
|
||||
}
|
||||
|
||||
if (++s->hs_stage == 2) {
|
||||
goto restart;
|
||||
}
|
||||
switch (++s->hs_stage) {
|
||||
case NGX_RTMP_HANDSHAKE_SERVER_SEND_RESPONSE:
|
||||
s->hs_buf = s->hs_bufs[2];
|
||||
ngx_rtmp_handshake_send(wev);
|
||||
break;
|
||||
|
||||
s->hs_in->pos = s->hs_in->last = s->hs_in->start + 1;
|
||||
ngx_rtmp_handshake_recv(c->read);
|
||||
case NGX_RTMP_HANDSHAKE_SERVER_RECV_RESPONSE:
|
||||
s->hs_buf = s->hs_bufs[0];
|
||||
s->hs_buf->pos = s->hs_buf->last = s->hs_buf->start + 1;
|
||||
ngx_rtmp_handshake_recv(c->read);
|
||||
break;
|
||||
|
||||
case NGX_RTMP_HANDSHAKE_CLIENT_RECV_CHALLENGE:
|
||||
s->hs_bufs[1] = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_buf = s->hs_bufs[1];
|
||||
ngx_rtmp_handshake_recv(c->read);
|
||||
break;
|
||||
|
||||
case NGX_RTMP_HANDSHAKE_CLIENT_DONE:
|
||||
ngx_rtmp_handshake_done(s);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
@ -496,8 +563,32 @@ ngx_rtmp_handshake(ngx_rtmp_session_t *s)
|
|||
c->read->handler = ngx_rtmp_handshake_recv;
|
||||
c->write->handler = ngx_rtmp_handshake_send;
|
||||
|
||||
s->hs_in = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_bufs[0] = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_buf = s->hs_bufs[0];
|
||||
s->hs_stage = NGX_RTMP_HANDSHAKE_SERVER_RECV_CHALLENGE;
|
||||
|
||||
ngx_rtmp_handshake_recv(c->read);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
ngx_rtmp_client_handshake(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_connection_t *c;
|
||||
|
||||
c = s->connection;
|
||||
c->read->handler = ngx_rtmp_handshake_recv;
|
||||
c->write->handler = ngx_rtmp_handshake_send;
|
||||
|
||||
s->hs_bufs[0] = ngx_rtmp_alloc_handshake_buffer(s, 0);
|
||||
s->hs_buf = s->hs_bufs[0];
|
||||
s->hs_stage = NGX_RTMP_HANDSHAKE_CLIENT_SEND_CHALLENGE;
|
||||
|
||||
if (ngx_rtmp_handshake_make_client_request(s) != NGX_OK) {
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
|
||||
ngx_rtmp_handshake_send(c->read);
|
||||
}
|
||||
|
||||
|
|
|
@ -7,7 +7,6 @@
|
|||
|
||||
|
||||
static void ngx_rtmp_close_connection(ngx_connection_t *c);
|
||||
static void ngx_rtmp_init_session(ngx_connection_t *c);
|
||||
static u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
|
||||
|
||||
|
||||
|
@ -18,7 +17,6 @@ ngx_rtmp_init_connection(ngx_connection_t *c)
|
|||
ngx_rtmp_port_t *port;
|
||||
struct sockaddr *sa;
|
||||
struct sockaddr_in *sin;
|
||||
ngx_rtmp_log_ctx_t *ctx;
|
||||
ngx_rtmp_in_addr_t *addr;
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_addr_conf_t *addr_conf;
|
||||
|
@ -107,10 +105,28 @@ ngx_rtmp_init_connection(ngx_connection_t *c)
|
|||
}
|
||||
}
|
||||
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%ui client connected",
|
||||
c->number, &c->addr_text);
|
||||
|
||||
s = ngx_rtmp_init_session(c, addr_conf);
|
||||
|
||||
if (s) {
|
||||
ngx_rtmp_handshake(s);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ngx_rtmp_session_t *
|
||||
ngx_rtmp_init_session(ngx_connection_t *c, ngx_rtmp_addr_conf_t *addr_conf)
|
||||
{
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
ngx_rtmp_log_ctx_t *ctx;
|
||||
|
||||
s = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_session_t));
|
||||
if (s == NULL) {
|
||||
ngx_rtmp_close_connection(c);
|
||||
return;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
s->main_conf = addr_conf->ctx->main_conf;
|
||||
|
@ -121,13 +137,10 @@ ngx_rtmp_init_connection(ngx_connection_t *c)
|
|||
c->data = s;
|
||||
s->connection = c;
|
||||
|
||||
ngx_log_error(NGX_LOG_INFO, c->log, 0, "*%ui client connected",
|
||||
c->number, &c->addr_text);
|
||||
|
||||
ctx = ngx_palloc(c->pool, sizeof(ngx_rtmp_log_ctx_t));
|
||||
if (ctx == NULL) {
|
||||
ngx_rtmp_close_connection(c);
|
||||
return;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ctx->client = &c->addr_text;
|
||||
|
@ -140,28 +153,12 @@ ngx_rtmp_init_connection(ngx_connection_t *c)
|
|||
|
||||
c->log_error = NGX_ERROR_INFO;
|
||||
|
||||
ngx_rtmp_init_session(c);
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_init_session(ngx_connection_t *c)
|
||||
{
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_core_main_conf_t *cmcf;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
size_t n;
|
||||
ngx_rtmp_handler_pt *h;
|
||||
ngx_array_t *ch;
|
||||
|
||||
s = c->data;
|
||||
|
||||
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
s->ctx = ngx_pcalloc(c->pool, sizeof(void *) * ngx_rtmp_max_module);
|
||||
if (s->ctx == NULL) {
|
||||
ngx_rtmp_close_connection(c);
|
||||
return;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
@ -169,7 +166,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
|||
* cscf->max_streams);
|
||||
if (s->in_streams == NULL) {
|
||||
ngx_rtmp_close_connection(c);
|
||||
return;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
s->epoch = ngx_current_msec;
|
||||
|
@ -177,21 +174,12 @@ ngx_rtmp_init_session(ngx_connection_t *c)
|
|||
ngx_rtmp_set_chunk_size(s, NGX_RTMP_DEFAULT_CHUNK_SIZE);
|
||||
|
||||
|
||||
/* call connect callbacks */
|
||||
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
|
||||
|
||||
ch = &cmcf->events[NGX_RTMP_CONNECT];
|
||||
h = ch->elts;
|
||||
for(n = 0; n < ch->nelts; ++n, ++h) {
|
||||
if (*h) {
|
||||
if ((*h)(s, NULL, NULL) != NGX_OK) {
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return;
|
||||
}
|
||||
}
|
||||
if (ngx_rtmp_fire_event(s, NGX_RTMP_CONNECT, NULL, NULL) != NGX_OK) {
|
||||
ngx_rtmp_finalize_session(s);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ngx_rtmp_handshake(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
|
||||
|
@ -248,9 +236,6 @@ ngx_rtmp_close_session_handler(ngx_event_t *e)
|
|||
ngx_connection_t *c;
|
||||
ngx_rtmp_core_main_conf_t *cmcf;
|
||||
ngx_rtmp_core_srv_conf_t *cscf;
|
||||
ngx_rtmp_handler_pt *h;
|
||||
ngx_array_t *dh;
|
||||
size_t n;
|
||||
|
||||
s = e->data;
|
||||
c = s->connection;
|
||||
|
@ -261,14 +246,7 @@ ngx_rtmp_close_session_handler(ngx_event_t *e)
|
|||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "close session");
|
||||
|
||||
if (s) {
|
||||
dh = &cmcf->events[NGX_RTMP_DISCONNECT];
|
||||
h = dh->elts;
|
||||
|
||||
for(n = 0; n < dh->nelts; ++n, ++h) {
|
||||
if (*h) {
|
||||
(*h)(s, NULL, NULL);
|
||||
}
|
||||
}
|
||||
ngx_rtmp_fire_event(s, NGX_RTMP_DISCONNECT, NULL, NULL);
|
||||
|
||||
if (s->in_old_pool) {
|
||||
ngx_destroy_pool(s->in_old_pool);
|
||||
|
|
779
ngx_rtmp_relay_module.c
Normal file
779
ngx_rtmp_relay_module.c
Normal file
|
@ -0,0 +1,779 @@
|
|||
/*
|
||||
* Copyright (c) 2012 Roman Arutyunyan
|
||||
*/
|
||||
|
||||
|
||||
#include "ngx_rtmp.h"
|
||||
#include "ngx_rtmp_cmd_module.h"
|
||||
|
||||
|
||||
static ngx_rtmp_publish_pt next_publish;
|
||||
static ngx_rtmp_play_pt next_play;
|
||||
|
||||
|
||||
static ngx_int_t ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf);
|
||||
static void * ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf);
|
||||
static char * ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf,
|
||||
void *parent, void *child);
|
||||
static char * ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd,
|
||||
void *conf);
|
||||
static ngx_int_t ngx_rtmp_relay_publish(ngx_rtmp_session_t *s,
|
||||
ngx_rtmp_publish_t *v);
|
||||
|
||||
|
||||
/* _____
|
||||
* =push= | |---publish--->
|
||||
* ---publish--->| |---publish--->
|
||||
* (src) | |---publish--->
|
||||
* ----- (next,relay)
|
||||
* need reconnect
|
||||
* =pull= _____
|
||||
* -----play---->| |
|
||||
* -----play---->| |----play----->
|
||||
* -----play---->| | (src,relay)
|
||||
* (next) -----
|
||||
*/
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_url_t url;
|
||||
ngx_str_t app;
|
||||
ngx_str_t name;
|
||||
unsigned push:1;
|
||||
} ngx_rtmp_relay_target_t;
|
||||
|
||||
|
||||
typedef struct ngx_rtmp_relay_ctx_s ngx_rtmp_relay_ctx_t;
|
||||
|
||||
struct ngx_rtmp_relay_ctx_s {
|
||||
ngx_str_t name;
|
||||
ngx_rtmp_session_t *session;
|
||||
ngx_rtmp_relay_target_t *target;
|
||||
ngx_rtmp_relay_ctx_t *src;
|
||||
ngx_rtmp_relay_ctx_t *dst;
|
||||
ngx_rtmp_relay_ctx_t *next;
|
||||
ngx_event_t recon;
|
||||
};
|
||||
|
||||
|
||||
typedef struct {
|
||||
ngx_array_t targets;
|
||||
ngx_log_t *log;
|
||||
ngx_uint_t nbuckets;
|
||||
ngx_rtmp_relay_ctx_t **ctx;
|
||||
} ngx_rtmp_relay_app_conf_t;
|
||||
|
||||
|
||||
#define NGX_RTMP_RELAY_CONNECT_TRANS 1
|
||||
#define NGX_RTMP_RELAY_CREATE_STREAM_TRANS 2
|
||||
|
||||
|
||||
#define NGX_RTMP_RELAY_CSID_AMF_INI 3
|
||||
#define NGX_RTMP_RELAY_CSID_AMF 5
|
||||
#define NGX_RTMP_RELAY_MSID 1
|
||||
|
||||
|
||||
/*
|
||||
push remoteapp mystream 192.168.0.10
|
||||
push mystream 192.168.0.10
|
||||
push 192.168.0.10
|
||||
*/
|
||||
|
||||
static ngx_command_t ngx_rtmp_relay_commands[] = {
|
||||
|
||||
{ ngx_string("push"),
|
||||
NGX_RTMP_APP_CONF|NGX_CONF_TAKE1|NGX_CONF_TAKE2|NGX_CONF_TAKE3,
|
||||
ngx_rtmp_relay_push_pull,
|
||||
NGX_RTMP_APP_CONF_OFFSET,
|
||||
0,
|
||||
NULL },
|
||||
|
||||
{ ngx_string("pull"),
|
||||
NGX_RTMP_APP_CONF| NGX_CONF_TAKE1|NGX_CONF_TAKE2|NGX_CONF_TAKE3,
|
||||
ngx_rtmp_relay_push_pull,
|
||||
NGX_RTMP_APP_CONF_OFFSET,
|
||||
0,
|
||||
NULL },
|
||||
|
||||
ngx_null_command
|
||||
};
|
||||
|
||||
|
||||
static ngx_rtmp_module_t ngx_rtmp_relay_module_ctx = {
|
||||
NULL, /* preconfiguration */
|
||||
ngx_rtmp_relay_postconfiguration, /* postconfiguration */
|
||||
NULL, /* create main configuration */
|
||||
NULL, /* init main configuration */
|
||||
NULL, /* create server configuration */
|
||||
NULL, /* merge server configuration */
|
||||
ngx_rtmp_relay_create_app_conf, /* create app configuration */
|
||||
ngx_rtmp_relay_merge_app_conf /* merge app configuration */
|
||||
};
|
||||
|
||||
|
||||
ngx_module_t ngx_rtmp_relay_module = {
|
||||
NGX_MODULE_V1,
|
||||
&ngx_rtmp_relay_module_ctx, /* module context */
|
||||
ngx_rtmp_relay_commands, /* module directives */
|
||||
NGX_RTMP_MODULE, /* module type */
|
||||
NULL, /* init master */
|
||||
NULL, /* init module */
|
||||
NULL, /* init process */
|
||||
NULL, /* init thread */
|
||||
NULL, /* exit thread */
|
||||
NULL, /* exit process */
|
||||
NULL, /* exit master */
|
||||
NGX_MODULE_V1_PADDING
|
||||
};
|
||||
|
||||
|
||||
static void *
|
||||
ngx_rtmp_relay_create_app_conf(ngx_conf_t *cf)
|
||||
{
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
|
||||
racf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_relay_app_conf_t));
|
||||
if (racf == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ngx_array_init(&racf->targets, cf->pool, 1, sizeof(ngx_rtmp_relay_target_t));
|
||||
|
||||
racf->nbuckets = 1024;
|
||||
racf->log = &cf->cycle->new_log;
|
||||
|
||||
return racf;
|
||||
}
|
||||
|
||||
|
||||
static char *
|
||||
ngx_rtmp_relay_merge_app_conf(ngx_conf_t *cf, void *parent, void *child)
|
||||
{
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_get_peer(ngx_peer_connection_t *pc, void *data)
|
||||
{
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_relay_free_peer(ngx_peer_connection_t *pc, void *data,
|
||||
ngx_uint_t state)
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_rtmp_relay_create(ngx_rtmp_relay_ctx_t *ctx,
|
||||
ngx_rtmp_relay_target_t *target)
|
||||
{
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
ngx_peer_connection_t *pc;
|
||||
ngx_rtmp_session_t *rs;
|
||||
ngx_connection_t *c;
|
||||
ngx_pool_t *pool;
|
||||
ngx_int_t rc;
|
||||
ngx_rtmp_session_t *s;
|
||||
ngx_rtmp_relay_ctx_t *rctx, **cctx;
|
||||
ngx_uint_t hash;
|
||||
ngx_rtmp_addr_conf_t addr_conf;
|
||||
|
||||
racf = ngx_rtmp_get_module_app_conf(ctx->session, ngx_rtmp_relay_module);
|
||||
|
||||
pool = NULL;
|
||||
pool = ngx_create_pool(4096, racf->log);
|
||||
if (pool == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
rctx = ngx_pcalloc(pool, sizeof(ngx_rtmp_relay_ctx_t));
|
||||
if (rctx == NULL) {
|
||||
goto clear;
|
||||
}
|
||||
|
||||
rctx->target = target;
|
||||
rctx->name.len = ctx->name.len;
|
||||
rctx->name.data = ngx_palloc(pool, ctx->name.len);
|
||||
ngx_memcpy(rctx->name.data, ctx->name.data, ctx->name.len);
|
||||
|
||||
hash = ngx_hash_key(rctx->name.data, rctx->name.len);
|
||||
cctx = &racf->ctx[hash % racf->nbuckets];
|
||||
|
||||
if (target->push) {
|
||||
ctx->next = *cctx;
|
||||
*cctx = ctx;
|
||||
ctx->src = ctx;
|
||||
ctx->dst = rctx;
|
||||
rctx->src = ctx;
|
||||
} else {
|
||||
rctx->next = *cctx;
|
||||
*cctx = rctx;
|
||||
rctx->src = rctx;
|
||||
rctx->dst = ctx;
|
||||
ctx->src = rctx;
|
||||
}
|
||||
|
||||
/* connect */
|
||||
pc = ngx_pcalloc(pool, sizeof(ngx_peer_connection_t));
|
||||
if (pc == NULL) {
|
||||
goto clear;
|
||||
}
|
||||
|
||||
pc->log = racf->log;
|
||||
pc->get = ngx_rtmp_relay_get_peer;
|
||||
pc->free = ngx_rtmp_relay_free_peer;
|
||||
pc->name = &target->url.host;
|
||||
pc->socklen = target->url.socklen;
|
||||
pc->sockaddr = (struct sockaddr *)&target->url.sockaddr;
|
||||
|
||||
rc = ngx_event_connect_peer(pc);
|
||||
if (rc != NGX_OK && rc != NGX_AGAIN ) {
|
||||
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, racf->log, 0,
|
||||
"relay: connection failed");
|
||||
goto clear;
|
||||
}
|
||||
|
||||
c = pc->connection;
|
||||
c->pool = pool;
|
||||
|
||||
ngx_memzero(&addr_conf, sizeof(addr_conf));
|
||||
addr_conf.ctx->main_conf = s->main_conf;
|
||||
addr_conf.ctx->srv_conf = s->srv_conf;
|
||||
ngx_str_set(&addr_conf.addr_text, "ngx-relay");
|
||||
|
||||
rs = ngx_rtmp_init_session(c, &addr_conf);
|
||||
if (rs == NULL) {
|
||||
/* no need to destroy pool */
|
||||
return;
|
||||
}
|
||||
|
||||
rs->app_conf = ctx->session->app_conf;
|
||||
rctx->session = rs;
|
||||
ngx_rtmp_set_ctx(rs, rctx, ngx_rtmp_relay_module);
|
||||
|
||||
ngx_rtmp_client_handshake(s);
|
||||
|
||||
return;
|
||||
|
||||
clear:
|
||||
if (pool) {
|
||||
ngx_destroy_pool(pool);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_init(ngx_rtmp_session_t *s, u_char *name)
|
||||
{
|
||||
size_t n, len;
|
||||
ngx_rtmp_relay_target_t *target;
|
||||
ngx_uint_t hash;
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
ngx_rtmp_relay_ctx_t *sctx, *ctx, **cctx;
|
||||
|
||||
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
|
||||
if (racf == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
len = ngx_strlen(name);
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
ctx = ngx_pcalloc(s->connection->pool, sizeof(ngx_rtmp_relay_ctx_t));
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_relay_module);
|
||||
}
|
||||
ctx->session = s;
|
||||
ctx->name.len = len;
|
||||
ctx->name.data = ngx_palloc(s->connection->pool, len);
|
||||
ngx_memcpy(ctx->name.data, name, len);
|
||||
|
||||
/* find relay stream */
|
||||
hash = ngx_hash_key(name, len) % racf->nbuckets;
|
||||
cctx = &racf->ctx[hash];
|
||||
for (sctx = *cctx; sctx; sctx = sctx->next) {
|
||||
if (sctx->name.len == len
|
||||
&& !ngx_memcmp(name, sctx->name.data, len))
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (sctx) {
|
||||
/* add player to pull stream */
|
||||
if (sctx->target) {
|
||||
ctx->src = sctx->src;
|
||||
ctx->next = sctx->dst;
|
||||
sctx->dst = ctx;
|
||||
}
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* create relays */
|
||||
target = racf->targets.elts;
|
||||
for (n = 0; n < racf->targets.nelts; ++n, ++target) {
|
||||
if (target->name.len == 0 ||
|
||||
(len == target->name.len
|
||||
&& !ngx_memcmp(name, target->name.data, len)))
|
||||
{
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
|
||||
"relay: create: name='%s' url='%V'",
|
||||
name, &target->url);
|
||||
|
||||
ngx_rtmp_relay_create(ctx, target);
|
||||
}
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_publish(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v)
|
||||
{
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
|
||||
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
|
||||
if (racf == NULL || racf->targets.nelts == 0) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
ngx_rtmp_relay_init(s, v->name);
|
||||
|
||||
next:
|
||||
return next_publish(s, v);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_play(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v)
|
||||
{
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
|
||||
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
|
||||
if (racf == NULL || racf->targets.nelts == 0) {
|
||||
goto next;
|
||||
}
|
||||
|
||||
ngx_rtmp_relay_init(s, v->name);
|
||||
|
||||
next:
|
||||
return next_play(s, v);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_play_local(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_rtmp_play_t v;
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_memzero(&v, sizeof(ngx_rtmp_play_t));
|
||||
*(ngx_cpymem(v.name, ctx->name.data,
|
||||
ngx_min(sizeof(v.name) - 1, ctx->name.len))) = 0;
|
||||
|
||||
return ngx_rtmp_play(s, &v);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_publish_local(ngx_rtmp_session_t *s)
|
||||
{
|
||||
ngx_rtmp_publish_t v;
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
ngx_memzero(&v, sizeof(ngx_rtmp_publish_t));
|
||||
*(ngx_cpymem(v.name, ctx->name.data,
|
||||
ngx_min(sizeof(v.name) - 1, ctx->name.len))) = 0;
|
||||
|
||||
return ngx_rtmp_publish(s, &v);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_send_connect(ngx_rtmp_session_t *s)
|
||||
{
|
||||
static double trans = NGX_RTMP_RELAY_CONNECT_TRANS;
|
||||
|
||||
static ngx_rtmp_amf_elt_t out_cmd[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_string("app"),
|
||||
NULL, 0 }, /* <-- to fill */
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_string("flashVer"),
|
||||
"ngx-rtmp-relay", 0 }
|
||||
};
|
||||
|
||||
static ngx_rtmp_amf_elt_t out_elts[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
"connect", 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_NUMBER,
|
||||
ngx_null_string,
|
||||
&trans, 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_OBJECT,
|
||||
ngx_null_string,
|
||||
out_cmd, sizeof(out_cmd) }
|
||||
};
|
||||
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
ngx_rtmp_header_t h;
|
||||
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
out_cmd[0].data = ctx->target->app.data;
|
||||
out_cmd[0].len = ctx->target->app.len;
|
||||
|
||||
ngx_memzero(&h, sizeof(h));
|
||||
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
|
||||
h.type = NGX_RTMP_MSG_AMF_CMD;
|
||||
|
||||
return ngx_rtmp_send_amf(s, &h, out_elts,
|
||||
sizeof(out_elts) / sizeof(out_elts[0]));
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_send_create_stream(ngx_rtmp_session_t *s)
|
||||
{
|
||||
static double trans = NGX_RTMP_RELAY_CREATE_STREAM_TRANS;
|
||||
|
||||
static ngx_rtmp_amf_elt_t out_elts[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
"createStream", 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_NUMBER,
|
||||
ngx_null_string,
|
||||
&trans, 0 }
|
||||
};
|
||||
|
||||
ngx_rtmp_header_t h;
|
||||
|
||||
|
||||
ngx_memzero(&h, sizeof(h));
|
||||
h.csid = NGX_RTMP_RELAY_CSID_AMF_INI;
|
||||
h.type = NGX_RTMP_MSG_AMF_CMD;
|
||||
|
||||
return ngx_rtmp_send_amf(s, &h, out_elts,
|
||||
sizeof(out_elts) / sizeof(out_elts[0]));
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_send_publish(ngx_rtmp_session_t *s)
|
||||
{
|
||||
static double trans;
|
||||
|
||||
static ngx_rtmp_amf_elt_t out_elts[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
"publish", 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_NUMBER,
|
||||
ngx_null_string,
|
||||
&trans, 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
NULL, 0 }, /* <- to fill */
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
"live", 0 }
|
||||
};
|
||||
|
||||
ngx_rtmp_header_t h;
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
out_elts[2].data = ctx->name.data;
|
||||
out_elts[2].len = ctx->name.len;
|
||||
|
||||
ngx_memzero(&h, sizeof(h));
|
||||
h.csid = NGX_RTMP_RELAY_CSID_AMF;
|
||||
h.type = NGX_RTMP_MSG_AMF_CMD;
|
||||
|
||||
return ngx_rtmp_send_amf(s, &h, out_elts,
|
||||
sizeof(out_elts) / sizeof(out_elts[0]));
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_send_play(ngx_rtmp_session_t *s)
|
||||
{
|
||||
static double trans;
|
||||
|
||||
static ngx_rtmp_amf_elt_t out_elts[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
"play", 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_NUMBER,
|
||||
ngx_null_string,
|
||||
&trans, 0 },
|
||||
|
||||
{ NGX_RTMP_AMF_STRING,
|
||||
ngx_null_string,
|
||||
NULL, 0 }, /* <- to fill */
|
||||
};
|
||||
|
||||
ngx_rtmp_header_t h;
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
out_elts[2].data = ctx->name.data;
|
||||
out_elts[2].len = ctx->name.len;
|
||||
|
||||
ngx_memzero(&h, sizeof(h));
|
||||
h.csid = NGX_RTMP_RELAY_CSID_AMF;
|
||||
h.type = NGX_RTMP_MSG_AMF_CMD;
|
||||
|
||||
return ngx_rtmp_send_amf(s, &h, out_elts,
|
||||
sizeof(out_elts) / sizeof(out_elts[0]));
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_on_result(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||
ngx_chain_t *in)
|
||||
{
|
||||
static double trans;
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
static ngx_rtmp_amf_elt_t in_elts[] = {
|
||||
|
||||
{ NGX_RTMP_AMF_NUMBER,
|
||||
ngx_null_string,
|
||||
&trans, 0 },
|
||||
};
|
||||
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL || ctx->target == NULL) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* TODO: add analyzing <code> for errors */
|
||||
trans = 0;
|
||||
if (ngx_rtmp_receive_amf(s, in, in_elts,
|
||||
sizeof(in_elts) / sizeof(in_elts[0])))
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
switch ((ngx_int_t)trans) {
|
||||
case NGX_RTMP_RELAY_CONNECT_TRANS:
|
||||
return ngx_rtmp_relay_send_create_stream(s);
|
||||
|
||||
case NGX_RTMP_RELAY_CREATE_STREAM_TRANS:
|
||||
if (ctx->target->push) {
|
||||
if (ngx_rtmp_relay_send_publish(s) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
return ngx_rtmp_relay_play_local(s);
|
||||
|
||||
} else {
|
||||
if (ngx_rtmp_relay_send_play(s) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
return ngx_rtmp_relay_publish_local(s);
|
||||
}
|
||||
|
||||
default:
|
||||
return NGX_OK;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_handshake_done(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||
ngx_chain_t *in)
|
||||
{
|
||||
ngx_rtmp_relay_ctx_t *ctx;
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL || ctx->src == NULL) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
return ngx_rtmp_relay_send_connect(s);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_disconnect(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
|
||||
ngx_chain_t *in)
|
||||
{
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
ngx_rtmp_relay_ctx_t *ctx, **cctx;
|
||||
ngx_uint_t hash;
|
||||
|
||||
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_relay_module);
|
||||
if (ctx == NULL || ctx->src == NULL) {
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* destination end disconnecting */
|
||||
if (ctx->src != ctx) {
|
||||
for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) {
|
||||
if (*cctx == ctx) {
|
||||
*cctx = ctx->next;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/*TODO: add push reconnect */
|
||||
/* if (ctx->target) {...} */
|
||||
|
||||
if (ctx->src->dst == NULL) {
|
||||
ngx_rtmp_finalize_session(ctx->src->session);
|
||||
}
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
/* source end disconnecting */
|
||||
for (cctx = &ctx->src->dst; *cctx; cctx = &(*cctx)->next) {
|
||||
(*cctx)->src = NULL;
|
||||
ngx_rtmp_finalize_session((*cctx)->session);
|
||||
}
|
||||
ctx->src = NULL;
|
||||
|
||||
racf = ngx_rtmp_get_module_app_conf(s, ngx_rtmp_relay_module);
|
||||
hash = ngx_hash_key(ctx->name.data, ctx->name.len);
|
||||
cctx = &racf->ctx[hash % racf->nbuckets];
|
||||
for (; *cctx && *cctx != ctx; cctx = &(*cctx)->next);
|
||||
if (*cctx) {
|
||||
*cctx = ctx->next;
|
||||
}
|
||||
ngx_rtmp_finalize_session(ctx->session);
|
||||
|
||||
return NGX_OK;
|
||||
}
|
||||
|
||||
|
||||
static char *
|
||||
ngx_rtmp_relay_push_pull(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
|
||||
{
|
||||
ngx_str_t *value;
|
||||
ngx_rtmp_core_app_conf_t *cacf;
|
||||
ngx_rtmp_relay_app_conf_t *racf;
|
||||
ngx_rtmp_relay_target_t *target;
|
||||
ngx_url_t *u;
|
||||
|
||||
value = cf->args->elts;
|
||||
|
||||
cacf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_core_module);
|
||||
racf = ngx_rtmp_conf_get_module_app_conf(cf, ngx_rtmp_relay_module);
|
||||
|
||||
target = ngx_array_push(&racf->targets);
|
||||
ngx_memzero(target, sizeof(ngx_rtmp_relay_target_t));
|
||||
if (value[0].data[3] == 'h') { /* push */
|
||||
target->push = 1;
|
||||
}
|
||||
|
||||
u = &target->url;
|
||||
u->default_port = 1935;
|
||||
u->uri_part = 1;
|
||||
|
||||
switch (cf->args->nelts) {
|
||||
case 4:
|
||||
target->app = value[1];
|
||||
target->name = value[2];
|
||||
u->url = value[3];
|
||||
break;
|
||||
|
||||
case 3:
|
||||
target->app = cacf->name;
|
||||
target->name = value[1];
|
||||
u->url = value[2];
|
||||
break;
|
||||
|
||||
case 2:
|
||||
target->app = cacf->name;
|
||||
u->url = value[1];
|
||||
break;
|
||||
}
|
||||
|
||||
if (ngx_parse_url(cf->pool, u) != NGX_OK) {
|
||||
if (u->err) {
|
||||
ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
|
||||
"%s in url \"%V\"", u->err, &u->url);
|
||||
}
|
||||
return NGX_CONF_ERROR;
|
||||
}
|
||||
|
||||
return NGX_CONF_OK;
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_rtmp_relay_postconfiguration(ngx_conf_t *cf)
|
||||
{
|
||||
ngx_rtmp_core_main_conf_t *cmcf;
|
||||
ngx_rtmp_handler_pt *h;
|
||||
ngx_rtmp_amf_handler_t *ch;
|
||||
|
||||
cmcf = ngx_rtmp_conf_get_module_main_conf(cf, ngx_rtmp_core_module);
|
||||
|
||||
|
||||
h = ngx_array_push(&cmcf->events[NGX_RTMP_HANDSHAKE_DONE]);
|
||||
*h = ngx_rtmp_relay_handshake_done;
|
||||
|
||||
h = ngx_array_push(&cmcf->events[NGX_RTMP_DISCONNECT]);
|
||||
*h = ngx_rtmp_relay_disconnect;
|
||||
|
||||
|
||||
next_publish = ngx_rtmp_publish;
|
||||
ngx_rtmp_publish = ngx_rtmp_relay_publish;
|
||||
|
||||
next_play = ngx_rtmp_play;
|
||||
ngx_rtmp_play = ngx_rtmp_relay_play;
|
||||
|
||||
|
||||
ch = ngx_array_push(&cmcf->amf);
|
||||
ngx_str_set(&ch->name, "_result");
|
||||
ch->handler = ngx_rtmp_relay_on_result;
|
||||
|
||||
return NGX_OK;
|
||||
}
|
Loading…
Reference in a new issue