improved handshake to match specs

This commit is contained in:
Roman Arutyunyan 2012-03-15 10:23:49 +04:00
parent 03960eb7b4
commit 78ea1b1266
2 changed files with 109 additions and 48 deletions

View file

@ -96,14 +96,23 @@ typedef struct {
} ngx_rtmp_conf_addr_t;
#define NGX_RTMP_VERSION 3
#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE
#define NGX_RTMP_HANDSHAKE_SIZE 1536
#define NGX_RTMP_DEFAULT_CHUNK_SIZE 128
#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE
/* RTMP handshake stages */
#define NGX_RTMP_HS_READ_DATA 0
#define NGX_RTMP_HS_WRITE_DATA 1
#define NGX_RTMP_HS_WRITE_ECHO 2
#define NGX_RTMP_HS_READ_ECHO 3
/* RTMP message types*/
/* RTMP message types */
#define NGX_RTMP_MSG_CHUNK_SIZE 1
#define NGX_RTMP_MSG_ABORT 2
#define NGX_RTMP_MSG_ACK 3
@ -142,7 +151,7 @@ typedef struct {
typedef struct {
uint32_t csid; /* chunk stream id */
uint32_t timestamp;
uint32_t timestamp; /* timestamp (delta) */
uint32_t mlen; /* message length */
uint8_t type; /* message type id */
uint32_t msid; /* message stream id */
@ -167,12 +176,15 @@ typedef struct ngx_rtmp_session_s {
ngx_str_t *addr_text;
/* handshake */
ngx_buf_t buf;
ngx_buf_t hs_buf;
ngx_uint_t hs_stage;
/* connection timestamps */
uint32_t epoch;
uint32_t peer_epoch;
/* input stream 0 (reserved by RTMP spec)
* used for free chain link */
* is used as free chain link */
ngx_rtmp_stream_t *in_streams;
uint32_t in_csid;
@ -260,6 +272,7 @@ typedef struct {
void ngx_rtmp_init_connection(ngx_connection_t *c);
void ngx_rtmp_close_connection(ngx_connection_t *c);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
uint32_t ngx_rtmp_get_timestamp();
/* Receiving messages */

View file

@ -23,6 +23,11 @@ static void ngx_rtmp_send(ngx_event_t *rev);
static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
static char handshake_text[] =
"This RTMP handshake is generated by nginx-rtmp-module. ";
#ifdef NGX_DEBUG
static char*
ngx_rtmp_packet_type(uint8_t type) {
@ -221,8 +226,9 @@ ngx_rtmp_init_session(ngx_connection_t *c)
+ sizeof(ngx_pool_t), c->log);
/* start handshake */
b = &s->buf;
size = NGX_RTMP_HANDSHAKE_SIZE + 1;
b = &s->hs_buf;
b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size);
b->end = b->start + size;
b->temporary = 1;
@ -234,14 +240,31 @@ ngx_rtmp_init_session(ngx_connection_t *c)
}
uint32_t
ngx_rtmp_get_timestamp()
{
ngx_time_t *tod;
uint32_t t;
tod = ngx_timeofday();
t = tod->sec;
t *= 1e6;
t += tod->msec;
return t;
}
void
ngx_rtmp_handshake_recv(ngx_event_t *rev)
{
ssize_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
ssize_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
u_char *p;
c = rev->data;
s = c->data;
@ -262,9 +285,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
ngx_del_timer(rev);
}
b = &s->buf;
b = &s->hs_buf;
while(b->last != b->end) {
while (b->last != b->end) {
n = c->recv(c, b->last, b->end - b->last);
@ -273,18 +296,6 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
return;
}
if (n > 0) {
if (b->last == b->start
&& s->hs_stage == 0 && *b->last != '\x03')
{
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
ngx_rtmp_close_connection(c);
return;
}
b->last += n;
}
if (n == NGX_AGAIN) {
ngx_add_timer(rev, cscf->timeout);
if (ngx_handle_read_event(c->read, 0) != NGX_OK) {
@ -292,11 +303,36 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
}
return;
}
if (b->last == b->start
&& s->hs_stage == NGX_RTMP_HS_READ_DATA
&& *b->last != NGX_RTMP_VERSION)
{
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR,
"invalid handshake signature");
ngx_rtmp_close_connection(c);
return;
}
b->last += n;
}
ngx_del_event(c->read, NGX_READ_EVENT, 0);
if (s->hs_stage++ == 0) {
++s->hs_stage;
if (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) {
s->epoch = ngx_rtmp_get_timestamp();
p = (u_char*)&s->peer_epoch;
*p++ = b->pos[3];
*p++ = b->pos[2];
*p++ = b->pos[1];
*p++ = b->pos[0];
p = (u_char*)&s->epoch;
b->pos[7] = *p++;
b->pos[6] = *p++;
b->pos[5] = *p++;
b->pos[4] = *p++;
ngx_rtmp_handshake_send(c->write);
return;
}
@ -307,8 +343,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
c->read->handler = ngx_rtmp_recv;
c->write->handler = ngx_rtmp_send;
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP handshake done");
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP handshake done; epoch=%D peer_epoch=%d",
s->epoch, s->peer_epoch);
ngx_rtmp_recv(rev);
}
@ -317,11 +354,12 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev)
void
ngx_rtmp_handshake_send(ngx_event_t *wev)
{
ngx_int_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
ngx_int_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b;
size_t offs;
c = wev->data;
s = c->data;
@ -332,7 +370,8 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
}
if (wev->timedout) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
ngx_rtmp_close_connection(c);
return;
@ -342,11 +381,26 @@ ngx_rtmp_handshake_send(ngx_event_t *wev)
ngx_del_timer(wev);
}
b = &s->buf;
b = &s->hs_buf;
restart:
while(b->pos != b->last) {
n = c->send(c, b->pos, b->last - b->pos);
if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) {
n = c->send(c, b->pos, b->last - b->pos);
} else if (b->pos - b->start < 4) {
/* use the timestamp from echo packet */
n = c->send(c, b->pos + 4, 4 - (b->pos - b->start));
} else {
offs = (b->pos - b->start - 4) % sizeof(handshake_text);
n = c->send(c, (u_char*)handshake_text + offs,
ngx_min(b->last - b->pos,
(ngx_int_t)(sizeof(handshake_text) - offs)));
}
if (n > 0) {
b->pos += n;
}
@ -365,13 +419,15 @@ restart:
}
}
if (s->hs_stage++ == 1) {
++s->hs_stage;
if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) {
b->pos = b->start + 1;
goto restart;
}
b->pos = b->last = b->start + 1;
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
b->pos = b->last = b->start + 1;
ngx_rtmp_handshake_recv(c->read);
}
@ -441,15 +497,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"reusing formerly read data: %d", old_size);
#if 0
/* DEBUG! */
{
size_t i;
for(i = 0; i < 16 && i < old_size; ++i) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reuse %d", (int)*(old_pos + i));
}
}
#endif
b->pos = b->start;
b->last = ngx_movemem(b->pos, old_pos, old_size);