added more flags & extended types to AMF

This commit is contained in:
Roman Arutyunyan 2012-03-30 19:07:14 +04:00
parent a1ee083e25
commit 18d8526793
9 changed files with 170 additions and 90 deletions

1
config
View file

@ -26,3 +26,4 @@ NGX_ADDON_SRCS="$NGX_ADDON_SRCS \
$ngx_addon_dir/ngx_rtmp_netcall_module.c \
$ngx_addon_dir/ngx_rtmp_notify_module.c \
"
CFLAGS="$CFLAGS -I$ngx_addon_dir"

View file

@ -125,7 +125,7 @@ ngx_rtmp_amf_put(ngx_rtmp_amf_ctx_t *ctx, void *p, size_t n)
if (b == NULL || b->last == b->end) {
ln = ctx->alloc(ctx->cscf);
ln = ctx->alloc(ctx->arg);
if (ln == NULL) {
return NGX_ERROR;
}
@ -250,7 +250,8 @@ ngx_rtmp_amf_read(ngx_rtmp_amf_ctx_t *ctx, ngx_rtmp_amf_elt_t *elts,
size_t nelts)
{
void *data;
uint8_t type;
ngx_int_t type;
uint8_t type8;
size_t n;
uint16_t len;
ngx_int_t rc;
@ -258,19 +259,24 @@ ngx_rtmp_amf_read(ngx_rtmp_amf_ctx_t *ctx, ngx_rtmp_amf_elt_t *elts,
for(n = 0; n < nelts; ++n) {
switch (ngx_rtmp_amf_get(ctx, &type, sizeof(type))) {
case NGX_DONE:
if (elts->type & NGX_RTMP_AMF_OPTIONAL) {
return NGX_OK;
}
case NGX_ERROR:
return NGX_ERROR;
}
if (elts && elts->type & NGX_RTMP_AMF_TYPELESS) {
type = elts->type & ~NGX_RTMP_AMF_TYPELESS;
data = elts->data;
data = (n >= nelts || elts == NULL
|| (elts->type & ~NGX_RTMP_AMF_OPTIONAL) != type)
? NULL
: elts->data;
} else {
switch (ngx_rtmp_amf_get(ctx, &type8, 1)) {
case NGX_DONE:
if (elts->type & NGX_RTMP_AMF_OPTIONAL) {
return NGX_OK;
}
case NGX_ERROR:
return NGX_ERROR;
}
type = type8;
data = (elts && (elts->type & 0xff) == type)
? elts->data
: NULL;
}
switch (type) {
case NGX_RTMP_AMF_NUMBER:
@ -333,6 +339,26 @@ ngx_rtmp_amf_read(ngx_rtmp_amf_ctx_t *ctx, ngx_rtmp_amf_elt_t *elts,
return NGX_ERROR;
}
break;
case NGX_RTMP_AMF_INT8:
if (ngx_rtmp_amf_get(ctx, data, 1) != NGX_OK) {
return NGX_ERROR;
}
break;
case NGX_RTMP_AMF_INT16:
if (ngx_rtmp_amf_get(ctx, buf, 2) != NGX_OK) {
return NGX_ERROR;
}
ngx_rtmp_amf_reverse_copy(data, buf, 2);
break;
case NGX_RTMP_AMF_INT32:
if (ngx_rtmp_amf_get(ctx, buf, 4) != NGX_OK) {
return NGX_ERROR;
}
ngx_rtmp_amf_reverse_copy(data, buf, 4);
break;
case NGX_RTMP_AMF_END:
return NGX_OK;
@ -417,7 +443,8 @@ ngx_rtmp_amf_write(ngx_rtmp_amf_ctx_t *ctx,
ngx_rtmp_amf_elt_t *elts, size_t nelts)
{
size_t n;
uint8_t type;
ngx_int_t type;
uint8_t type8;
void *data;
uint16_t len;
u_char buf[8];
@ -428,8 +455,13 @@ ngx_rtmp_amf_write(ngx_rtmp_amf_ctx_t *ctx,
data = elts[n].data;
len = elts[n].len;
if (ngx_rtmp_amf_put(ctx, &type, sizeof(type)) != NGX_OK)
return NGX_ERROR;
if (type & NGX_RTMP_AMF_TYPELESS) {
type &= ~NGX_RTMP_AMF_TYPELESS;
} else {
type8 = (uint8_t)type;
if (ngx_rtmp_amf_put(ctx, &type8, 1) != NGX_OK)
return NGX_ERROR;
}
switch(type) {
case NGX_RTMP_AMF_NUMBER:
@ -468,11 +500,10 @@ ngx_rtmp_amf_write(ngx_rtmp_amf_ctx_t *ctx,
break;
case NGX_RTMP_AMF_OBJECT:
type = NGX_RTMP_AMF_END;
type8 = NGX_RTMP_AMF_END;
if (ngx_rtmp_amf_write_object(ctx, data,
elts[n].len / sizeof(ngx_rtmp_amf_elt_t)) != NGX_OK
|| ngx_rtmp_amf_put(ctx, &type,
sizeof(type)) != NGX_OK)
|| ngx_rtmp_amf_put(ctx, &type8, 1) != NGX_OK)
{
return NGX_ERROR;
}
@ -486,6 +517,30 @@ ngx_rtmp_amf_write(ngx_rtmp_amf_ctx_t *ctx,
}
break;
case NGX_RTMP_AMF_INT8:
if (ngx_rtmp_amf_put(ctx, data, 1) != NGX_OK) {
return NGX_ERROR;
}
break;
case NGX_RTMP_AMF_INT16:
if (ngx_rtmp_amf_put(ctx,
ngx_rtmp_amf_reverse_copy(buf,
data, 2), 2) != NGX_OK)
{
return NGX_ERROR;
}
break;
case NGX_RTMP_AMF_INT32:
if (ngx_rtmp_amf_put(ctx,
ngx_rtmp_amf_reverse_copy(buf,
data, 4), 4) != NGX_OK)
{
return NGX_ERROR;
}
break;
default:
return NGX_ERROR;
}

View file

@ -6,22 +6,30 @@
#ifndef _NGX_RTMP_AMF_H_INCLUDED_
#define _NGX_RTMP_AMF_H_INCLUDED_
#include <ngx_config.h>
#include <ngx_core.h>
/* basic types */
#define NGX_RTMP_AMF_NUMBER 0x00
#define NGX_RTMP_AMF_BOOLEAN 0x01
#define NGX_RTMP_AMF_STRING 0x02
#define NGX_RTMP_AMF_OBJECT 0x03
#define NGX_RTMP_AMF_NULL 0x05
#define NGX_RTMP_AMF_ARRAY_NULL 0x06
#define NGX_RTMP_AMF_MIXED_ARRAY 0x08
#define NGX_RTMP_AMF_END 0x09
#define NGX_RTMP_AMF_ARRAY 0x0a
#define NGX_RTMP_AMF_OPTIONAL 0x80
/* extended types */
#define NGX_RTMP_AMF_INT8 0x0100
#define NGX_RTMP_AMF_INT16 0x0101
#define NGX_RTMP_AMF_INT32 0x0102
#include <ngx_config.h>
#include <ngx_core.h>
/* r/w flags */
#define NGX_RTMP_AMF_OPTIONAL 0x1000
#define NGX_RTMP_AMF_TYPELESS 0x2000
typedef struct {
@ -32,65 +40,17 @@ typedef struct {
} ngx_rtmp_amf_elt_t;
struct ngx_rtmp_core_srv_conf_s;
typedef ngx_chain_t * (*ngx_rtmp_amf_alloc_pt)(void *arg);
typedef ngx_chain_t * (*ngx_rtmp_amf_alloc_pt)(struct ngx_rtmp_core_srv_conf_s
*cscf);
typedef struct {
ngx_chain_t *link, *first;
ngx_rtmp_amf_alloc_pt alloc;
struct ngx_rtmp_core_srv_conf_s *cscf;
void *arg;
ngx_log_t *log;
} ngx_rtmp_amf_ctx_t;
/*
*
* Examples:
struct {
char name[32];
double trans_id;
char app[32];
char flashver[32];
char v1[8];
int locked;
} vals;
ngx_rtmp_amf_elt_t props[] = {
{ NGX_RTMP_AMF_STRING, "app", vals.app, sizeof(vals.app) },
{ NGX_RTMP_AMF_STRING, "flashver", vals.flashver, sizeof(vals.flashver) }
};
ngx_rtmp_amf_elt_t list[] = {
{ NGX_RTMP_AMF_STRING, 0, vals.v1, sizeof(vals.v1) },
{ NGX_RTMP_AMF_BOOLEAN, 0, &vals.locked, sizeof(vals.locked) }
};
ngx_rtmp_amf_elt elts[] = {
{ NGX_RTMP_AMF_STRING, 0 vals.name, sizeof(vals.name) },
{ NGX_RTMP_AMF_NUMBER, 0 &vals.trans_id, sizeof(vals.trans_id) },
{ NGX_RTMP_AMF_OBJECT, 0, props, sizeof(props) },
{ NGX_RTMP_AMF_ARRAY, 0, list, sizeof(list) },
{ NGX_RTMP_AMF_NULL }
};
Reading:
-------
memset(&vals, 0, sizeof(vals));
ngx_rtmp_amf_read(l, elts, sizeof(elts));
Writing:
-------
ngx_rtmp_amf_write(l, free, elts, sizeof(elts));
*/
/* reading AMF */
ngx_int_t ngx_rtmp_amf_read(ngx_rtmp_amf_ctx_t *ctx,
ngx_rtmp_amf_elt_t *elts, size_t nelts);

View file

@ -40,8 +40,8 @@ typedef struct {
typedef struct {
u_char name[1024];
u_char type[1024];
u_char name[256];
u_char type[16];
} ngx_rtmp_publish_t;

View file

@ -19,6 +19,10 @@ static void ngx_rtmp_netcall_recv(ngx_event_t *rev);
static void ngx_rtmp_netcall_send(ngx_event_t *wev);
ngx_str_t ngx_rtmp_netcall_content_type_urlencoded =
ngx_string("application/x-www-form-urlencoded");
typedef struct {
ngx_msec_t timeout;
} ngx_rtmp_netcall_app_conf_t;
@ -322,6 +326,7 @@ ngx_rtmp_netcall_recv(ngx_event_t *rev)
return;
}
cs->inlast->next = NULL;
cs->inlast->buf = ngx_create_temp_buf(cc->pool, 1024);
if (cs->inlast->buf == NULL) {
ngx_rtmp_netcall_close(cc);
@ -414,7 +419,7 @@ ngx_rtmp_netcall_send(ngx_event_t *wev)
ngx_chain_t *
ngx_rtmp_netcall_http_format_header(ngx_url_t *url, ngx_pool_t *pool,
size_t content_length)
size_t content_length, ngx_str_t *content_type)
{
ngx_chain_t *cl;
ngx_buf_t *b;
@ -422,7 +427,7 @@ ngx_rtmp_netcall_http_format_header(ngx_url_t *url, ngx_pool_t *pool,
static char rq_tmpl[] =
"POST %V HTTP/1.0\r\n"
"Host: %V\r\n"
"Content-Type: application/x-www-form-urlencoded\r\n"
"Content-Type: %V\r\n"
"Connection: Close\r\n"
"Content-Length: %uz\r\n"
"\r\n"
@ -436,6 +441,7 @@ ngx_rtmp_netcall_http_format_header(ngx_url_t *url, ngx_pool_t *pool,
b = ngx_create_temp_buf(pool, sizeof(rq_tmpl)
+ url->uri.len
+ url->host.len
+ content_type->len
+ 5);
if (b == NULL) {
@ -445,7 +451,7 @@ ngx_rtmp_netcall_http_format_header(ngx_url_t *url, ngx_pool_t *pool,
cl->buf = b;
b->last = ngx_snprintf(b->last, b->end - b->last, rq_tmpl,
&url->uri, &url->host, content_length);
&url->uri, &url->host, content_type, content_length);
return cl;
}
@ -503,6 +509,55 @@ ngx_rtmp_netcall_http_format_session(ngx_rtmp_session_t *s, ngx_pool_t *pool)
}
ngx_chain_t *
ngx_rtmp_netcall_http_skip_header(ngx_chain_t *in)
{
ngx_buf_t *b;
/* find \n[\r]\n */
enum {
normal,
lf,
lfcr
} state = normal;
if (in == NULL) {
return NULL;
}
b = in->buf;
for ( ;; ) {
while (b->pos == b->last) {
in = in->next;
if (in == NULL) {
break;
}
b = in->buf;
}
switch (*b->pos++) {
case '\r':
state = (state == lf) ? lfcr : normal;
break;
case '\n':
if (state != normal) {
return in;
}
state = lf;
break;
default:
state = normal;
}
}
return NULL;
}
static ngx_int_t
ngx_rtmp_netcall_postconfiguration(ngx_conf_t *cf)
{

View file

@ -30,12 +30,14 @@ typedef struct {
ngx_int_t ngx_rtmp_netcall_create(ngx_rtmp_session_t *s,
ngx_rtmp_netcall_init_t *ci);
extern ngx_str_t ngx_rtmp_netcall_content_type_urlencoded;
/* HTTP formatting */
/* HTTP handling */
ngx_chain_t * ngx_rtmp_netcall_http_format_session(ngx_rtmp_session_t *s,
ngx_pool_t *pool);
ngx_chain_t * ngx_rtmp_netcall_http_format_header(ngx_url_t *url,
ngx_pool_t *pool, size_t content_length);
ngx_pool_t *pool, size_t content_length, ngx_str_t *content_type);
ngx_chain_t * ngx_rtmp_netcall_http_skip_header(ngx_chain_t *in);
#endif /* _NGX_RTMP_NETCALL_H_INCLUDED_ */

View file

@ -155,8 +155,8 @@ ngx_rtmp_notify_publish_create(ngx_rtmp_session_t *s, void *arg,
/* HTTP header */
hl = ngx_rtmp_netcall_http_format_header(nacf->publish_url, pool,
cl->buf->last - cl->buf->pos
+ (pl->buf->last - pl->buf->pos));
cl->buf->last - cl->buf->pos + (pl->buf->last - pl->buf->pos),
&ngx_rtmp_netcall_content_type_urlencoded);
if (hl == NULL) {
return NULL;
@ -221,8 +221,8 @@ ngx_rtmp_notify_play_create(ngx_rtmp_session_t *s, void *arg,
/* HTTP header */
hl = ngx_rtmp_netcall_http_format_header(nacf->play_url, pool,
cl->buf->last - cl->buf->pos
+ (pl->buf->last - pl->buf->pos));
cl->buf->last - cl->buf->pos + (pl->buf->last - pl->buf->pos),
&ngx_rtmp_netcall_content_type_urlencoded);
if (hl == NULL) {
return NULL;

View file

@ -278,8 +278,8 @@ ngx_rtmp_record_notify_create(ngx_rtmp_session_t *s, void *arg,
/* HTTP header */
hl = ngx_rtmp_netcall_http_format_header(racf->url, pool,
cl->buf->last - cl->buf->pos
+ (pl->buf->last - pl->buf->pos));
cl->buf->last - cl->buf->pos + (pl->buf->last - pl->buf->pos),
&ngx_rtmp_netcall_content_type_urlencoded);
if (hl == NULL) {
return NULL;

View file

@ -199,6 +199,13 @@ ngx_rtmp_send_user_unknown(ngx_rtmp_session_t *s, uint32_t timestamp)
}
static ngx_chain_t *
ngx_rtmp_alloc_amf_buf(void *arg)
{
return ngx_rtmp_alloc_shared_buf((ngx_rtmp_core_srv_conf_t *)arg);
}
/* AMF sender */
ngx_int_t
ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
@ -210,8 +217,8 @@ ngx_rtmp_send_amf(ngx_rtmp_session_t *s, ngx_rtmp_header_t *h,
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
memset(&act, 0, sizeof(act));
act.cscf = cscf;
act.alloc = ngx_rtmp_alloc_shared_buf;
act.arg = cscf;
act.alloc = ngx_rtmp_alloc_amf_buf;
act.log = s->connection->log;
if (ngx_rtmp_amf_write(&act, elts, nelts) != NGX_OK) {