implemented RTMP control handler chains

This commit is contained in:
Roman Arutyunyan 2012-11-13 21:29:05 +04:00
parent 5999daebc2
commit b439dacb82
3 changed files with 161 additions and 38 deletions

View file

@ -20,6 +20,13 @@ ngx_rtmp_seek_pt ngx_rtmp_seek;
ngx_rtmp_pause_pt ngx_rtmp_pause;
ngx_rtmp_stream_begin_pt ngx_rtmp_stream_begin;
ngx_rtmp_stream_eof_pt ngx_rtmp_stream_eof;
ngx_rtmp_stream_dry_pt ngx_rtmp_stream_dry;
ngx_rtmp_recorded_pt ngx_rtmp_recorded;
ngx_rtmp_set_buflen_pt ngx_rtmp_set_buflen;
static ngx_int_t ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf);
@ -629,6 +636,42 @@ ngx_rtmp_cmd_seek(ngx_rtmp_session_t *s, ngx_rtmp_seek_t *v)
}
static ngx_int_t
ngx_rtmp_cmd_stream_begin(ngx_rtmp_session_t *s, ngx_rtmp_stream_begin_t *v)
{
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_stream_eof(ngx_rtmp_session_t *s, ngx_rtmp_stream_eof_t *v)
{
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_stream_dry(ngx_rtmp_session_t *s, ngx_rtmp_stream_dry_t *v)
{
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_recorded(ngx_rtmp_session_t *s,
ngx_rtmp_recorded_t *v)
{
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_cmd_set_buflen(ngx_rtmp_session_t *s, ngx_rtmp_set_buflen_t *v)
{
return NGX_OK;
}
static ngx_rtmp_amf_handler_t ngx_rtmp_cmd_map[] = {
{ ngx_string("connect"), ngx_rtmp_cmd_connect_init },
{ ngx_string("createStream"), ngx_rtmp_cmd_create_stream_init },
@ -689,5 +732,11 @@ ngx_rtmp_cmd_postconfiguration(ngx_conf_t *cf)
ngx_rtmp_seek = ngx_rtmp_cmd_seek;
ngx_rtmp_pause = ngx_rtmp_cmd_pause;
ngx_rtmp_stream_begin = ngx_rtmp_cmd_stream_begin;
ngx_rtmp_stream_eof = ngx_rtmp_cmd_stream_eof;
ngx_rtmp_stream_dry = ngx_rtmp_cmd_stream_dry;
ngx_rtmp_recorded = ngx_rtmp_cmd_recorded;
ngx_rtmp_set_buflen = ngx_rtmp_cmd_set_buflen;
return NGX_OK;
}

View file

@ -79,6 +79,23 @@ typedef struct {
} ngx_rtmp_pause_t;
typedef struct {
uint32_t msid;
} ngx_rtmp_msid_t;
typedef ngx_rtmp_msid_t ngx_rtmp_stream_begin_t;
typedef ngx_rtmp_msid_t ngx_rtmp_stream_eof_t;
typedef ngx_rtmp_msid_t ngx_rtmp_stream_dry_t;
typedef ngx_rtmp_msid_t ngx_rtmp_recorded_t;
typedef struct {
uint32_t msid;
uint32_t buflen;
} ngx_rtmp_set_buflen_t;
typedef ngx_int_t (*ngx_rtmp_connect_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_connect_t *v);
typedef ngx_int_t (*ngx_rtmp_create_stream_pt)(ngx_rtmp_session_t *s,
@ -87,18 +104,26 @@ typedef ngx_int_t (*ngx_rtmp_close_stream_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_close_stream_t *v);
typedef ngx_int_t (*ngx_rtmp_delete_stream_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_delete_stream_t *v);
typedef ngx_int_t (*ngx_rtmp_publish_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_publish_t *v);
typedef ngx_int_t (*ngx_rtmp_play_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_play_t *v);
typedef ngx_int_t (*ngx_rtmp_seek_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_seek_t *v);
typedef ngx_int_t (*ngx_rtmp_pause_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_pause_t *v);
typedef ngx_int_t (*ngx_rtmp_stream_begin_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_stream_begin_t *v);
typedef ngx_int_t (*ngx_rtmp_stream_eof_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_stream_eof_t *v);
typedef ngx_int_t (*ngx_rtmp_stream_dry_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_stream_dry_t *v);
typedef ngx_int_t (*ngx_rtmp_recorded_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_recorded_t *v);
typedef ngx_int_t (*ngx_rtmp_set_buflen_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_set_buflen_t *v);
extern ngx_rtmp_connect_pt ngx_rtmp_connect;
extern ngx_rtmp_create_stream_pt ngx_rtmp_create_stream;
@ -109,5 +134,11 @@ extern ngx_rtmp_play_pt ngx_rtmp_play;
extern ngx_rtmp_seek_pt ngx_rtmp_seek;
extern ngx_rtmp_pause_pt ngx_rtmp_pause;
extern ngx_rtmp_stream_begin_pt ngx_rtmp_stream_begin;
extern ngx_rtmp_stream_eof_pt ngx_rtmp_stream_eof;
extern ngx_rtmp_stream_dry_pt ngx_rtmp_stream_dry;
extern ngx_rtmp_set_buflen_pt ngx_rtmp_set_buflen;
extern ngx_rtmp_recorded_pt ngx_rtmp_recorded;
#endif /*_NGX_RTMP_CMD_H_INCLUDED_ */

View file

@ -5,6 +5,7 @@
#include "ngx_rtmp.h"
#include "ngx_rtmp_amf.h"
#include "ngx_rtmp_cmd_module.h"
#include <string.h>
@ -79,32 +80,34 @@ ngx_rtmp_protocol_message_handler(ngx_rtmp_session_t *s,
ngx_int_t
ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
ngx_chain_t *in)
{
ngx_buf_t *b;
u_char *p;
uint16_t evt;
uint32_t val, arg;
uint32_t val;
b = in->buf;
if (b->last - b->pos < 6) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"too small buffer for user message: %d",
b->last - b->pos);
"too small buffer for user message: %d",
b->last - b->pos);
return NGX_OK;
}
p = (u_char*)&evt;
p[0] = b->pos[1];
p[1] = b->pos[0];
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"RTMP recv user evt %s (%d)",
ngx_rtmp_user_message_type(evt), (int)evt);
"RTMP recv user evt %s (%i)",
ngx_rtmp_user_message_type(evt), (ngx_int_t) evt);
p = (u_char *) &val;
p = (u_char*)&val;
p[0] = b->pos[5];
p[1] = b->pos[4];
p[2] = b->pos[3];
@ -112,54 +115,95 @@ ngx_rtmp_user_message_handler(ngx_rtmp_session_t *s,
switch(evt) {
case NGX_RTMP_USER_STREAM_BEGIN:
/* use =val as stream id which started */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"stream begin msid=%uD", val);
break;
{
ngx_rtmp_stream_begin_t v;
v.msid = val;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive: stream_begin msid=%uD", v.msid);
return ngx_rtmp_stream_begin(s, &v);
}
case NGX_RTMP_USER_STREAM_EOF:
/* use =val as stream id which is over */
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"stream eof msid=%uD", val);
break;
{
ngx_rtmp_stream_eof_t v;
v.msid = val;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive: stream_eof msid=%uD", v.msid);
return ngx_rtmp_stream_eof(s, &v);
}
case NGX_RTMP_USER_STREAM_DRY:
/* stream =val is dry */
break;
{
ngx_rtmp_stream_dry_t v;
v.msid = val;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive: stream_dry msid=%uD", v.msid);
return ngx_rtmp_stream_dry(s, &v);
}
case NGX_RTMP_USER_SET_BUFLEN:
if (b->last - b->pos >= 10) {
p = (u_char*)&arg;
{
ngx_rtmp_set_buflen_t v;
v.msid = val;
if (b->last - b->pos < 10) {
return NGX_OK;
}
p = (u_char *) &v.buflen;
p[0] = b->pos[9];
p[1] = b->pos[8];
p[2] = b->pos[7];
p[3] = b->pos[6];
/* use =val as stream id && arg as buflen in msec*/
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"msid=%uD buflen: %uD (msec)", val, arg);
"receive: set_buflen msid=%uD buflen=%uD",
v.msid, v.buflen);
s->buflen = arg;
/*TODO: move this to play module */
s->buflen = v.buflen;
return ngx_rtmp_set_buflen(s, &v);
}
case NGX_RTMP_USER_RECORDED:
{
ngx_rtmp_recorded_t v;
v.msid = val;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"receive: recorded msid=%uD", v.msid);
return ngx_rtmp_recorded(s, &v);
}
break;
case NGX_RTMP_USER_RECORDED:
/* stream =val is recorded */
break;
case NGX_RTMP_USER_PING_REQUEST:
ngx_rtmp_send_ping_response(s, val);
break;
return ngx_rtmp_send_ping_response(s, val);
case NGX_RTMP_USER_PING_RESPONSE:
/* use =val as incoming timestamp */
/* val = incoming timestamp */
ngx_rtmp_reset_ping(s);
break;
return NGX_OK;
default:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, s->connection->log, 0,
"unexpected user event: %d",
(int)evt);
"unexpected user event: %i", (ngx_int_t) evt);
return NGX_OK;
}
@ -265,5 +309,4 @@ ngx_rtmp_receive_amf(ngx_rtmp_session_t *s, ngx_chain_t *in,
act.log = s->connection->log;
return ngx_rtmp_amf_read(&act, elts, nelts);
}
}