From 78ea1b12666c73eed207f5ff385398912869a0d1 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Thu, 15 Mar 2012 10:23:49 +0400 Subject: [PATCH] improved handshake to match specs --- ngx_rtmp.h | 25 ++++++--- ngx_rtmp_handler.c | 132 ++++++++++++++++++++++++++++++--------------- 2 files changed, 109 insertions(+), 48 deletions(-) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 6d820f8..4c3d6c5 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -96,14 +96,23 @@ typedef struct { } ngx_rtmp_conf_addr_t; +#define NGX_RTMP_VERSION 3 + +#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE + #define NGX_RTMP_HANDSHAKE_SIZE 1536 #define NGX_RTMP_DEFAULT_CHUNK_SIZE 128 -#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE + +/* RTMP handshake stages */ +#define NGX_RTMP_HS_READ_DATA 0 +#define NGX_RTMP_HS_WRITE_DATA 1 +#define NGX_RTMP_HS_WRITE_ECHO 2 +#define NGX_RTMP_HS_READ_ECHO 3 -/* RTMP message types*/ +/* RTMP message types */ #define NGX_RTMP_MSG_CHUNK_SIZE 1 #define NGX_RTMP_MSG_ABORT 2 #define NGX_RTMP_MSG_ACK 3 @@ -142,7 +151,7 @@ typedef struct { typedef struct { uint32_t csid; /* chunk stream id */ - uint32_t timestamp; + uint32_t timestamp; /* timestamp (delta) */ uint32_t mlen; /* message length */ uint8_t type; /* message type id */ uint32_t msid; /* message stream id */ @@ -167,12 +176,15 @@ typedef struct ngx_rtmp_session_s { ngx_str_t *addr_text; - /* handshake */ - ngx_buf_t buf; + ngx_buf_t hs_buf; ngx_uint_t hs_stage; + /* connection timestamps */ + uint32_t epoch; + uint32_t peer_epoch; + /* input stream 0 (reserved by RTMP spec) - * used for free chain link */ + * is used as free chain link */ ngx_rtmp_stream_t *in_streams; uint32_t in_csid; @@ -260,6 +272,7 @@ typedef struct { void ngx_rtmp_init_connection(ngx_connection_t *c); void ngx_rtmp_close_connection(ngx_connection_t *c); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); +uint32_t ngx_rtmp_get_timestamp(); /* Receiving messages */ diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 98b9f9a..3e6bc55 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -23,6 +23,11 @@ static void ngx_rtmp_send(ngx_event_t *rev); static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, ngx_chain_t *in); + +static char handshake_text[] = + "This RTMP handshake is generated by nginx-rtmp-module. "; + + #ifdef NGX_DEBUG static char* ngx_rtmp_packet_type(uint8_t type) { @@ -221,8 +226,9 @@ ngx_rtmp_init_session(ngx_connection_t *c) + sizeof(ngx_pool_t), c->log); /* start handshake */ - b = &s->buf; size = NGX_RTMP_HANDSHAKE_SIZE + 1; + + b = &s->hs_buf; b->start = b->pos = b->last = ngx_pcalloc(s->in_pool, size); b->end = b->start + size; b->temporary = 1; @@ -234,14 +240,31 @@ ngx_rtmp_init_session(ngx_connection_t *c) } +uint32_t +ngx_rtmp_get_timestamp() +{ + ngx_time_t *tod; + uint32_t t; + + tod = ngx_timeofday(); + + t = tod->sec; + t *= 1e6; + t += tod->msec; + + return t; +} + + void ngx_rtmp_handshake_recv(ngx_event_t *rev) { - ssize_t n; - ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_buf_t *b; + ssize_t n; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_buf_t *b; + u_char *p; c = rev->data; s = c->data; @@ -262,9 +285,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) ngx_del_timer(rev); } - b = &s->buf; + b = &s->hs_buf; - while(b->last != b->end) { + while (b->last != b->end) { n = c->recv(c, b->last, b->end - b->last); @@ -273,18 +296,6 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) return; } - if (n > 0) { - if (b->last == b->start - && s->hs_stage == 0 && *b->last != '\x03') - { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, - "invalid handshake signature"); - ngx_rtmp_close_connection(c); - return; - } - b->last += n; - } - if (n == NGX_AGAIN) { ngx_add_timer(rev, cscf->timeout); if (ngx_handle_read_event(c->read, 0) != NGX_OK) { @@ -292,11 +303,36 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) } return; } + + if (b->last == b->start + && s->hs_stage == NGX_RTMP_HS_READ_DATA + && *b->last != NGX_RTMP_VERSION) + { + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, + "invalid handshake signature"); + ngx_rtmp_close_connection(c); + return; + } + + b->last += n; } ngx_del_event(c->read, NGX_READ_EVENT, 0); - if (s->hs_stage++ == 0) { + ++s->hs_stage; + + if (s->hs_stage == NGX_RTMP_HS_WRITE_DATA) { + s->epoch = ngx_rtmp_get_timestamp(); + p = (u_char*)&s->peer_epoch; + *p++ = b->pos[3]; + *p++ = b->pos[2]; + *p++ = b->pos[1]; + *p++ = b->pos[0]; + p = (u_char*)&s->epoch; + b->pos[7] = *p++; + b->pos[6] = *p++; + b->pos[5] = *p++; + b->pos[4] = *p++; ngx_rtmp_handshake_send(c->write); return; } @@ -307,8 +343,9 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) c->read->handler = ngx_rtmp_recv; c->write->handler = ngx_rtmp_send; - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP handshake done"); + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP handshake done; epoch=%D peer_epoch=%d", + s->epoch, s->peer_epoch); ngx_rtmp_recv(rev); } @@ -317,11 +354,12 @@ ngx_rtmp_handshake_recv(ngx_event_t *rev) void ngx_rtmp_handshake_send(ngx_event_t *wev) { - ngx_int_t n; - ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_buf_t *b; + ngx_int_t n; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_buf_t *b; + size_t offs; c = wev->data; s = c->data; @@ -332,7 +370,8 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) } if (wev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, + "client timed out"); c->timedout = 1; ngx_rtmp_close_connection(c); return; @@ -342,11 +381,26 @@ ngx_rtmp_handshake_send(ngx_event_t *wev) ngx_del_timer(wev); } - b = &s->buf; + b = &s->hs_buf; restart: + while(b->pos != b->last) { - n = c->send(c, b->pos, b->last - b->pos); + + if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) { + n = c->send(c, b->pos, b->last - b->pos); + + } else if (b->pos - b->start < 4) { + /* use the timestamp from echo packet */ + n = c->send(c, b->pos + 4, 4 - (b->pos - b->start)); + + } else { + offs = (b->pos - b->start - 4) % sizeof(handshake_text); + n = c->send(c, (u_char*)handshake_text + offs, + ngx_min(b->last - b->pos, + (ngx_int_t)(sizeof(handshake_text) - offs))); + } + if (n > 0) { b->pos += n; } @@ -365,13 +419,15 @@ restart: } } - if (s->hs_stage++ == 1) { + ++s->hs_stage; + + if (s->hs_stage == NGX_RTMP_HS_WRITE_ECHO) { b->pos = b->start + 1; goto restart; } - b->pos = b->last = b->start + 1; ngx_del_event(wev, NGX_WRITE_EVENT, 0); + b->pos = b->last = b->start + 1; ngx_rtmp_handshake_recv(c->read); } @@ -441,15 +497,7 @@ ngx_rtmp_recv(ngx_event_t *rev) ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reusing formerly read data: %d", old_size); -#if 0 - /* DEBUG! */ - { - size_t i; - for(i = 0; i < 16 && i < old_size; ++i) { - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, "reuse %d", (int)*(old_pos + i)); - } - } -#endif + b->pos = b->start; b->last = ngx_movemem(b->pos, old_pos, old_size);