added event & call handlers && created broadcast module

This commit is contained in:
Roman Arutyunyan 2012-03-12 21:58:00 +04:00
parent 39c846faa5
commit fca2c2c8f5
12 changed files with 807 additions and 626 deletions

19
TODO
View file

@ -2,24 +2,7 @@
Move AMF0 handlers to modules.
Move broadcast to module.
- Implement helpers for issuing
protocol control messages.
+ Implement multiplexing on
chunk-stream-id. (Message-Streams
may not be multiplexed within
a Chunk Stream):
"Typically, all messages in
the same chunk stream will come from the same message stream"
- Output buffers:
implement shared (per-loc/srv) buffers
for output of a) header(max 18b) b) chunk (fixed-sized)
We never change output chunk size.
Chunk buffers should be ref-counted.
- 'packet' <-> 'message'
- remove macros hell from ngx_rtmp_send.c
- implement loc confs (=fms apps)
loc options:

22
config
View file

@ -1,13 +1,15 @@
ngx_addon_name="ngx_rtmp_module"
CORE_MODULES="$CORE_MODULES ngx_rtmp_module ngx_rtmp_core_module"
CORE_MODULES="$CORE_MODULES
ngx_rtmp_module \
ngx_rtmp_core_module \
ngx_rtmp_broadcast_module"
NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp.c \
$ngx_addon_dir/ngx_rtmp_handler.c \
$ngx_addon_dir/ngx_rtmp_core_module.c \
$ngx_addon_dir/ngx_rtmp_amf0.c \
$ngx_addon_dir/ngx_rtmp_send.c \
\
$ngx_addon_dir/ngx_rtmp_netconn.c \
$ngx_addon_dir/ngx_rtmp_netstream.c"
NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp.c \
$ngx_addon_dir/ngx_rtmp_handler.c \
$ngx_addon_dir/ngx_rtmp_core_module.c \
$ngx_addon_dir/ngx_rtmp_amf0.c \
$ngx_addon_dir/ngx_rtmp_send.c \
$ngx_addon_dir/ngx_rtmp_receive.c \
$ngx_addon_dir/ngx_rtmp_broadcast_module.c"

View file

@ -20,6 +20,10 @@ static ngx_int_t ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport,
ngx_rtmp_conf_addr_t *addr);
#endif
static ngx_int_t ngx_rtmp_cmp_conf_addrs(const void *one, const void *two);
static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf,
ngx_rtmp_core_main_conf_t *cmcf);
static ngx_int_t ngx_rtmp_init_phase_handlers(ngx_conf_t *cf,
ngx_rtmp_core_main_conf_t *cmcf);
ngx_uint_t ngx_rtmp_max_module;
@ -141,12 +145,25 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
}
/* parse inside the rtmp{} block */
pcf = *cf;
cf->ctx = ctx;
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_RTMP_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
if (module->preconfiguration) {
if (module->preconfiguration(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
}
/* parse inside the rtmp{} block */
cf->module_type = NGX_RTMP_MODULE;
cf->cmd_type = NGX_RTMP_MAIN_CONF;
rv = ngx_conf_parse(cf, NULL);
@ -200,8 +217,29 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
}
if (ngx_rtmp_init_events(cf, cmcf) != NGX_OK) {
return NGX_CONF_ERROR;
}
for (m = 0; ngx_modules[m]; m++) {
if (ngx_modules[m]->type != NGX_RTMP_MODULE) {
continue;
}
module = ngx_modules[m]->ctx;
if (module->postconfiguration) {
if (module->postconfiguration(cf) != NGX_OK) {
return NGX_CONF_ERROR;
}
}
}
*cf = pcf;
if (ngx_rtmp_init_event_handlers(cf, cmcf) != NGX_OK) {
return NGX_CONF_ERROR;
}
if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_rtmp_conf_port_t))
!= NGX_OK)
@ -221,6 +259,87 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
}
static ngx_int_t
ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
{
for(n = 0; n < NGX_RTMP_MSG_MAX; ++n) {
if (ngx_array_init(&cmcf->events[n], cf->pool, 1,
sizeof(ngx_rtmp_event_handler_pt)) != NGX_OK)
{
return NGX_ERROR;
}
}
if (ngx_init_array(&conf->calls, cf->pool, 1,
sizeof(ngx_hash_key_t)) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_init_array(&conf->disconnect, cf->pool, 1,
sizeof(ngx_rtmp_disconnect_handler_pt)) != NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf)
{
ngx_hash_init_t calls_hash;
ngx_event_handler_pt *eh;
ngx_hash_key_t *h;
size_t n;
static size_t pm_events[] = {
NGX_RTMP_MSG_CHUNK_SIZE,
NGX_RTMP_MSG_ABORT,
NGX_RTMP_MSG_ACK,
NGX_RTMP_MSG_USER,
NGX_RTMP_MSG_ACK_SIZE,
NGX_RTMP_MSG_BANDWIDTH
};
/* init events */
for(n = 0; n < sizeof(pm_events) / sizeof(p_events[0]); ++n) {
eh = ngx_array_push(&cmcf->events[pm_events[n]]);
*eh = ngx_rtmp_protocol_message_handler;
}
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]);
*eh = ngx_rtmp_user_message_handler;
eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AMF0_CMD]);
*eh = ngx_rtmp_amf0_message_handler;
/* init calls */
for(n = 0; n < cmcf->nelts; ++n) {
h = &cmcf->calls.elts[n];
h->key_hash = ngx_hash_key_lc(h->key.data, h->key.len);
}
calls_hash.hash = &cmcf->calls_hash;
calls_hash.key = ngx_hash_key_lc;
calls_hash.max_size = 1024;
calls_hash.bucket_size = ngx_cacheline_size;
calls_hash.name = "calls_hash";
calls_hash.pool = cf->pool;
calls_hash.temp_pool = NULL;
if (ngx_hash_init(&calls_hash, cmcf->calls.elts, cmcf->calls.nelts)
!= NGX_OK)
{
return NGX_ERROR;
}
return NGX_OK;
}
static ngx_int_t
ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports,
ngx_rtmp_listen_t *listen)

View file

@ -13,13 +13,6 @@
#include <ngx_event_connect.h>
#define NGX_RTMP_HANDSHAKE_SIZE 1536
#define NGX_RTMP_DEFAULT_CHUNK_SIZE 128
#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE
typedef struct {
void **main_conf;
void **srv_conf;
@ -101,115 +94,11 @@ typedef struct {
} ngx_rtmp_conf_addr_t;
typedef struct {
ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */
ngx_array_t listen; /* ngx_rtmp_listen_t */
} ngx_rtmp_core_main_conf_t;
#define NGX_RTMP_HANDSHAKE_SIZE 1536
#define NGX_RTMP_DEFAULT_CHUNK_SIZE 128
typedef struct {
uint32_t csid; /* chunk stream id */
uint32_t timestamp;
uint32_t mlen; /* message length */
uint8_t type; /* message type id */
uint32_t msid; /* message stream id */
} ngx_rtmp_packet_hdr_t;
#define NGX_RTMP_PUBLISHER 0x01
#define NGX_RTMP_SUBSCRIBER 0x02
typedef struct ngx_rtmp_stream_t {
ngx_rtmp_packet_hdr_t hdr;
ngx_chain_t *in;
} ngx_rtmp_stream_t;
struct ngx_rtmp_session_s {
uint32_t signature; /* "RTMP" */
ngx_connection_t *connection;
void **ctx;
void **main_conf;
void **srv_conf;
ngx_str_t *addr_text;
/* handshake */
ngx_buf_t buf;
ngx_uint_t hs_stage;
/* input
* stream 0 (reserved by RTMP spec)
* used for free chain link */
/* TODO: make stream #1 handle ANY single stream;
* that'll introduce support for
* unlimited number of streams given
* there's no interleaving between them */
ngx_rtmp_stream_t *streams;
uint32_t in_csid;
ngx_uint_t in_chunk_size;
ngx_pool_t *in_pool;
/* output */
ngx_chain_t *out;
ngx_rtmp_packet_hdr_t out_hdr;
/* broadcast */
ngx_str_t name;
struct ngx_rtmp_session_s
*next;
ngx_uint_t flags;
uint32_t csid;
};
typedef struct ngx_rtmp_session_s ngx_rtmp_session_t;
#define NGX_RTMP_SESSION_HASH_SIZE 16384
typedef struct {
ngx_msec_t timeout;
ngx_flag_t so_keepalive;
ngx_int_t max_streams;
/* shared output buffers */
ngx_uint_t out_chunk_size;
ngx_pool_t *pool;
ngx_chain_t *free;
ngx_rtmp_session_t **sessions; /* session hash map: name->session */
ngx_rtmp_conf_ctx_t *ctx;
} ngx_rtmp_core_srv_conf_t;
typedef struct {
ngx_str_t *client;
ngx_rtmp_session_t *session;
} ngx_rtmp_log_ctx_t;
typedef struct {
void *(*create_main_conf)(ngx_conf_t *cf);
char *(*init_main_conf)(ngx_conf_t *cf, void *conf);
void *(*create_srv_conf)(ngx_conf_t *cf);
char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev,
void *conf);
} ngx_rtmp_module_t;
/* Chunk header:
* max 3 basic header
* + max 11 message header
* + max 4 extended header (timestamp) */
#define NGX_RTMP_MAX_CHUNK_HEADER 18
#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE
/* RTMP message types*/
@ -229,6 +118,7 @@ typedef struct {
#define NGX_RTMP_MSG_AMF0_SHARED 19
#define NGX_RTMP_MSG_AMF0_CMD 20
#define NGX_RTMP_MSG_AGGREGATE 22
#define NGX_RTMP_MSG_MAX 23
/* RMTP control message types */
@ -241,6 +131,104 @@ typedef struct {
#define NGX_RTMP_USER_PING_RESPONSE 7
/* Chunk header:
* max 3 basic header
* + max 11 message header
* + max 4 extended header (timestamp) */
#define NGX_RTMP_MAX_CHUNK_HEADER 18
typedef struct {
uint32_t csid; /* chunk stream id */
uint32_t timestamp;
uint32_t mlen; /* message length */
uint8_t type; /* message type id */
uint32_t msid; /* message stream id */
} ngx_rtmp_header_t;
typedef struct ngx_rtmp_stream_t {
ngx_rtmp_header_t hdr;
ngx_chain_t *in;
} ngx_rtmp_stream_t;
typedef struct {
uint32_t signature; /* "RTMP" */
ngx_connection_t *connection;
void **ctx;
void **main_conf;
void **srv_conf;
ngx_str_t *addr_text;
/* handshake */
ngx_buf_t buf;
ngx_uint_t hs_stage;
/* input stream 0 (reserved by RTMP spec)
* used for free chain link */
ngx_rtmp_stream_t *in_streams;
uint32_t in_csid;
ngx_uint_t in_chunk_size;
ngx_pool_t *in_pool;
ngx_chain_t *out;
} ngx_rtmp_session_t;
typedef ngx_int_t (*ngx_rtmp_event_handler_pt)(ngx_rtmp_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
typedef ngx_int_t (*ngx_rtmp_call_handler_pt)(ngx_rtmp_session_t *s,
double trans, ngx_chain_t *in);
typedef ngx_int_t (*ngx_rtmp_disconnect_handler_pt)(ngx_rtmp_session_t *s);
typedef struct {
ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */
ngx_array_t listen; /* ngx_rtmp_listen_t */
ngx_array_t events[NGX_RTMP_MSG_MAX];
ngx_hash_t calls_hash;
ngx_array_t calls;
ngx_array_t disconect;
} ngx_rtmp_core_main_conf_t;
typedef struct {
ngx_msec_t timeout;
ngx_flag_t so_keepalive;
ngx_int_t max_streams;
ngx_uint_t out_chunk_size;
ngx_pool_t *out_pool;
ngx_chain_t *out_free;
ngx_rtmp_conf_ctx_t *ctx;
} ngx_rtmp_core_srv_conf_t;
typedef struct {
ngx_str_t *client;
ngx_rtmp_session_t *session;
} ngx_rtmp_log_ctx_t;
typedef struct {
ngx_int_t (*preconfiguration)(ngx_conf_t *cf);
ngx_int_t (*postconfiguration)(ngx_conf_t *cf);
void *(*create_main_conf)(ngx_conf_t *cf);
char *(*init_main_conf)(ngx_conf_t *cf, void *conf);
void *(*create_srv_conf)(ngx_conf_t *cf);
char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev,
void *conf);
} ngx_rtmp_module_t;
#define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */
#define NGX_RTMP_MAIN_CONF 0x02000000
@ -265,13 +253,23 @@ typedef struct {
#define ngx_rtmp_conf_get_module_srv_conf(cf, module) \
((ngx_rtmp_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index]
void ngx_rtmp_init_connection(ngx_connection_t *c);
void ngx_rtmp_close_session(ngx_rtmp_session_t *s);
u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len);
/* Receiving messages */
ngx_int_t ngx_rtmp_protocol_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_user_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
ngx_int_t ngx_rtmp_amf0_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in);
/* Sending messages */
ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s);
void ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h,
void ngx_rtmp_prepare_message(ngx_rtmp_header_t *h,
ngx_chain_t *out, uint8_t fmt);
void ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out);
@ -315,42 +313,6 @@ ngx_int_t ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in,
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
/************** will go to modules */
/* Broadcasting */
void ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags);
void ngx_rtmp_leave(ngx_rtmp_session_t *s);
/* NetConnection methods */
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s
double trans_id, , ngx_chain_t *l);
/* NetStream methods */
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s,
double trans_id, ngx_chain_t *l);
extern ngx_uint_t ngx_rtmp_max_module;
extern ngx_module_t ngx_rtmp_core_module;

View file

@ -19,8 +19,6 @@
#include <ngx_core.h>
/*TODO: char -> u_char */
typedef struct {
ngx_int_t type;
char *name;

268
ngx_rtmp_broadcast_module.c Normal file
View file

@ -0,0 +1,268 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include <ngx_config.h>
#include <ngx_core.h>
#include "ngx_rtmp.h"
static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf);
static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf);
static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf,
void *parent, void *child);
typedef struct {
/* use hash-map
* stream -> broadcast contexts */
ngx_int_t buckets;
struct ngx_rtmp_broadcast_ctx_s **contexts;
} ngx_rtmp_broadcast_srv_conf_t;
static ngx_command_t ngx_rtmp_broadcast_commands[] = {
{ ngx_string("broadcast_buckets"),
NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1,
ngx_conf_set_str_slot,
NGX_RTMP_SRV_CONF_OFFSET,
offsetof(ngx_rtmp_broadcast_srv_conf_t, buckets),
NULL },
ngx_null_command
};
static ngx_rtmp_module_t ngx_rtmp_broadcast_module_ctx = {
NULL, /* preconfiguration */
ngx_rtmp_broadcast_postconfiguration, /* postconfiguration */
NULL, /* create main configuration */
NULL, /* init main configuration */
ngx_rtmp_broadcast_create_srv_conf, /* create server configuration */
ngx_rtmp_broadcast_merge_srv_conf /* merge server configuration */
};
ngx_module_t ngx_rtmp_broadcast_module = {
NGX_MODULE_V1,
&ngx_rtmp_broadcast_module_ctx, /* module context */
ngx_rtmp_broadcast_commands, /* module directives */
NGX_RTMP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
NULL, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
#define NGX_RTMP_PUBLISHER 0x01
#define NGX_RTMP_SUBSCRIBER 0x02
#define NGX_RTMP_SESSION_HASH_SIZE 16384
typedef struct ngx_rtmp_broadcast_ctx_s {
ngx_str_t stream;
ngx_rtmp_session_t *session;
struct ngx_rtmp_broadcast_ctx_s *next;
ngx_uint_t flags; /* publisher/subscriber */
uint32_t csid;
} ngx_rtmp_broadcast_ctx_t;
static void *
ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf)
{
ngx_rtmp_broadcast_srv_conf_t *cscf;
cscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_broadcast_srv_conf_t));
if (cscf == NULL) {
return NULL;
}
cscf->buckets = NGX_CONF_UNSET;
return cscf;
}
static char *
ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
{
ngx_rtmp_broadcast_srv_conf_t *prev = parent;
ngx_rtmp_broadcast_srv_conf_t *conf = child;
ngx_conf_merge_value(conf->buckets, prev->buckets, 1024);
conf->contexts = ngx_pcalloc(cf->pool,
sizeof(ngx_rtmp_broadcast_ctx_t *) * conf->buckets);
return NGX_CONF_OK;
}
static ngx_rtmp_broadcast_ctx_t **
ngx_rtmp_broadcast_get_head(ngx_rtmp_broadcast_ctx_t *ctx)
{
ngx_rtmp_broadcast_srv_conf_t *bscf;
bscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_broadcast_module);
return &bscf->contexts[
ngx_hash_key(ctx->stream.data, ctx->stream.len)
% bscf->buckets];
}
static void
ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream,
ngx_uint_t flags)
{
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, **hctx;
c = s->connection;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL) {
ctx = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_broadcast_ctx_t));
ctx->session = s;
ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_broadcast_module);
}
if (ctx->stream.len) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "already joined");
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"join broadcast stream '%V'", &stream);
s->stream = *stream;
hctx = ngx_rtmp_broadcast_get_head(ctx);
ctx->next = *hctx;
ctx->flags = flags;
*hctx = ctx;
}
static void
ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s)
{
ngx_connection_t *c;
ngx_rtmp_broadcast_ctx_t *ctx, **hctx;
c = s->connection;
ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module);
if (ctx == NULL || !ctx->stream.len) {
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"leave broadcast stream '%V'", &s->stream);
hctx = ngx_rtmp_broadcast_get_head(ctx);
ngx_str_null(&ctx->stream);
for(; *hctx; hctx = &(*hctx)->next) {
if (*hctx == ctx) {
*hctx = (*hctx)->next;
return;
}
}
}
static ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans,
ngx_chain_t *in)
{
static double trans;
static u_char app[1024];
static u_char url[1024];
static ngx_rtmp_amf0_elt_t in_cmd[] = {
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
{ NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(utl) },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) },
{ NGX_RTMP_AMF0_NULL, NULL, NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , NULL, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) },
};
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
trans = in_trans;
ngx_str_set(&inf[0], "NetConnection.Connect.Success");
ngx_str_set(&inf[1], "status");
ngx_str_set(&inf[2], "Connection succeeded.");
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"connect() called; app='%s' url='%s'",
app, url);
/*ngx_rtmp_broadcast_join(s, app);*/
return ngx_rtmp_send_ack_size(s, 65536)
|| ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT)
|| ngx_rtmp_send_user_stream_begin(s, 1)
|| ngx_rtmp_send_amf0(s, 3, 1, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_ERROR
: NGX_OK;
}
static ngx_int_t
ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf)
{
ngx_rtmp_core_main_conf_t *cmcf;
ngx_hash_key_t *h;
ngx_rtmp_disconnect_handler_pt *dh;
cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_rtmp_module);
/* add connect() handler */
h = ngx_array_push(&cmcf->calls);
if (h == NULL) {
return NGX_ERROR;
}
ngx_str_set(&h->key, "connect");
h->value = ngx_rtmp_broadcast_connect;
/* add disconnect handler */
dh = ngx_array_push(&cmcf->disconnect);
if (dh == NULL) {
return NGX_ERROR;
}
*dh = ngx_rtmp_broadcast_leave;
return NGX_OK;
}

View file

@ -75,11 +75,12 @@ static ngx_command_t ngx_rtmp_core_commands[] = {
static ngx_rtmp_module_t ngx_rtmp_core_module_ctx = {
ngx_rtmp_core_create_main_conf, /* create main configuration */
NULL, /* init main configuration */
ngx_rtmp_core_create_srv_conf, /* create server configuration */
ngx_rtmp_core_merge_srv_conf /* merge server configuration */
NULL, /* preconfiguration */
NULL, /* postconfiguration */
ngx_rtmp_core_create_main_conf, /* create main configuration */
NULL, /* init main configuration */
ngx_rtmp_core_create_srv_conf, /* create server configuration */
ngx_rtmp_core_merge_srv_conf /* merge server configuration */
};
@ -129,22 +130,16 @@ ngx_rtmp_core_create_main_conf(ngx_conf_t *cf)
static void *
ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_rtmp_core_srv_conf_t *cscf;
size_t n
cscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_core_srv_conf_t));
if (cscf == NULL) {
return NULL;
}
/*
* set by ngx_pcalloc():
*
* cscf->protocol = NULL;
*/
cscf->timeout = NGX_CONF_UNSET_MSEC;
cscf->so_keepalive = NGX_CONF_UNSET;
cscf->buffers = NGX_CONF_UNSET;
conf->max_streams = NGX_CONF_UNSET;
conf->out_chunk_size = NGX_CONF_UNSET;
@ -164,10 +159,14 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child)
ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16);
ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 128);
conf->pool = ngx_create_pool(4096, cf->log);
if (prev->out_pool == NULL) {
prev->out_pool = ngx_create_pool(4096, cf->log);
if (prev->out_pool == NULL) {
return NGX_CONF_ERROR;
}
}
conf->sessions = ngx_pcalloc(cf->pool,
sizeof(ngx_rtmp_session_t*) * NGX_RTMP_SESSION_HASH_SIZE);
conf->out_pool = prev->out_pool;
return NGX_CONF_OK;
}

View file

@ -210,9 +210,9 @@ ngx_rtmp_init_session(ngx_connection_t *c)
return;
}
s->streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t)
* cmcf->max_streams);
if (s->streams == NULL) {
if (s->in_streams == NULL) {
ngx_rtmp_close_session(s);
return;
}
@ -377,7 +377,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
uint32_t timestamp;
size_t size;
ngx_chain_t *in;
ngx_rtmp_packet_hdr_t *h;
ngx_rtmp_header_t *h;
uint8_t fmt;
uint32_t csid;
ngx_rtmp_stream_t *st, st0;
@ -391,7 +391,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
for(;;) {
st = &s->streams[s->csid];
st = &s->in_streams[s->csid];
if (st->in == NULL) {
if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL
@ -492,7 +492,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
/* link to new stream */
s->csid = csid;
st = s->streams[csid];
st = s->in_streams[csid];
if (st->in == NULL) {
in->next = in;
} else {
@ -595,7 +595,7 @@ ngx_rtmp_recv(ngx_event_t *rev)
b->pos += h->mlen;
/* add used bufs to stream #0 */
st0 = &s->streams[0];
st0 = &s->in_streams[0];
st->in->next = st0->in->next;
st0->in->next = head;
}
@ -687,17 +687,17 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (cscf->free) {
out = cscf->free;
cscf->free = out->next;
if (cscf->out_free) {
out = cscf->out_free;
cscf->out_free = out->next;
} else {
out = ngx_alloc_chain_link(cscf->pool);
out = ngx_alloc_chain_link(cscf->out_pool);
if (out == NULL) {
return NULL;
}
out->buf = ngx_calloc_buf(cscf->pool);
out->buf = ngx_calloc_buf(cscf->out_pool);
if (out->buf == NULL) {
return NULL;
}
@ -705,7 +705,7 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
b = out->buf;
b->start = ngx_palloc(cscf->pool, size);
b->start = ngx_palloc(cscf->out_pool, size);
b->end = b->start + size;
}
@ -719,7 +719,7 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s)
void
ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h, ngx_chain_t *out,
ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out,
uint8_t fmt)
{
ngx_chain_t *l;
@ -840,39 +840,14 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out)
static ngx_int_t
ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
ngx_rtmp_packet_hdr_t *h, ngx_chain_t *l)
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_rtmp_core_srv_conf_t *cscf;
ngx_connection_t *c;
ngx_buf_t *b;
struct {
uint16_t *v1;
uint16_t *v2;
uint16_t *v3;
} ping;
ngx_rtmp_session_t *ss;
static char invoke_name[64];
static double trans_id;
static ngx_rtmp_amf0_elt_t invoke_name_elt[] = {
{ NGX_RTMP_AMF0_STRING, NULL, invoke_name, sizeof(invoke_name) },
{ NGX_RTMP_AMF0_STRING, NULL, &trans_id, sizeof(trans_id) }
};
ngx_rtmp_amf0_ctx_t amf_ctx;
if (l == NULL) {
return NGX_ERROR;
}
c = s->connection;
b = l->buf;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* a session handles only one chunk stream
* but #2 is a special one for protocol messages */
if (h->csid != 2) {
s->csid = h->csid;
}
ngx_rtmp_core_main_conf_t *cmcf;
ngx_array_t *evhs;
size_t n;
ngx_rtmp_event_handler_pt evh;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
#ifdef NGX_DEBUG
{
@ -891,142 +866,22 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
}
#endif
switch(h->type) {
case NGX_RTMP_MSG_CHUNK_SIZE:
break;
if (h->type >= NGX_RTMP_MSG_MAX) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"unexpected RTMP message type: %d", (int)h->type);
return NGX_OK;
}
case NGX_RTMP_MSG_ABORT:
break;
case NGX_RTMP_MSG_ACK:
break;
case NGX_RTMP_MSG_CTL:
if (b->last - b->pos < 6)
return NGX_ERROR;
ping.v1 = (uint16_t*)(b->pos);
ping.v2 = (uint16_t*)(b->pos + 2);
ping.v3 = (uint16_t*)(b->pos + 4);
switch(*ping.v1) {
case NGX_RTMP_USER_STREAM_BEGIN:
break;
case NGX_RTMP_USER_STREAM_EOF:
break;
case NGX_RTMP_USER_STREAM_DRY:
break;
case NGX_RTMP_USER_SET_BUFLEN:
break;
case NGX_RTMP_USER_RECORDED:
break;
case NGX_RTMP_USER_PING_REQUEST:
/* ping client from server */
/**ping.v1 = NGX_RTMP_PING_PONG;
ngx_rtmp_send_message(s, h, l);*/
break;
case NGX_RTMP_USER_PING_RESPONSE:
break;
}
break;
case NGX_RTMP_MSG_ACK_SIZE:
break;
case NGX_RTMP_MSG_BANDWIDTH:
break;
case NGX_RTMP_MSG_EDGE:
break;
case NGX_RTMP_MSG_AUDIO:
case NGX_RTMP_MSG_VIDEO:
/*
if (!(s->flags & NGX_RTMP_PUBLISHER)) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"received audio/video from non-publisher");
return NGX_ERROR;
}
for(ss = *ngx_rtmp_get_session_head(s);
ss; ss = ss->next)
{
if (s != ss
&& ss->flags & NGX_RTMP_SUBSCRIBER
&& s->name.len == ss->name.len
&& !ngx_strncmp(s->name.data, ss->name.data,
s->name.len))
{
ngx_rtmp_send_message(ss, h, l);
}
}*/
break;
case NGX_RTMP_MSG_AMF3_META:
case NGX_RTMP_MSG_AMF3_SHARED:
case NGX_RTMP_MSG_AMF3_CMD:
/* FIXME: AMF3 it not yet supported */
break;
case NGX_RTMP_MSG_AMF0_META:
break;
case NGX_RTMP_MSG_AMF0_SHARED:
break;
case NGX_RTMP_MSG_AMF0_CMD:
amf_ctx.link = &l;
amf_ctx.free = &s->free;
amf_ctx.log = c->log;
memset(invoke_name, 0, sizeof(invoke_name));
if (ngx_rtmp_amf0_read(&amf_ctx, &invoke_name_elt, 1) != NGX_OK) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 cmd failed");
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 cmd '%s'",
invoke_name);
#define _CMD_CALL(name) \
if (!strcasecmp(invoke_name, #name)) { \
return ngx_rtmp_##name(s, trans_id, l); \
}
/* NetConnection calls */
_CMD_CALL(connect);
_CMD_CALL(call);
_CMD_CALL(close);
_CMD_CALL(createstream);
/* NetStream calls */
_CMD_CALL(play);
_CMD_CALL(play2);
_CMD_CALL(deletestream);
_CMD_CALL(closestream);
_CMD_CALL(receiveaudio);
_CMD_CALL(receivevideo);
_CMD_CALL(publish);
_CMD_CALL(seek);
_CMD_CALL(pause);
#undef _CMD_CALL
break;
default:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"unexpected packet type %d",
(int)h->type);
evhs = &cmcf->events[h->type];
for(n = 0; n < evhs->nelts; ++n) {
evh = evh->elts[n];
if (!evh) {
continue;
}
if (evh(s, h, in) != NGX_OK) {
return NGX_ERROR;
}
}
return NGX_OK;
@ -1036,7 +891,19 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s,
void
ngx_rtmp_close_session(ngx_rtmp_session_t *s)
{
ngx_rtmp_leave(s);
size_t n;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_disconnect_handler_pt *h;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
for(n = 0; n < cmcf->disconnect.nelts; ++n) {
h = &cmcf->disconnect.elts[n];
if (*h) {
(*h)(s);
}
}
ngx_destroy_pool(s->in_pool);
ngx_rtmp_close_connection(s->connection);
}
@ -1088,72 +955,3 @@ ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len)
return p;
}
/************* this will go to module ****************/
ngx_rtmp_session_t **
ngx_rtmp_get_session_head(ngx_rtmp_session_t *s)
{
ngx_rtmp_core_srv_conf_t *cscf;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
return &cscf->sessions[
ngx_hash_key(s->name.data, s->name.len)
% NGX_RTMP_SESSION_HASH_SIZE];
}
void
ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags)
{
ngx_rtmp_session_t **ps;
ngx_connection_t *c;
c = s->connection;
if (s->name.len) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"already joined");
return;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP join '%V'",
&name);
s->name = *name;
ps = ngx_rtmp_get_session_head(s);
s->next = *ps;
s->flags = flags;
*ps = s;
}
void
ngx_rtmp_leave(ngx_rtmp_session_t *s)
{
ngx_rtmp_session_t **ps;
ngx_connection_t *c;
c = s->connection;
if (!s->name.len)
return;
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP leave '%V'",
&s->name);
ps = ngx_rtmp_get_session_head(s);
ngx_str_null(&s->name);
for(; *ps; ps = &(*ps)->next) {
if (*ps == s) {
*ps = (*ps)->next;
return;
}
}
}

View file

@ -1,94 +0,0 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp_amf0.h"
ngx_int_t
ngx_rtmp_connect(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in)
{
/* 1) send 'Window Acknowledgement Size'
*
* 2) send 'Set Peer Bandwidth'
*
* 3*) receive 'Window Acknowledgement'
*
* 4) send 'User Control Message(StreamBegin)'
* '
* 5) AMF0 reply:
*
* "_ result"
* 1
* NULL
* { code : "NetConnection.Connect.Success",
* level : "status",
* description : "Connection succeeded." }
*/
static double trans;
static char app[128];
static char flashver[128];
static char svfurl[128];
static ngx_rtmp_amf0_elt_t in_cmd[] = {
{ NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) },
{ NGX_RTMP_AMF0_STRING, "flashver", flashver, sizeof(flashver)},
{ NGX_RTMP_AMF0_STRING, "swfurl", svfurl, sizeof(svfurl) },
};
static ngx_rtmp_amf0_elt_t in_elts[] = {
{ NGX_RTMP_AMF0_OBJECT, 0, in_cmd, sizeof(in_cmd) },
{ NGX_RTMP_AMF0_NULL, 0, NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out_inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t out_elts[] = {
{ NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , 0, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, 0, out_inf, sizeof(out_inf) },
};
if (ngx_rtmp_receive_amf0(s, in, in_elts,
sizeof(in_elts) / sizeof(in_elts[0])))
{
return NGX_ERROR;
}
trans = in_trans;
ngx_str_set(&inf[0], "NetConnection.Connect.Success");
ngx_str_set(&inf[1], "status");
ngx_str_set(&inf[2], "Connection succeeded.");
return ngx_rtmp_send_ack_size(s, 65536)
|| ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT)
|| ngx_rtmp_send_user_stream_begin(s, 1)
|| ngx_rtmp_send_amf0(s, 3, 1, out_elts,
sizeof(out_elts) / sizeof(out_elts[0]))
? NGX_ERROR
: NGX_OK;
}
ngx_int_t
ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}

View file

@ -1,67 +0,0 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp.h"
ngx_int_t
ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}

223
ngx_rtmp_receive.c Normal file
View file

@ -0,0 +1,223 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp.h"
#include "ngx_rtmp_amf0.h"
#include <string.h>
ngx_int_t
ngx_rtmp_protocol_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_buf_t *b;
u_char *p;
uint32_t val;
uint8_t limit;
ngx_connection_t *c;
c = s->connection;
b = in->buf;
if (b->last - b->pos < 4) {
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"too small buffer for %d message: %d",
(int)h->type, b->last - b->pos);
return NGX_OK;
}
p = &val;
p[0] = b->pos[3];
p[1] = b->pos[2];
p[2] = b->pos[1];
p[3] = b->pos[0];
switch(h->type) {
case NGX_RTMP_MSG_CHUNK_SIZE:
/* set chunk size =val */
break;
case NGX_RTMP_MSG_ABORT:
/* abort chunk stream =val */
break;
case NGX_RTMP_MSG_ACK:
/* receive ack with sequence number =val */
break;
case NGX_RTMP_MSG_ACK_SIZE:
/* receive window size =val */
break;
case NGX_RTMP_MSG_BANDWIDTH:
if (b->last - b->pos >= 5) {
limit = *(uint8_t*)&b->pos[4];
(void)val;
(void)limit;
/* receive window size =val
* && limit */
}
break;
default:
return NGX_ERROR;
}
return NGX_OK;
}
ngx_int_t
ngx_rtmp_user_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_buf_t *b;
u_char *p;
uint16_t evt;
uint32_t val, arg;
ngx_connection_t *c;
c = s->connection;
b = in->buf;
if (b->last - b->pos < 6) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"too small buffer for user message: %d",
b->last - b->pos);
return NGX_OK;
}
p = &evt;
p[0] = b->pos[1];
p[1] = b->pos[0];
p = &val;
p[0] = b->pos[5];
p[1] = b->pos[4];
p[2] = b->pos[3];
p[3] = b->pos[2];
switch(evt) {
case NGX_RTMP_USER_STREAM_BEGIN:
/* use =val as stream id which started */
break;
case NGX_RTMP_USER_STREAM_EOF:
/* use =val as stream id which is over */
break;
case NGX_RTMP_USER_STREAM_DRY:
/* stream =val is dry */
break;
case NGX_RTMP_USER_SET_BUFLEN:
if (b->last - b->pos >= 10) {
p = &arg;
p[0] = b->pos[9];
p[1] = b->pos[8];
p[2] = b->pos[7];
p[3] = b->pos[6];
(void)arg;
/* use =val as stream id && arg as buflen in msec*/
}
break;
case NGX_RTMP_USER_RECORDED:
/* stream =val is recorded */
break;
case NGX_RTMP_USER_PING_REQUEST:
ngx_rtmp_send_user_ping_response(s, val);
break;
case NGX_RTMP_USER_PING_RESPONSE:
/* use =val as incoming timestamp */
break;
default:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"unexpected user event: %d",
(int)evt);
return NGX_OK;
}
return NGX_OK;
}
ngx_int_t
ngx_rtmp_amf0_message_handler(ngx_session_t *s,
ngx_rtmp_header_t *h, ngx_chain_t *in)
{
ngx_rtmp_amf0_ctx_t act;
ngx_connection_t *c;
ngx_rtmp_core_main_conf_t *cmcf;
ngx_rtmp_call_handler_pt *ch;
size_t len;
static double trans;
static u_char func[128];
static ngx_rtmp_amf0_elt_t elts[] = {
{ NGX_RTMP_AMF0_STRING, 0, func, sizeof(func) },
{ NGX_RTMP_AMF0_NUMBER, 0, NULL, 0 },
};
c = s->connection;
cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module);
/* read AMF0 func name & transaction id */
act.link = in;
act.log = s->connection->log;
memset(func, 0, sizeof(func));
if (ngx_rtmp_amf0_read(&ect, elts,
sizeof(elts) / sizeof(elts[0])) != NGX_OK)
{
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 cmd failed");
return NGX_ERROR;
}
len = ngx_strlen(func);
/* lookup function handler
* only the first handler is called so far
* because ngx_hash_find only returns one item;
* no good to patch NGINX core ;) */
ch = ngx_hash_find(&cmcf->calls_hash,
ngx_hash_key_lc(func, len), func, len);
if (ch) {
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 func '%s' @%f passed to handler", func, trans);
return (*ch)(s, trans, in);
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"AMF0 cmd '%s' @%f no handler", func, trans);
return NGX_OK;
}
ngx_int_t
ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in,
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
{
ngx_rtmp_amf0_ctx_t act;
act.link = in;
act.log = s->connection->log;
return ngx_rtmp_amf0_read(&act, elts, nelts);
}

View file

@ -1,6 +1,12 @@
/*
* Copyright (c) 2012 Roman Arutyunyan
*/
#include "ngx_rtmp.h.h"
#include "ngx_rtmp_amf0.h"
#define NGX_RTMP_CTL_START(s, type) \
ngx_rtmp_packet_hdr_t __h; \
ngx_chain_t *__l; \
@ -23,10 +29,6 @@
#define NGX_RTMP_CTL_OUT1(v) \
*(__p->last++) = ((u_char*)&v)[0];
#define NGX_RTMP_CTL_OUT2(v) \
*(__p->last++) = ((u_char*)&v)[1]; \
*(__p->last++) = ((u_char*)&v)[0];
#define NGX_RTMP_CTL_OUT4(v) \
*(__p->last++) = ((u_char*)&v)[3]; \
*(__p->last++) = ((u_char*)&v)[2]; \
@ -212,15 +214,3 @@ ngx_rtmp_send_amf0(ngx_session_t *s, uint32_t csid, uint32_t msid,
NGX_RTMP_AMF0_END(s);
}
ngx_int_t
ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in,
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
{
ngx_rtmp_amf0_ctx_t act;
act.link = in;
act.log = s->connection->log;
return ngx_rtmp_amf0_read(&act, elts, nelts);
}