From fca2c2c8f5bb2d49788289cc65239ac7f90b2d55 Mon Sep 17 00:00:00 2001 From: Roman Arutyunyan Date: Mon, 12 Mar 2012 21:58:00 +0400 Subject: [PATCH] added event & call handlers && created broadcast module --- TODO | 19 +-- config | 22 +-- ngx_rtmp.c | 125 ++++++++++++++- ngx_rtmp.h | 264 ++++++++++++++------------------ ngx_rtmp_amf0.h | 2 - ngx_rtmp_broadcast_module.c | 268 ++++++++++++++++++++++++++++++++ ngx_rtmp_core_module.c | 31 ++-- ngx_rtmp_handler.c | 296 ++++++------------------------------ ngx_rtmp_netconn.c | 94 ------------ ngx_rtmp_netstream.c | 67 -------- ngx_rtmp_receive.c | 223 +++++++++++++++++++++++++++ ngx_rtmp_send.c | 22 +-- 12 files changed, 807 insertions(+), 626 deletions(-) create mode 100644 ngx_rtmp_broadcast_module.c delete mode 100644 ngx_rtmp_netconn.c delete mode 100644 ngx_rtmp_netstream.c create mode 100644 ngx_rtmp_receive.c diff --git a/TODO b/TODO index 0cc94b7..761ba50 100644 --- a/TODO +++ b/TODO @@ -2,24 +2,7 @@ Move AMF0 handlers to modules. Move broadcast to module. -- Implement helpers for issuing - protocol control messages. - -+ Implement multiplexing on - chunk-stream-id. (Message-Streams - may not be multiplexed within - a Chunk Stream): - - "Typically, all messages in - the same chunk stream will come from the same message stream" - -- Output buffers: - implement shared (per-loc/srv) buffers - for output of a) header(max 18b) b) chunk (fixed-sized) - We never change output chunk size. - Chunk buffers should be ref-counted. - -- 'packet' <-> 'message' +- remove macros hell from ngx_rtmp_send.c - implement loc confs (=fms apps) loc options: diff --git a/config b/config index c178330..6bdd9b2 100644 --- a/config +++ b/config @@ -1,13 +1,15 @@ ngx_addon_name="ngx_rtmp_module" -CORE_MODULES="$CORE_MODULES ngx_rtmp_module ngx_rtmp_core_module" +CORE_MODULES="$CORE_MODULES + ngx_rtmp_module \ + ngx_rtmp_core_module \ + ngx_rtmp_broadcast_module" -NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ - $ngx_addon_dir/ngx_rtmp.c \ - $ngx_addon_dir/ngx_rtmp_handler.c \ - $ngx_addon_dir/ngx_rtmp_core_module.c \ - $ngx_addon_dir/ngx_rtmp_amf0.c \ - $ngx_addon_dir/ngx_rtmp_send.c \ - \ - $ngx_addon_dir/ngx_rtmp_netconn.c \ - $ngx_addon_dir/ngx_rtmp_netstream.c" +NGX_ADDON_SRCS="$NGX_ADDON_SRCS \ + $ngx_addon_dir/ngx_rtmp.c \ + $ngx_addon_dir/ngx_rtmp_handler.c \ + $ngx_addon_dir/ngx_rtmp_core_module.c \ + $ngx_addon_dir/ngx_rtmp_amf0.c \ + $ngx_addon_dir/ngx_rtmp_send.c \ + $ngx_addon_dir/ngx_rtmp_receive.c \ + $ngx_addon_dir/ngx_rtmp_broadcast_module.c" diff --git a/ngx_rtmp.c b/ngx_rtmp.c index fbc8401..26e8d7c 100644 --- a/ngx_rtmp.c +++ b/ngx_rtmp.c @@ -20,6 +20,10 @@ static ngx_int_t ngx_rtmp_add_addrs6(ngx_conf_t *cf, ngx_rtmp_port_t *mport, ngx_rtmp_conf_addr_t *addr); #endif static ngx_int_t ngx_rtmp_cmp_conf_addrs(const void *one, const void *two); +static ngx_int_t ngx_rtmp_init_events(ngx_conf_t *cf, + ngx_rtmp_core_main_conf_t *cmcf); +static ngx_int_t ngx_rtmp_init_phase_handlers(ngx_conf_t *cf, + ngx_rtmp_core_main_conf_t *cmcf); ngx_uint_t ngx_rtmp_max_module; @@ -141,12 +145,25 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } } - - /* parse inside the rtmp{} block */ - pcf = *cf; cf->ctx = ctx; + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_RTMP_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + + if (module->preconfiguration) { + if (module->preconfiguration(cf) != NGX_OK) { + return NGX_CONF_ERROR; + } + } + } + + /* parse inside the rtmp{} block */ + cf->module_type = NGX_RTMP_MODULE; cf->cmd_type = NGX_RTMP_MAIN_CONF; rv = ngx_conf_parse(cf, NULL); @@ -200,8 +217,29 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } } + if (ngx_rtmp_init_events(cf, cmcf) != NGX_OK) { + return NGX_CONF_ERROR; + } + + for (m = 0; ngx_modules[m]; m++) { + if (ngx_modules[m]->type != NGX_RTMP_MODULE) { + continue; + } + + module = ngx_modules[m]->ctx; + + if (module->postconfiguration) { + if (module->postconfiguration(cf) != NGX_OK) { + return NGX_CONF_ERROR; + } + } + } + *cf = pcf; + if (ngx_rtmp_init_event_handlers(cf, cmcf) != NGX_OK) { + return NGX_CONF_ERROR; + } if (ngx_array_init(&ports, cf->temp_pool, 4, sizeof(ngx_rtmp_conf_port_t)) != NGX_OK) @@ -221,6 +259,87 @@ ngx_rtmp_block(ngx_conf_t *cf, ngx_command_t *cmd, void *conf) } +static ngx_int_t +ngx_rtmp_init_events(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) +{ + for(n = 0; n < NGX_RTMP_MSG_MAX; ++n) { + if (ngx_array_init(&cmcf->events[n], cf->pool, 1, + sizeof(ngx_rtmp_event_handler_pt)) != NGX_OK) + { + return NGX_ERROR; + } + } + + if (ngx_init_array(&conf->calls, cf->pool, 1, + sizeof(ngx_hash_key_t)) != NGX_OK) + { + return NGX_ERROR; + } + + if (ngx_init_array(&conf->disconnect, cf->pool, 1, + sizeof(ngx_rtmp_disconnect_handler_pt)) != NGX_OK) + { + return NGX_ERROR; + } + + + return NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_init_event_handlers(ngx_conf_t *cf, ngx_rtmp_core_main_conf_t *cmcf) +{ + ngx_hash_init_t calls_hash; + ngx_event_handler_pt *eh; + ngx_hash_key_t *h; + size_t n; + static size_t pm_events[] = { + NGX_RTMP_MSG_CHUNK_SIZE, + NGX_RTMP_MSG_ABORT, + NGX_RTMP_MSG_ACK, + NGX_RTMP_MSG_USER, + NGX_RTMP_MSG_ACK_SIZE, + NGX_RTMP_MSG_BANDWIDTH + }; + + /* init events */ + for(n = 0; n < sizeof(pm_events) / sizeof(p_events[0]); ++n) { + eh = ngx_array_push(&cmcf->events[pm_events[n]]); + *eh = ngx_rtmp_protocol_message_handler; + } + + eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_USER]); + *eh = ngx_rtmp_user_message_handler; + + eh = ngx_array_push(&cmcf->events[NGX_RTMP_MSG_AMF0_CMD]); + *eh = ngx_rtmp_amf0_message_handler; + + + /* init calls */ + for(n = 0; n < cmcf->nelts; ++n) { + h = &cmcf->calls.elts[n]; + h->key_hash = ngx_hash_key_lc(h->key.data, h->key.len); + } + + calls_hash.hash = &cmcf->calls_hash; + calls_hash.key = ngx_hash_key_lc; + calls_hash.max_size = 1024; + calls_hash.bucket_size = ngx_cacheline_size; + calls_hash.name = "calls_hash"; + calls_hash.pool = cf->pool; + calls_hash.temp_pool = NULL; + + if (ngx_hash_init(&calls_hash, cmcf->calls.elts, cmcf->calls.nelts) + != NGX_OK) + { + return NGX_ERROR; + } + + return NGX_OK; +} + + static ngx_int_t ngx_rtmp_add_ports(ngx_conf_t *cf, ngx_array_t *ports, ngx_rtmp_listen_t *listen) diff --git a/ngx_rtmp.h b/ngx_rtmp.h index 5bd46d7..12788d3 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -13,13 +13,6 @@ #include -#define NGX_RTMP_HANDSHAKE_SIZE 1536 - -#define NGX_RTMP_DEFAULT_CHUNK_SIZE 128 - -#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE - - typedef struct { void **main_conf; void **srv_conf; @@ -101,115 +94,11 @@ typedef struct { } ngx_rtmp_conf_addr_t; -typedef struct { - ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */ - ngx_array_t listen; /* ngx_rtmp_listen_t */ -} ngx_rtmp_core_main_conf_t; +#define NGX_RTMP_HANDSHAKE_SIZE 1536 +#define NGX_RTMP_DEFAULT_CHUNK_SIZE 128 -typedef struct { - uint32_t csid; /* chunk stream id */ - uint32_t timestamp; - uint32_t mlen; /* message length */ - uint8_t type; /* message type id */ - uint32_t msid; /* message stream id */ -} ngx_rtmp_packet_hdr_t; - - -#define NGX_RTMP_PUBLISHER 0x01 -#define NGX_RTMP_SUBSCRIBER 0x02 - - -typedef struct ngx_rtmp_stream_t { - ngx_rtmp_packet_hdr_t hdr; - ngx_chain_t *in; -} ngx_rtmp_stream_t; - - -struct ngx_rtmp_session_s { - uint32_t signature; /* "RTMP" */ - - ngx_connection_t *connection; - - void **ctx; - void **main_conf; - void **srv_conf; - - ngx_str_t *addr_text; - - /* handshake */ - ngx_buf_t buf; - ngx_uint_t hs_stage; - - /* input - * stream 0 (reserved by RTMP spec) - * used for free chain link */ - - /* TODO: make stream #1 handle ANY single stream; - * that'll introduce support for - * unlimited number of streams given - * there's no interleaving between them */ - - ngx_rtmp_stream_t *streams; - uint32_t in_csid; - ngx_uint_t in_chunk_size; - ngx_pool_t *in_pool; - - /* output */ - ngx_chain_t *out; - ngx_rtmp_packet_hdr_t out_hdr; - - /* broadcast */ - ngx_str_t name; - struct ngx_rtmp_session_s - *next; - ngx_uint_t flags; - uint32_t csid; -}; - -typedef struct ngx_rtmp_session_s ngx_rtmp_session_t; - - -#define NGX_RTMP_SESSION_HASH_SIZE 16384 - - -typedef struct { - ngx_msec_t timeout; - ngx_flag_t so_keepalive; - ngx_int_t max_streams; - - /* shared output buffers */ - ngx_uint_t out_chunk_size; - ngx_pool_t *pool; - ngx_chain_t *free; - - ngx_rtmp_session_t **sessions; /* session hash map: name->session */ - - ngx_rtmp_conf_ctx_t *ctx; -} ngx_rtmp_core_srv_conf_t; - - -typedef struct { - ngx_str_t *client; - ngx_rtmp_session_t *session; -} ngx_rtmp_log_ctx_t; - - -typedef struct { - void *(*create_main_conf)(ngx_conf_t *cf); - char *(*init_main_conf)(ngx_conf_t *cf, void *conf); - - void *(*create_srv_conf)(ngx_conf_t *cf); - char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, - void *conf); -} ngx_rtmp_module_t; - - -/* Chunk header: - * max 3 basic header - * + max 11 message header - * + max 4 extended header (timestamp) */ -#define NGX_RTMP_MAX_CHUNK_HEADER 18 +#define NGX_LOG_DEBUG_RTMP NGX_LOG_DEBUG_CORE /* RTMP message types*/ @@ -229,6 +118,7 @@ typedef struct { #define NGX_RTMP_MSG_AMF0_SHARED 19 #define NGX_RTMP_MSG_AMF0_CMD 20 #define NGX_RTMP_MSG_AGGREGATE 22 +#define NGX_RTMP_MSG_MAX 23 /* RMTP control message types */ @@ -241,6 +131,104 @@ typedef struct { #define NGX_RTMP_USER_PING_RESPONSE 7 +/* Chunk header: + * max 3 basic header + * + max 11 message header + * + max 4 extended header (timestamp) */ +#define NGX_RTMP_MAX_CHUNK_HEADER 18 + + +typedef struct { + uint32_t csid; /* chunk stream id */ + uint32_t timestamp; + uint32_t mlen; /* message length */ + uint8_t type; /* message type id */ + uint32_t msid; /* message stream id */ +} ngx_rtmp_header_t; + + +typedef struct ngx_rtmp_stream_t { + ngx_rtmp_header_t hdr; + ngx_chain_t *in; +} ngx_rtmp_stream_t; + + +typedef struct { + uint32_t signature; /* "RTMP" */ + + ngx_connection_t *connection; + + void **ctx; + void **main_conf; + void **srv_conf; + + ngx_str_t *addr_text; + + /* handshake */ + ngx_buf_t buf; + ngx_uint_t hs_stage; + + /* input stream 0 (reserved by RTMP spec) + * used for free chain link */ + + ngx_rtmp_stream_t *in_streams; + uint32_t in_csid; + ngx_uint_t in_chunk_size; + ngx_pool_t *in_pool; + + ngx_chain_t *out; +} ngx_rtmp_session_t; + + +typedef ngx_int_t (*ngx_rtmp_event_handler_pt)(ngx_rtmp_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); +typedef ngx_int_t (*ngx_rtmp_call_handler_pt)(ngx_rtmp_session_t *s, + double trans, ngx_chain_t *in); +typedef ngx_int_t (*ngx_rtmp_disconnect_handler_pt)(ngx_rtmp_session_t *s); + + +typedef struct { + ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */ + ngx_array_t listen; /* ngx_rtmp_listen_t */ + + ngx_array_t events[NGX_RTMP_MSG_MAX]; + ngx_hash_t calls_hash; + ngx_array_t calls; + ngx_array_t disconect; +} ngx_rtmp_core_main_conf_t; + + +typedef struct { + ngx_msec_t timeout; + ngx_flag_t so_keepalive; + ngx_int_t max_streams; + + ngx_uint_t out_chunk_size; + ngx_pool_t *out_pool; + ngx_chain_t *out_free; + + ngx_rtmp_conf_ctx_t *ctx; +} ngx_rtmp_core_srv_conf_t; + + +typedef struct { + ngx_str_t *client; + ngx_rtmp_session_t *session; +} ngx_rtmp_log_ctx_t; + + +typedef struct { + ngx_int_t (*preconfiguration)(ngx_conf_t *cf); + ngx_int_t (*postconfiguration)(ngx_conf_t *cf); + + void *(*create_main_conf)(ngx_conf_t *cf); + char *(*init_main_conf)(ngx_conf_t *cf, void *conf); + + void *(*create_srv_conf)(ngx_conf_t *cf); + char *(*merge_srv_conf)(ngx_conf_t *cf, void *prev, + void *conf); +} ngx_rtmp_module_t; + #define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */ #define NGX_RTMP_MAIN_CONF 0x02000000 @@ -265,13 +253,23 @@ typedef struct { #define ngx_rtmp_conf_get_module_srv_conf(cf, module) \ ((ngx_rtmp_conf_ctx_t *) cf->ctx)->srv_conf[module.ctx_index] + void ngx_rtmp_init_connection(ngx_connection_t *c); void ngx_rtmp_close_session(ngx_rtmp_session_t *s); u_char * ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len); + +/* Receiving messages */ +ngx_int_t ngx_rtmp_protocol_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); +ngx_int_t ngx_rtmp_user_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); +ngx_int_t ngx_rtmp_amf0_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in); + /* Sending messages */ ngx_chain_t * ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s); -void ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h, +void ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt); void ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out); @@ -315,42 +313,6 @@ ngx_int_t ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in, ngx_rtmp_amf0_elt_t *elts, size_t nelts) -/************** will go to modules */ - -/* Broadcasting */ -void ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags); -void ngx_rtmp_leave(ngx_rtmp_session_t *s); - -/* NetConnection methods */ -ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s - double trans_id, , ngx_chain_t *l); - -/* NetStream methods */ -ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); -ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, - double trans_id, ngx_chain_t *l); - extern ngx_uint_t ngx_rtmp_max_module; extern ngx_module_t ngx_rtmp_core_module; diff --git a/ngx_rtmp_amf0.h b/ngx_rtmp_amf0.h index bb8f84f..1db5e43 100644 --- a/ngx_rtmp_amf0.h +++ b/ngx_rtmp_amf0.h @@ -19,8 +19,6 @@ #include -/*TODO: char -> u_char */ - typedef struct { ngx_int_t type; char *name; diff --git a/ngx_rtmp_broadcast_module.c b/ngx_rtmp_broadcast_module.c new file mode 100644 index 0000000..88a0b59 --- /dev/null +++ b/ngx_rtmp_broadcast_module.c @@ -0,0 +1,268 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include +#include +#include "ngx_rtmp.h" + + +static ngx_int_t ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf); +static void * ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf); +static char * ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, + void *parent, void *child); + + +typedef struct { + /* use hash-map + * stream -> broadcast contexts */ + ngx_int_t buckets; + struct ngx_rtmp_broadcast_ctx_s **contexts; +} ngx_rtmp_broadcast_srv_conf_t; + + +static ngx_command_t ngx_rtmp_broadcast_commands[] = { + { ngx_string("broadcast_buckets"), + NGX_RTMP_MAIN_CONF|NGX_RTMP_SRV_CONF|NGX_CONF_TAKE1, + ngx_conf_set_str_slot, + NGX_RTMP_SRV_CONF_OFFSET, + offsetof(ngx_rtmp_broadcast_srv_conf_t, buckets), + NULL }, + + ngx_null_command +}; + + +static ngx_rtmp_module_t ngx_rtmp_broadcast_module_ctx = { + NULL, /* preconfiguration */ + ngx_rtmp_broadcast_postconfiguration, /* postconfiguration */ + NULL, /* create main configuration */ + NULL, /* init main configuration */ + ngx_rtmp_broadcast_create_srv_conf, /* create server configuration */ + ngx_rtmp_broadcast_merge_srv_conf /* merge server configuration */ +}; + + +ngx_module_t ngx_rtmp_broadcast_module = { + NGX_MODULE_V1, + &ngx_rtmp_broadcast_module_ctx, /* module context */ + ngx_rtmp_broadcast_commands, /* module directives */ + NGX_RTMP_MODULE, /* module type */ + NULL, /* init master */ + NULL, /* init module */ + NULL, /* init process */ + NULL, /* init thread */ + NULL, /* exit thread */ + NULL, /* exit process */ + NULL, /* exit master */ + NGX_MODULE_V1_PADDING +}; + + +#define NGX_RTMP_PUBLISHER 0x01 +#define NGX_RTMP_SUBSCRIBER 0x02 + +#define NGX_RTMP_SESSION_HASH_SIZE 16384 + + +typedef struct ngx_rtmp_broadcast_ctx_s { + ngx_str_t stream; + ngx_rtmp_session_t *session; + struct ngx_rtmp_broadcast_ctx_s *next; + ngx_uint_t flags; /* publisher/subscriber */ + uint32_t csid; +} ngx_rtmp_broadcast_ctx_t; + + +static void * +ngx_rtmp_broadcast_create_srv_conf(ngx_conf_t *cf) +{ + ngx_rtmp_broadcast_srv_conf_t *cscf; + + cscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_broadcast_srv_conf_t)); + if (cscf == NULL) { + return NULL; + } + + cscf->buckets = NGX_CONF_UNSET; + + return cscf; +} + + +static char * +ngx_rtmp_broadcast_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) +{ + ngx_rtmp_broadcast_srv_conf_t *prev = parent; + ngx_rtmp_broadcast_srv_conf_t *conf = child; + + ngx_conf_merge_value(conf->buckets, prev->buckets, 1024); + + conf->contexts = ngx_pcalloc(cf->pool, + sizeof(ngx_rtmp_broadcast_ctx_t *) * conf->buckets); + + return NGX_CONF_OK; +} + + +static ngx_rtmp_broadcast_ctx_t ** +ngx_rtmp_broadcast_get_head(ngx_rtmp_broadcast_ctx_t *ctx) +{ + ngx_rtmp_broadcast_srv_conf_t *bscf; + + bscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_broadcast_module); + + return &bscf->contexts[ + ngx_hash_key(ctx->stream.data, ctx->stream.len) + % bscf->buckets]; +} + + +static void +ngx_rtmp_broadcast_join(ngx_rtmp_session_t *s, ngx_str_t *stream, + ngx_uint_t flags) +{ + ngx_connection_t *c; + ngx_rtmp_broadcast_ctx_t *ctx, **hctx; + + c = s->connection; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + if (ctx == NULL) { + ctx = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_broadcast_ctx_t)); + ctx->session = s; + ngx_rtmp_set_ctx(s, ctx, ngx_rtmp_broadcast_module); + } + + if (ctx->stream.len) { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, "already joined"); + return; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "join broadcast stream '%V'", &stream); + + s->stream = *stream; + hctx = ngx_rtmp_broadcast_get_head(ctx); + ctx->next = *hctx; + ctx->flags = flags; + *hctx = ctx; +} + + +static void +ngx_rtmp_broadcast_leave(ngx_rtmp_session_t *s) +{ + ngx_connection_t *c; + ngx_rtmp_broadcast_ctx_t *ctx, **hctx; + + c = s->connection; + + ctx = ngx_rtmp_get_module_ctx(s, ngx_rtmp_broadcast_module); + if (ctx == NULL || !ctx->stream.len) { + return; + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "leave broadcast stream '%V'", &s->stream); + + hctx = ngx_rtmp_broadcast_get_head(ctx); + ngx_str_null(&ctx->stream); + + for(; *hctx; hctx = &(*hctx)->next) { + if (*hctx == ctx) { + *hctx = (*hctx)->next; + return; + } + } +} + + +static ngx_rtmp_broadcast_connect(ngx_rtmp_session_t *s, double in_trans, + ngx_chain_t *in) +{ + static double trans; + static u_char app[1024]; + static u_char url[1024]; + + static ngx_rtmp_amf0_elt_t in_cmd[] = { + { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, + { NGX_RTMP_AMF0_STRING, "pageUrl", url, sizeof(utl) }, + }; + + static ngx_rtmp_amf0_elt_t out_inf[] = { + { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t in_elts[] = { + { NGX_RTMP_AMF0_OBJECT, NULL, in_cmd, sizeof(in_cmd) }, + { NGX_RTMP_AMF0_NULL, NULL, NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t out_elts[] = { + { NGX_RTMP_AMF0_STRING, NULL, "_result", sizeof("_result") - 1 }, + { NGX_RTMP_AMF0_NUMBER, NULL, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , NULL, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, NULL, out_inf, sizeof(out_inf) }, + }; + + if (ngx_rtmp_receive_amf0(s, in, in_elts, + sizeof(in_elts) / sizeof(in_elts[0]))) + { + return NGX_ERROR; + } + + trans = in_trans; + ngx_str_set(&inf[0], "NetConnection.Connect.Success"); + ngx_str_set(&inf[1], "status"); + ngx_str_set(&inf[2], "Connection succeeded."); + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + "connect() called; app='%s' url='%s'", + app, url); + + /*ngx_rtmp_broadcast_join(s, app);*/ + + return ngx_rtmp_send_ack_size(s, 65536) + || ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT) + || ngx_rtmp_send_user_stream_begin(s, 1) + || ngx_rtmp_send_amf0(s, 3, 1, out_elts, + sizeof(out_elts) / sizeof(out_elts[0])) + ? NGX_ERROR + : NGX_OK; +} + + +static ngx_int_t +ngx_rtmp_broadcast_postconfiguration(ngx_conf_t *cf) +{ + ngx_rtmp_core_main_conf_t *cmcf; + ngx_hash_key_t *h; + ngx_rtmp_disconnect_handler_pt *dh; + + cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_rtmp_module); + + /* add connect() handler */ + h = ngx_array_push(&cmcf->calls); + + if (h == NULL) { + return NGX_ERROR; + } + + ngx_str_set(&h->key, "connect"); + h->value = ngx_rtmp_broadcast_connect; + + /* add disconnect handler */ + dh = ngx_array_push(&cmcf->disconnect); + + if (dh == NULL) { + return NGX_ERROR; + } + + *dh = ngx_rtmp_broadcast_leave; + + return NGX_OK; +} diff --git a/ngx_rtmp_core_module.c b/ngx_rtmp_core_module.c index 97963bd..9d49707 100644 --- a/ngx_rtmp_core_module.c +++ b/ngx_rtmp_core_module.c @@ -75,11 +75,12 @@ static ngx_command_t ngx_rtmp_core_commands[] = { static ngx_rtmp_module_t ngx_rtmp_core_module_ctx = { - ngx_rtmp_core_create_main_conf, /* create main configuration */ - NULL, /* init main configuration */ - - ngx_rtmp_core_create_srv_conf, /* create server configuration */ - ngx_rtmp_core_merge_srv_conf /* merge server configuration */ + NULL, /* preconfiguration */ + NULL, /* postconfiguration */ + ngx_rtmp_core_create_main_conf, /* create main configuration */ + NULL, /* init main configuration */ + ngx_rtmp_core_create_srv_conf, /* create server configuration */ + ngx_rtmp_core_merge_srv_conf /* merge server configuration */ }; @@ -129,22 +130,16 @@ ngx_rtmp_core_create_main_conf(ngx_conf_t *cf) static void * ngx_rtmp_core_create_srv_conf(ngx_conf_t *cf) { - ngx_rtmp_core_srv_conf_t *cscf; + ngx_rtmp_core_srv_conf_t *cscf; + size_t n cscf = ngx_pcalloc(cf->pool, sizeof(ngx_rtmp_core_srv_conf_t)); if (cscf == NULL) { return NULL; } - /* - * set by ngx_pcalloc(): - * - * cscf->protocol = NULL; - */ - cscf->timeout = NGX_CONF_UNSET_MSEC; cscf->so_keepalive = NGX_CONF_UNSET; - cscf->buffers = NGX_CONF_UNSET; conf->max_streams = NGX_CONF_UNSET; conf->out_chunk_size = NGX_CONF_UNSET; @@ -164,10 +159,14 @@ ngx_rtmp_core_merge_srv_conf(ngx_conf_t *cf, void *parent, void *child) ngx_conf_merge_value(conf->max_streams, prev->max_streams, 16); ngx_conf_merge_value(conf->out_chunk_size, prev->out_chunk_size, 128); - conf->pool = ngx_create_pool(4096, cf->log); + if (prev->out_pool == NULL) { + prev->out_pool = ngx_create_pool(4096, cf->log); + if (prev->out_pool == NULL) { + return NGX_CONF_ERROR; + } + } - conf->sessions = ngx_pcalloc(cf->pool, - sizeof(ngx_rtmp_session_t*) * NGX_RTMP_SESSION_HASH_SIZE); + conf->out_pool = prev->out_pool; return NGX_CONF_OK; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 2dd41e8..de344d4 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -210,9 +210,9 @@ ngx_rtmp_init_session(ngx_connection_t *c) return; } - s->streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) + s->in_streams = ngx_pcalloc(c->pool, sizeof(ngx_rtmp_stream_t) * cmcf->max_streams); - if (s->streams == NULL) { + if (s->in_streams == NULL) { ngx_rtmp_close_session(s); return; } @@ -377,7 +377,7 @@ ngx_rtmp_recv(ngx_event_t *rev) uint32_t timestamp; size_t size; ngx_chain_t *in; - ngx_rtmp_packet_hdr_t *h; + ngx_rtmp_header_t *h; uint8_t fmt; uint32_t csid; ngx_rtmp_stream_t *st, st0; @@ -391,7 +391,7 @@ ngx_rtmp_recv(ngx_event_t *rev) for(;;) { - st = &s->streams[s->csid]; + st = &s->in_streams[s->csid]; if (st->in == NULL) { if ((st->in = ngx_alloc_chain_link(s->in_pool)) == NULL @@ -492,7 +492,7 @@ ngx_rtmp_recv(ngx_event_t *rev) /* link to new stream */ s->csid = csid; - st = s->streams[csid]; + st = s->in_streams[csid]; if (st->in == NULL) { in->next = in; } else { @@ -595,7 +595,7 @@ ngx_rtmp_recv(ngx_event_t *rev) b->pos += h->mlen; /* add used bufs to stream #0 */ - st0 = &s->streams[0]; + st0 = &s->in_streams[0]; st->in->next = st0->in->next; st0->in->next = head; } @@ -687,17 +687,17 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - if (cscf->free) { - out = cscf->free; - cscf->free = out->next; + if (cscf->out_free) { + out = cscf->out_free; + cscf->out_free = out->next; } else { - out = ngx_alloc_chain_link(cscf->pool); + out = ngx_alloc_chain_link(cscf->out_pool); if (out == NULL) { return NULL; } - out->buf = ngx_calloc_buf(cscf->pool); + out->buf = ngx_calloc_buf(cscf->out_pool); if (out->buf == NULL) { return NULL; } @@ -705,7 +705,7 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) size = cscf->out_chunk_size + NGX_RTMP_MAX_CHUNK_HEADER; b = out->buf; - b->start = ngx_palloc(cscf->pool, size); + b->start = ngx_palloc(cscf->out_pool, size); b->end = b->start + size; } @@ -719,7 +719,7 @@ ngx_rtmp_alloc_shared_buf(ngx_rtmp_session_t *s) void -ngx_rtmp_prepare_message(ngx_rtmp_packet_hdr_t *h, ngx_chain_t *out, +ngx_rtmp_prepare_message(ngx_rtmp_header_t *h, ngx_chain_t *out, uint8_t fmt) { ngx_chain_t *l; @@ -840,39 +840,14 @@ ngx_rtmp_send_message(ngx_rtmp_session_t *s, ngx_chain_t *out) static ngx_int_t ngx_rtmp_receive_message(ngx_rtmp_session_t *s, - ngx_rtmp_packet_hdr_t *h, ngx_chain_t *l) + ngx_rtmp_header_t *h, ngx_chain_t *in) { - ngx_rtmp_core_srv_conf_t *cscf; - ngx_connection_t *c; - ngx_buf_t *b; - struct { - uint16_t *v1; - uint16_t *v2; - uint16_t *v3; - } ping; - ngx_rtmp_session_t *ss; - static char invoke_name[64]; - static double trans_id; - static ngx_rtmp_amf0_elt_t invoke_name_elt[] = { - { NGX_RTMP_AMF0_STRING, NULL, invoke_name, sizeof(invoke_name) }, - { NGX_RTMP_AMF0_STRING, NULL, &trans_id, sizeof(trans_id) } - }; - ngx_rtmp_amf0_ctx_t amf_ctx; - - if (l == NULL) { - return NGX_ERROR; - } - - c = s->connection; - b = l->buf; - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - /* a session handles only one chunk stream - * but #2 is a special one for protocol messages */ - if (h->csid != 2) { - s->csid = h->csid; - } + ngx_rtmp_core_main_conf_t *cmcf; + ngx_array_t *evhs; + size_t n; + ngx_rtmp_event_handler_pt evh; + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); #ifdef NGX_DEBUG { @@ -891,142 +866,22 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, } #endif - switch(h->type) { - case NGX_RTMP_MSG_CHUNK_SIZE: - break; + if (h->type >= NGX_RTMP_MSG_MAX) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "unexpected RTMP message type: %d", (int)h->type); + return NGX_OK; + } - case NGX_RTMP_MSG_ABORT: - break; - - case NGX_RTMP_MSG_ACK: - break; - - case NGX_RTMP_MSG_CTL: - if (b->last - b->pos < 6) - return NGX_ERROR; - - ping.v1 = (uint16_t*)(b->pos); - ping.v2 = (uint16_t*)(b->pos + 2); - ping.v3 = (uint16_t*)(b->pos + 4); - - switch(*ping.v1) { - case NGX_RTMP_USER_STREAM_BEGIN: - break; - - case NGX_RTMP_USER_STREAM_EOF: - break; - - case NGX_RTMP_USER_STREAM_DRY: - break; - - case NGX_RTMP_USER_SET_BUFLEN: - break; - - case NGX_RTMP_USER_RECORDED: - break; - - case NGX_RTMP_USER_PING_REQUEST: - /* ping client from server */ - /**ping.v1 = NGX_RTMP_PING_PONG; - ngx_rtmp_send_message(s, h, l);*/ - break; - - case NGX_RTMP_USER_PING_RESPONSE: - break; - } - break; - - case NGX_RTMP_MSG_ACK_SIZE: - break; - - case NGX_RTMP_MSG_BANDWIDTH: - break; - - case NGX_RTMP_MSG_EDGE: - break; - - case NGX_RTMP_MSG_AUDIO: - case NGX_RTMP_MSG_VIDEO: - /* - if (!(s->flags & NGX_RTMP_PUBLISHER)) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "received audio/video from non-publisher"); - return NGX_ERROR; - } - - for(ss = *ngx_rtmp_get_session_head(s); - ss; ss = ss->next) - { - if (s != ss - && ss->flags & NGX_RTMP_SUBSCRIBER - && s->name.len == ss->name.len - && !ngx_strncmp(s->name.data, ss->name.data, - s->name.len)) - { - ngx_rtmp_send_message(ss, h, l); - } - - }*/ - break; - - case NGX_RTMP_MSG_AMF3_META: - case NGX_RTMP_MSG_AMF3_SHARED: - case NGX_RTMP_MSG_AMF3_CMD: - /* FIXME: AMF3 it not yet supported */ - break; - - case NGX_RTMP_MSG_AMF0_META: - break; - - case NGX_RTMP_MSG_AMF0_SHARED: - break; - - case NGX_RTMP_MSG_AMF0_CMD: - amf_ctx.link = &l; - amf_ctx.free = &s->free; - amf_ctx.log = c->log; - - memset(invoke_name, 0, sizeof(invoke_name)); - if (ngx_rtmp_amf0_read(&amf_ctx, &invoke_name_elt, 1) != NGX_OK) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "AMF0 cmd failed"); - return NGX_ERROR; - } - - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "AMF0 cmd '%s'", - invoke_name); - -#define _CMD_CALL(name) \ - if (!strcasecmp(invoke_name, #name)) { \ - return ngx_rtmp_##name(s, trans_id, l); \ - } - - /* NetConnection calls */ - _CMD_CALL(connect); - _CMD_CALL(call); - _CMD_CALL(close); - _CMD_CALL(createstream); - - /* NetStream calls */ - _CMD_CALL(play); - _CMD_CALL(play2); - _CMD_CALL(deletestream); - _CMD_CALL(closestream); - _CMD_CALL(receiveaudio); - _CMD_CALL(receivevideo); - _CMD_CALL(publish); - _CMD_CALL(seek); - _CMD_CALL(pause); - -#undef _CMD_CALL - - break; - - default: - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "unexpected packet type %d", - (int)h->type); + evhs = &cmcf->events[h->type]; + for(n = 0; n < evhs->nelts; ++n) { + evh = evh->elts[n]; + if (!evh) { + continue; + } + + if (evh(s, h, in) != NGX_OK) { + return NGX_ERROR; + } } return NGX_OK; @@ -1036,7 +891,19 @@ ngx_rtmp_receive_message(ngx_rtmp_session_t *s, void ngx_rtmp_close_session(ngx_rtmp_session_t *s) { - ngx_rtmp_leave(s); + size_t n; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_disconnect_handler_pt *h; + + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + + for(n = 0; n < cmcf->disconnect.nelts; ++n) { + h = &cmcf->disconnect.elts[n]; + if (*h) { + (*h)(s); + } + } + ngx_destroy_pool(s->in_pool); ngx_rtmp_close_connection(s->connection); } @@ -1088,72 +955,3 @@ ngx_rtmp_log_error(ngx_log_t *log, u_char *buf, size_t len) return p; } - -/************* this will go to module ****************/ -ngx_rtmp_session_t ** -ngx_rtmp_get_session_head(ngx_rtmp_session_t *s) -{ - ngx_rtmp_core_srv_conf_t *cscf; - - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - return &cscf->sessions[ - ngx_hash_key(s->name.data, s->name.len) - % NGX_RTMP_SESSION_HASH_SIZE]; -} - - -void -ngx_rtmp_join(ngx_rtmp_session_t *s, ngx_str_t *name, ngx_uint_t flags) -{ - ngx_rtmp_session_t **ps; - ngx_connection_t *c; - - c = s->connection; - - if (s->name.len) { - ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "already joined"); - return; - } - - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP join '%V'", - &name); - - s->name = *name; - ps = ngx_rtmp_get_session_head(s); - s->next = *ps; - s->flags = flags; - *ps = s; -} - - -void -ngx_rtmp_leave(ngx_rtmp_session_t *s) -{ - ngx_rtmp_session_t **ps; - ngx_connection_t *c; - - c = s->connection; - - if (!s->name.len) - return; - - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP leave '%V'", - &s->name); - - ps = ngx_rtmp_get_session_head(s); - - ngx_str_null(&s->name); - - for(; *ps; ps = &(*ps)->next) { - if (*ps == s) { - *ps = (*ps)->next; - return; - } - } -} - - diff --git a/ngx_rtmp_netconn.c b/ngx_rtmp_netconn.c deleted file mode 100644 index d97143a..0000000 --- a/ngx_rtmp_netconn.c +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright (c) 2012 Roman Arutyunyan - */ - -#include "ngx_rtmp_amf0.h" - -ngx_int_t -ngx_rtmp_connect(ngx_rtmp_session_t *s, double in_trans, ngx_chain_t *in) -{ - /* 1) send 'Window Acknowledgement Size' - * - * 2) send 'Set Peer Bandwidth' - * - * 3*) receive 'Window Acknowledgement' - * - * 4) send 'User Control Message(StreamBegin)' - * ' - * 5) AMF0 reply: - * - * "_ result" - * 1 - * NULL - * { code : "NetConnection.Connect.Success", - * level : "status", - * description : "Connection succeeded." } - */ - - static double trans; - static char app[128]; - static char flashver[128]; - static char svfurl[128]; - - static ngx_rtmp_amf0_elt_t in_cmd[] = { - { NGX_RTMP_AMF0_STRING, "app", app, sizeof(app) }, - { NGX_RTMP_AMF0_STRING, "flashver", flashver, sizeof(flashver)}, - { NGX_RTMP_AMF0_STRING, "swfurl", svfurl, sizeof(svfurl) }, - }; - - static ngx_rtmp_amf0_elt_t in_elts[] = { - { NGX_RTMP_AMF0_OBJECT, 0, in_cmd, sizeof(in_cmd) }, - { NGX_RTMP_AMF0_NULL, 0, NULL, 0 }, - }; - - static ngx_rtmp_amf0_elt_t out_inf[] = { - { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, - { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, - }; - - static ngx_rtmp_amf0_elt_t out_elts[] = { - { NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 }, - { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, - { NGX_RTMP_AMF0_NULL , 0, NULL, 0 }, - { NGX_RTMP_AMF0_OBJECT, 0, out_inf, sizeof(out_inf) }, - }; - - if (ngx_rtmp_receive_amf0(s, in, in_elts, - sizeof(in_elts) / sizeof(in_elts[0]))) - { - return NGX_ERROR; - } - - trans = in_trans; - ngx_str_set(&inf[0], "NetConnection.Connect.Success"); - ngx_str_set(&inf[1], "status"); - ngx_str_set(&inf[2], "Connection succeeded."); - - return ngx_rtmp_send_ack_size(s, 65536) - || ngx_rtmp_send_bandwidth(s, 65536, NGX_RTMP_LIMIT_SOFT) - || ngx_rtmp_send_user_stream_begin(s, 1) - || ngx_rtmp_send_amf0(s, 3, 1, out_elts, - sizeof(out_elts) / sizeof(out_elts[0])) - ? NGX_ERROR - : NGX_OK; -} - -ngx_int_t -ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - -ngx_int_t -ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - -ngx_int_t -ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - diff --git a/ngx_rtmp_netstream.c b/ngx_rtmp_netstream.c deleted file mode 100644 index 1628ffc..0000000 --- a/ngx_rtmp_netstream.c +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (c) 2012 Roman Arutyunyan - */ - -#include "ngx_rtmp.h" - -ngx_int_t -ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} - - -ngx_int_t -ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l) -{ - return NGX_OK; -} diff --git a/ngx_rtmp_receive.c b/ngx_rtmp_receive.c new file mode 100644 index 0000000..50acdab --- /dev/null +++ b/ngx_rtmp_receive.c @@ -0,0 +1,223 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + +#include "ngx_rtmp.h" +#include "ngx_rtmp_amf0.h" +#include + + +ngx_int_t +ngx_rtmp_protocol_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + ngx_buf_t *b; + u_char *p; + uint32_t val; + uint8_t limit; + ngx_connection_t *c; + + c = s->connection; + b = in->buf; + + if (b->last - b->pos < 4) { + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + "too small buffer for %d message: %d", + (int)h->type, b->last - b->pos); + return NGX_OK; + } + + p = &val; + p[0] = b->pos[3]; + p[1] = b->pos[2]; + p[2] = b->pos[1]; + p[3] = b->pos[0]; + + switch(h->type) { + case NGX_RTMP_MSG_CHUNK_SIZE: + /* set chunk size =val */ + break; + + case NGX_RTMP_MSG_ABORT: + /* abort chunk stream =val */ + break; + + case NGX_RTMP_MSG_ACK: + /* receive ack with sequence number =val */ + break; + + case NGX_RTMP_MSG_ACK_SIZE: + /* receive window size =val */ + break; + + case NGX_RTMP_MSG_BANDWIDTH: + if (b->last - b->pos >= 5) { + limit = *(uint8_t*)&b->pos[4]; + + (void)val; + (void)limit; + + /* receive window size =val + * && limit */ + } + break; + + default: + return NGX_ERROR; + } + + return NGX_OK; +} + + +ngx_int_t +ngx_rtmp_user_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + ngx_buf_t *b; + u_char *p; + uint16_t evt; + uint32_t val, arg; + ngx_connection_t *c; + + c = s->connection; + b = in->buf; + + if (b->last - b->pos < 6) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "too small buffer for user message: %d", + b->last - b->pos); + return NGX_OK; + } + + p = &evt; + p[0] = b->pos[1]; + p[1] = b->pos[0]; + + p = &val; + p[0] = b->pos[5]; + p[1] = b->pos[4]; + p[2] = b->pos[3]; + p[3] = b->pos[2]; + + switch(evt) { + case NGX_RTMP_USER_STREAM_BEGIN: + /* use =val as stream id which started */ + break; + + case NGX_RTMP_USER_STREAM_EOF: + /* use =val as stream id which is over */ + break; + + case NGX_RTMP_USER_STREAM_DRY: + /* stream =val is dry */ + break; + + case NGX_RTMP_USER_SET_BUFLEN: + if (b->last - b->pos >= 10) { + p = &arg; + p[0] = b->pos[9]; + p[1] = b->pos[8]; + p[2] = b->pos[7]; + p[3] = b->pos[6]; + + (void)arg; + + /* use =val as stream id && arg as buflen in msec*/ + } + break; + + case NGX_RTMP_USER_RECORDED: + /* stream =val is recorded */ + break; + + case NGX_RTMP_USER_PING_REQUEST: + ngx_rtmp_send_user_ping_response(s, val); + break; + + case NGX_RTMP_USER_PING_RESPONSE: + /* use =val as incoming timestamp */ + break; + + default: + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "unexpected user event: %d", + (int)evt); + + return NGX_OK; + } + + return NGX_OK; +} + + +ngx_int_t +ngx_rtmp_amf0_message_handler(ngx_session_t *s, + ngx_rtmp_header_t *h, ngx_chain_t *in) +{ + ngx_rtmp_amf0_ctx_t act; + ngx_connection_t *c; + ngx_rtmp_core_main_conf_t *cmcf; + ngx_rtmp_call_handler_pt *ch; + size_t len; + + static double trans; + static u_char func[128]; + + static ngx_rtmp_amf0_elt_t elts[] = { + { NGX_RTMP_AMF0_STRING, 0, func, sizeof(func) }, + { NGX_RTMP_AMF0_NUMBER, 0, NULL, 0 }, + }; + + c = s->connection; + cmcf = ngx_rtmp_get_module_main_conf(s, ngx_rtmp_core_module); + + /* read AMF0 func name & transaction id */ + act.link = in; + act.log = s->connection->log; + memset(func, 0, sizeof(func)); + + if (ngx_rtmp_amf0_read(&ect, elts, + sizeof(elts) / sizeof(elts[0])) != NGX_OK) + { + ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, + "AMF0 cmd failed"); + return NGX_ERROR; + } + + len = ngx_strlen(func); + + /* lookup function handler + * only the first handler is called so far + * because ngx_hash_find only returns one item; + * no good to patch NGINX core ;) */ + ch = ngx_hash_find(&cmcf->calls_hash, + ngx_hash_key_lc(func, len), func, len); + + if (ch) { + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "AMF0 func '%s' @%f passed to handler", func, trans); + + return (*ch)(s, trans, in); + } + + ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, + "AMF0 cmd '%s' @%f no handler", func, trans); + + return NGX_OK; +} + + +ngx_int_t +ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in, + ngx_rtmp_amf0_elt_t *elts, size_t nelts) +{ + ngx_rtmp_amf0_ctx_t act; + + act.link = in; + act.log = s->connection->log; + + return ngx_rtmp_amf0_read(&act, elts, nelts); +} + diff --git a/ngx_rtmp_send.c b/ngx_rtmp_send.c index 978d43b..573d4ae 100644 --- a/ngx_rtmp_send.c +++ b/ngx_rtmp_send.c @@ -1,6 +1,12 @@ +/* + * Copyright (c) 2012 Roman Arutyunyan + */ + + #include "ngx_rtmp.h.h" #include "ngx_rtmp_amf0.h" + #define NGX_RTMP_CTL_START(s, type) \ ngx_rtmp_packet_hdr_t __h; \ ngx_chain_t *__l; \ @@ -23,10 +29,6 @@ #define NGX_RTMP_CTL_OUT1(v) \ *(__p->last++) = ((u_char*)&v)[0]; -#define NGX_RTMP_CTL_OUT2(v) \ - *(__p->last++) = ((u_char*)&v)[1]; \ - *(__p->last++) = ((u_char*)&v)[0]; - #define NGX_RTMP_CTL_OUT4(v) \ *(__p->last++) = ((u_char*)&v)[3]; \ *(__p->last++) = ((u_char*)&v)[2]; \ @@ -212,15 +214,3 @@ ngx_rtmp_send_amf0(ngx_session_t *s, uint32_t csid, uint32_t msid, NGX_RTMP_AMF0_END(s); } -ngx_int_t -ngx_rtmp_receive_amf0(ngx_session_t *s, ngx_chain_t *in, - ngx_rtmp_amf0_elt_t *elts, size_t nelts) -{ - ngx_rtmp_amf0_ctx_t act; - - act.link = in; - act.log = s->connection->log; - - return ngx_rtmp_amf0_read(&act, elts, nelts); -} -