From b439dacb8214b79ebef39b708ecbf9c55f3eb545 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Tue, 13 Nov 2012 21:29:05 +0400 Subject: [PATCH] implemented RTMP control handler chains --- ngx_rtmp_cmd_module.c | 49 ++++++++++++++++++ ngx_rtmp_cmd_module.h | 37 ++++++++++++-- ngx_rtmp_receive.c | 113 +++++++++++++++++++++++++++++------------- 3 files changed, 161 insertions(+), 38 deletions(-) diff --git a/ngx_rtmp_cmd_module.c b/ngx_rtmp_cmd_module.c index 224942f..0611914 100644 --- a/ngx_rtmp_cmd_module.c +++ b/ngx_rtmp_cmd_module.c @@ -20,6 +20,13 @@ ngx_rtmp_seek_pt ngx_rtmp_seek; ngx_rtmp_pause_pt ngx_rtmp_pause; +ngx_rtmp_stream_begin_pt ngx_rtmp_stream_begin; +ngx_rtmp_stream_eof_pt ngx_rtmp_stream_eof; +ngx_rtmp_stream_dry_pt ngx_rtmp_stream_dry; +ngx_rtmp_recorded_pt ngx_rtmp_recorded; +ngx_rtmp_set_buflen_pt ngx_rtmp_set_buflen; + + static ngx_int_t ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf); @@ -629,6 +636,42 @@ ngx_rtmp_cmd_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v) } +static ngx_int_t +ngx_rtmp_cmd_stream_begin(ngx_rtmp_session_t *s, ngx_rtmp_stream_begin_t *v) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_stream_eof(ngx_rtmp_session_t *s, ngx_rtmp_stream_eof_t *v) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_stream_dry(ngx_rtmp_session_t *s, ngx_rtmp_stream_dry_t *v) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_recorded(ngx_rtmp_session_t *s, + ngx_rtmp_recorded_t *v) +{ + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_cmd_set_buflen(ngx_rtmp_session_t *s, ngx_rtmp_set_buflen_t *v) +{ + return NGX_OK; +} + + static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = { { ngx_string("connect"), ngx_rtmp_cmd_connect_init }, { ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init }, @@ -689,5 +732,11 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf) ngx_rtmp_seek = ngx_rtmp_cmd_seek; ngx_rtmp_pause = ngx_rtmp_cmd_pause; + ngx_rtmp_stream_begin = ngx_rtmp_cmd_stream_begin; + ngx_rtmp_stream_eof = ngx_rtmp_cmd_stream_eof; + ngx_rtmp_stream_dry = ngx_rtmp_cmd_stream_dry; + ngx_rtmp_recorded = ngx_rtmp_cmd_recorded; + ngx_rtmp_set_buflen = ngx_rtmp_cmd_set_buflen; + return NGX_OK; } diff --git a/ngx_rtmp_cmd_module.h b/ngx_rtmp_cmd_module.h index 724418d..9abce21 100644 --- a/ngx_rtmp_cmd_module.h +++ b/ngx_rtmp_cmd_module.h @@ -79,6 +79,23 @@ typedef struct { } ngx_rtmp_pause_t; +typedef struct { + uint32_t msid; +} ngx_rtmp_msid_t; + + +typedef ngx_rtmp_msid_t ngx_rtmp_stream_begin_t; +typedef ngx_rtmp_msid_t ngx_rtmp_stream_eof_t; +typedef ngx_rtmp_msid_t ngx_rtmp_stream_dry_t; +typedef ngx_rtmp_msid_t ngx_rtmp_recorded_t; + + +typedef struct { + uint32_t msid; + uint32_t buflen; +} ngx_rtmp_set_buflen_t; + + typedef ngx_int_t (*ngx_rtmp_connect_pt)(ngx_rtmp_session_t *s, ngx_rtmp_connect_t *v); typedef ngx_int_t (*ngx_rtmp_create_stream_pt)(ngx_rtmp_session_t *s, @@ -87,18 +104,26 @@ typedef ngx_int_t (*ngx_rtmp_close_stream_pt)(ngx_rtmp_session_t *s, ngx_rtmp_close_stream_t *v); typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s, ngx_rtmp_delete_stream_t *v); - typedef ngx_int_t (*ngx_rtmp_publish_pt)(ngx_rtmp_session_t *s, ngx_rtmp_publish_t *v); - typedef ngx_int_t (*ngx_rtmp_play_pt)(ngx_rtmp_session_t *s, ngx_rtmp_play_t *v); - typedef ngx_int_t (*ngx_rtmp_seek_pt)(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v); typedef ngx_int_t (*ngx_rtmp_pause_pt)(ngx_rtmp_session_t *s, ngx_rtmp_pause_t *v); +typedef ngx_int_t (*ngx_rtmp_stream_begin_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_stream_begin_t *v); +typedef ngx_int_t (*ngx_rtmp_stream_eof_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_stream_eof_t *v); +typedef ngx_int_t (*ngx_rtmp_stream_dry_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_stream_dry_t *v); +typedef ngx_int_t (*ngx_rtmp_recorded_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_recorded_t *v); +typedef ngx_int_t (*ngx_rtmp_set_buflen_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_set_buflen_t *v); + extern ngx_rtmp_connect_pt ngx_rtmp_connect; extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream; @@ -109,5 +134,11 @@ extern ngx_rtmp_play_pt ngx_rtmp_play; extern ngx_rtmp_seek_pt ngx_rtmp_seek; extern ngx_rtmp_pause_pt ngx_rtmp_pause; +extern ngx_rtmp_stream_begin_pt ngx_rtmp_stream_begin; +extern ngx_rtmp_stream_eof_pt ngx_rtmp_stream_eof; +extern ngx_rtmp_stream_dry_pt ngx_rtmp_stream_dry; +extern ngx_rtmp_set_buflen_pt ngx_rtmp_set_buflen; +extern ngx_rtmp_recorded_pt ngx_rtmp_recorded; + #endif /*_NGX_RTMP_CMD_H_INCLUDED_ */ diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c index db26ae9..661aa02 100644 --- a/ngx_rtmp_receive.c +++ b/ngx_rtmp_receive.c @@ -5,6 +5,7 @@ #include "ngx_rtmp.h" #include "ngx_rtmp_amf.h" +#include "ngx_rtmp_cmd_module.h" #include @@ -79,32 +80,34 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s, ngx_int_t -ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, - ngx_rtmp_header_t *h, ngx_chain_t *in) +ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h, + ngx_chain_t *in) { ngx_buf_t *b; u_char *p; uint16_t evt; - uint32_t val, arg; + uint32_t val; b = in->buf; if (b->last - b->pos < 6) { ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "too small buffer for user message: %d", - b->last - b->pos); + "too small buffer for user message: %d", + b->last - b->pos); return NGX_OK; } p = (u_char*)&evt; + p[0] = b->pos[1]; p[1] = b->pos[0]; ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "RTMP recv user evt %s (%d)", - ngx_rtmp_user_message_type(evt), (int)evt); + "RTMP recv user evt %s (%i)", + ngx_rtmp_user_message_type(evt), (ngx_int_t) evt); + + p = (u_char *) &val; - p = (u_char*)&val; p[0] = b->pos[5]; p[1] = b->pos[4]; p[2] = b->pos[3]; @@ -112,54 +115,95 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, switch(evt) { case NGX_RTMP_USER_STREAM_BEGIN: - /* use =val as stream id which started */ - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "stream begin msid=%uD", val); - break; + { + ngx_rtmp_stream_begin_t v; + + v.msid = val; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "receive: stream_begin msid=%uD", v.msid); + + return ngx_rtmp_stream_begin(s, &v); + } case NGX_RTMP_USER_STREAM_EOF: - /* use =val as stream id which is over */ - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "stream eof msid=%uD", val); - break; + { + ngx_rtmp_stream_eof_t v; + + v.msid = val; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "receive: stream_eof msid=%uD", v.msid); + + return ngx_rtmp_stream_eof(s, &v); + } case NGX_RTMP_USER_STREAM_DRY: - /* stream =val is dry */ - break; + { + ngx_rtmp_stream_dry_t v; + + v.msid = val; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "receive: stream_dry msid=%uD", v.msid); + + return ngx_rtmp_stream_dry(s, &v); + } case NGX_RTMP_USER_SET_BUFLEN: - if (b->last - b->pos >= 10) { - p = (u_char*)&arg; + { + ngx_rtmp_set_buflen_t v; + + v.msid = val; + + if (b->last - b->pos < 10) { + return NGX_OK; + } + + p = (u_char *) &v.buflen; + p[0] = b->pos[9]; p[1] = b->pos[8]; p[2] = b->pos[7]; p[3] = b->pos[6]; - /* use =val as stream id && arg as buflen in msec*/ ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "msid=%uD buflen: %uD (msec)", val, arg); + "receive: set_buflen msid=%uD buflen=%uD", + v.msid, v.buflen); - s->buflen = arg; + /*TODO: move this to play module */ + s->buflen = v.buflen; + + return ngx_rtmp_set_buflen(s, &v); + } + + case NGX_RTMP_USER_RECORDED: + { + ngx_rtmp_recorded_t v; + + v.msid = val; + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, + "receive: recorded msid=%uD", v.msid); + + return ngx_rtmp_recorded(s, &v); } break; - case NGX_RTMP_USER_RECORDED: - /* stream =val is recorded */ - break; - case NGX_RTMP_USER_PING_REQUEST: - ngx_rtmp_send_ping_response(s, val); - break; + return ngx_rtmp_send_ping_response(s, val); case NGX_RTMP_USER_PING_RESPONSE: - /* use =val as incoming timestamp */ + + /* val = incoming timestamp */ + ngx_rtmp_reset_ping(s); - break; + + return NGX_OK; default: ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0, - "unexpected user event: %d", - (int)evt); + "unexpected user event: %i", (ngx_int_t) evt); return NGX_OK; } @@ -265,5 +309,4 @@ ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in, act.log = s->connection->log; return ngx_rtmp_amf_read(&act, elts, nelts); -} - +}