improved amf0 writing & partially implemented connect()

This commit is contained in:
Roman Arutyunyan 2012-03-10 01:49:09 +04:00
parent 60f7962b5b
commit 69bbf67118
6 changed files with 650 additions and 384 deletions

37
TODO
View file

@ -1,7 +1,32 @@
+1) string allocations inside ngx_chain_t
+2) AMF0 creation
3) add AMF0 command parsing & replies to RTMP handler
4) improve handshake
5) add session_buckets option to core srv conf
- RTMP replies
Do we need loc confs (=fms apps) ?
- standard-compliance
- improve handshake
- names
- Input buffers:
allocate a continuous block &
allocate input buffers
in it. Upon chunk size change
re-split the same buffer for
new chunk size. Use temp
buffer as temporary
storage of overflow data.
Max chunk size if 65K which is
really a lot.
- Output buffers:
implement shared (per-loc/srv) buffers
for output of a) header(max 18b) b) chunk (fixed-sized)
We never change output chunk size.
Chunk buffers should be ref-counted.
- implement loc confs (=fms apps)
loc options:
- session buckets
- broadcast/file
- input buffer size per connection
- output chunk size
- output chunk buffer size
- output header buffer size
- HTTP callbacks on invoke calls

View file

@ -102,18 +102,18 @@ typedef struct {
typedef struct {
ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */
ngx_array_t listen; /* ngx_rtmp_listen_t */
ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */
ngx_array_t listen; /* ngx_rtmp_listen_t */
} ngx_rtmp_core_main_conf_t;
typedef struct {
uint8_t type;
uint8_t channel;
uint8_t hsize;
uint32_t size;
uint32_t timer;
uint32_t stream;
uint8_t fmt; /* header format */
uint32_t csid; /* chunk stream id */
uint32_t timestamp;
uint32_t mlen; /* message length */
uint8_t type; /* message type id */
uint32_t msid; /* message stream id */
} ngx_rtmp_packet_hdr_t;
@ -156,6 +156,7 @@ struct ngx_rtmp_session_s {
struct ngx_rtmp_session_s
*next;
ngx_uint_t flags;
uint32_t csid;
};
typedef struct ngx_rtmp_session_s ngx_rtmp_session_t;
@ -191,28 +192,40 @@ typedef struct {
} ngx_rtmp_module_t;
/* RTMP packet types*/
#define NGX_RTMP_PACKET_CHUNK_SIZE 0x01
#define NGX_RTMP_PACKET_BYTES_READ 0x03
#define NGX_RTMP_PACKET_PING 0x04
#define NGX_RTMP_PACKET_SERVER_BW 0x05
#define NGX_RTMP_PACKET_CLIENT_BW 0x06
#define NGX_RTMP_PACKET_AUDIO 0x08
#define NGX_RTMP_PACKET_VIDEO 0x09
#define NGX_RTMP_PACKET_FLEX 0x0f
#define NGX_RTMP_PACKET_FLEX_SO 0x10
#define NGX_RTMP_PACKET_FLEX_MSG 0x11
#define NGX_RTMP_PACKET_NOTIFY 0x12
#define NGX_RTMP_PACKET_SO 0x13
#define NGX_RTMP_PACKET_INVOKE 0x14
/* Chunk header:
* max 3 basic header
* + max 11 message header
* + max 4 extended header (timestamp) */
#define NGX_RTMP_MAX_CHUNK_HEADER 18
/* RMTP ping types */
#define NGX_RMTP_PING_CLEAR_STEAM 0
#define NGX_RMTP_PING_CLEAR_BUFFER 1
#define NGX_RMTP_PING_CLIENT_TIME 3
#define NGX_RMTP_PING_RESET_STREAM 4
#define NGX_RMTP_PING_PING 6
#define NGX_RMTP_PING_PONG 7
/* RTMP packet types*/
#define NGX_RTMP_PACKET_CHUNK_SIZE 1
#define NGX_RTMP_PACKET_ABORT 2
#define NGX_RTMP_PACKET_ACK 3
#define NGX_RTMP_PACKET_CTL 4
#define NGX_RTMP_PACKET_ACK_SIZE 5
#define NGX_RTMP_PACKET_BANDWIDTH 6
#define NGX_RTMP_PACKET_EDGE 7
#define NGX_RTMP_PACKET_AUDIO 8
#define NGX_RTMP_PACKET_VIDEO 9
#define NGX_RTMP_PACKET_AMF3_META 15
#define NGX_RTMP_PACKET_AMF3_SHARED 16
#define NGX_RTMP_PACKET_AMF3_CMD 17
#define NGX_RTMP_PACKET_AMF0_META 18
#define NGX_RTMP_PACKET_AMF0_SHARED 19
#define NGX_RTMP_PACKET_AMF0_CMD 20
#define NGX_RTMP_PACKET_AGGREGATE 22
/* RMTP control message types */
#define NGX_RTMP_CTL_STREAM_BEGIN 0
#define NGX_RTMP_CTL_STREAM_EOF 1
#define NGX_RTMP_CTL_STREAM_DRY 2
#define NGX_RTMP_CTL_SET_BUFLEN 3
#define NGX_RTMP_CTL_RECORDED 4
#define NGX_RTMP_CTL_PING_REQUEST 6
#define NGX_RTMP_CTL_PING_RESPONSE 7
#define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */
@ -264,22 +277,22 @@ void ngx_rtmp_send_packet(ngx_rtmp_session_t *s,
/* NetConnection methods */
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l);
/* NetStream methods */
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t **l);
ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l);
ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l);
extern ngx_uint_t ngx_rtmp_max_module;

View file

@ -5,12 +5,58 @@
#include "ngx_rtmp_amf0.h"
#include "ngx_rtmp.h"
#include <string.h>
#define NGX_RTMP_AMF0_SV(x, y) \
/*
#define NGX_RTMP_AMF0_SWAP_VALUES(x, y) \
(x) ^= (y); (y) ^= (x); (x) ^= (y)
#define NGX_RTMP_AMF0_SB(x) \
NGX_RTMP_AMF0_SV(*(uint8_t*)(&x), *((uint8_t*)(&x) + 1))
#define NGX_RTMP_AMF0_REVERSE2(x) \
NGX_RTMP_AMF0_SV(*(uint8_t*)(&x) , *((uint8_t*)(&x) + 1))
*/
static inline void*
ngx_rtmp_amf0_reverse_copy(void *dst, void* src, size_t len)
{
size_t k;
if (dst == NULL || src == NULL) {
return NULL;
}
for(k = 0; k < len; ++k) {
((u_char*)dst)[k] = ((u_char*)src)[len - 1 - k];
}
return dst;
}
#define NGX_RTMP_AMF0_DEBUG_SIZE 16
#ifdef NGX_DEBUG
static void
ngx_rtmp_amf0_debug(const char* op, ngx_log_t *log, u_char *p, size_t n)
{
u_char hstr[3 * NGX_RTMP_AMF0_DEBUG_SIZE + 1];
u_char str[NGX_RTMP_AMF0_DEBUG_SIZE + 1];
u_char *hp, *sp;
static u_char hex[] = "0123456789ABCDEF";
size_t i;
hp = hstr;
sp = str;
for(i = 0; i < n && i < NGX_RTMP_AMF0_DEBUG_SIZE; ++i) {
*hp++ = ' ';
*hp++ = hex[(*p & 0xf0) >> 4];
*hp++ = hex[*p & 0x0f];
*sp++ = (*p >= 0x20 && *p <= 0x7e) ?
*p : (u_char)'?';
++p;
}
*hp = *sp = '\0';
ngx_log_debug4(NGX_LOG_DEBUG_RTMP, log, 0,
"AMF0 %s (%d)%s '%s'", op, n, hstr, str);
}
#endif
static ngx_int_t
ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
@ -34,35 +80,11 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
p = ngx_cpymem(p, b->pos, n);
}
b->pos += n;
#define NGX_RTMP_AMF0_DEBUG_SIZE 16
#ifdef NGX_DEBUG
{
u_char hstr[3 * NGX_RTMP_AMF0_DEBUG_SIZE + 1];
u_char str[NGX_RTMP_AMF0_DEBUG_SIZE + 1];
u_char *hp, *pp, *sp;
static u_char hex[] = "0123456789ABCDEF";
hp = hstr;
sp = str;
pp = op;
while (pp < (u_char*)p
&& pp - (u_char*)op < NGX_RTMP_AMF0_DEBUG_SIZE)
{
*hp++ = ' ';
*hp++ = hex[(*pp & 0xf0) >> 4];
*hp++ = hex[*pp & 0x0f];
*sp++ = (*pp >= 0x20 && *pp <= 0x7e) ?
*pp : (u_char)'?';
++pp;
}
*hp = *sp = '\0';
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ctx->log, 0,
"AMF0 read (%d)%s '%s'", n, hstr, str);
}
ngx_rtmp_amf0_debug("read", ctx->log, (u_char*)op, n);
#endif
return NGX_OK;
}
@ -88,6 +110,10 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
size_t size;
ngx_chain_t **l, **free;
#ifdef NGX_DEBUG
ngx_rtmp_amf0_debug("write", ctx->log, (u_char*)p, n);
#endif
l = ctx->link;
free = ctx->free;
@ -113,7 +139,7 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n)
size = b->end - b->last;
if (size <= n) {
if (size >= n) {
b->last = ngx_cpymem(b->last, p, n);
return NGX_OK;
}
@ -197,6 +223,7 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n
uint16_t len;
ngx_int_t rc;
int till_end;
u_char buf[8];
if (nelts & NGX_RTMP_AMF0_TILL_END_FLAG) {
till_end = 1;
@ -216,9 +243,10 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n
switch(type) {
case NGX_RTMP_AMF0_NUMBER:
if (ngx_rtmp_amf0_get(ctx, data, 8) != NGX_OK) {
if (ngx_rtmp_amf0_get(ctx, buf, 8) != NGX_OK) {
return NGX_ERROR;
}
ngx_rtmp_amf0_reverse_copy(data, buf, 0);
break;
case NGX_RTMP_AMF0_BOOLEAN:
@ -228,11 +256,10 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n
break;
case NGX_RTMP_AMF0_STRING:
if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK) {
if (ngx_rtmp_amf0_get(ctx, buf, 2) != NGX_OK) {
return NGX_ERROR;
}
NGX_RTMP_AMF0_SB(len);
ngx_rtmp_amf0_reverse_copy(&len, buf, 2);
if (data == NULL) {
rc = ngx_rtmp_amf0_get(ctx, data, len);
@ -297,16 +324,24 @@ static ngx_int_t
ngx_rtmp_amf0_write_object(ngx_rtmp_amf0_ctx_t *ctx,
ngx_rtmp_amf0_elt_t *elts, size_t nelts)
{
uint16_t len;
uint16_t len, len_sb;
size_t n;
char *name;
u_char buf[2];
for(n = 0; n < nelts; ++n) {
name = elts[n].name;
len = strlen(name);
len_sb = len = strlen(name);
if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) {
if (ngx_rtmp_amf0_put(ctx,
ngx_rtmp_amf0_reverse_copy(buf,
&len, 2), 2) != NGX_OK)
{
return NGX_ERROR;
}
if (ngx_rtmp_amf0_put(ctx, name, len) != NGX_OK) {
return NGX_ERROR;
}
@ -317,7 +352,7 @@ ngx_rtmp_amf0_write_object(ngx_rtmp_amf0_ctx_t *ctx,
len = 0;
if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) {
if (ngx_rtmp_amf0_put(ctx, "\00\00", 2) != NGX_OK) {
return NGX_ERROR;
}
@ -333,6 +368,7 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx,
uint8_t type;
void *data;
uint16_t len;
u_char buf[8];
for(n = 0; n < nelts; ++n) {
@ -345,7 +381,10 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx,
switch(type) {
case NGX_RTMP_AMF0_NUMBER:
if (ngx_rtmp_amf0_put(ctx, data, 8) != NGX_OK) {
if (ngx_rtmp_amf0_put(ctx,
ngx_rtmp_amf0_reverse_copy(buf,
data, 8), 8) != NGX_OK)
{
return NGX_ERROR;
}
break;
@ -357,12 +396,13 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx,
break;
case NGX_RTMP_AMF0_STRING:
if (ngx_rtmp_amf0_put(ctx, &len, sizeof(len)) != NGX_OK) {
if (ngx_rtmp_amf0_put(ctx,
ngx_rtmp_amf0_reverse_copy(buf,
&len, 2), 2) != NGX_OK)
{
return NGX_ERROR;
}
NGX_RTMP_AMF0_SB(len);
if (ngx_rtmp_amf0_put(ctx, data, len) != NGX_OK) {
return NGX_ERROR;
}

View file

@ -23,20 +23,18 @@ static void ngx_rtmp_send(ngx_event_t *rev);
static void ngx_rtmp_close_connection(ngx_connection_t *c);
static size_t hdrsizes[] = { 12, 8, 4, 1 };
#ifdef NGX_DEBUG
static char*
ngx_rtmp_packet_type(uint8_t type) {
static char* types[] = {
"?",
"chunk_size",
"?",
"bytes_read",
"ping",
"server_bw",
"client_bw",
"?",
"abort",
"ack",
"ctl",
"ack_size",
"bandwidth",
"edge",
"audio",
"video",
"?",
@ -44,12 +42,14 @@ ngx_rtmp_packet_type(uint8_t type) {
"?",
"?",
"?",
"flex",
"flex_so",
"flex_msg",
"notify",
"so",
"invoke"
"amf3_meta",
"amf3_shared",
"amd3_cmd",
"amf0_meta",
"amf0_shared",
"amf0_cmd",
"?",
"aggregate"
};
return type < sizeof(types) / sizeof(types[0])
@ -212,8 +212,7 @@ ngx_rtmp_init_session(ngx_connection_t *c)
s->chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE;
/* TODO: we should move preallcation to a func to call on user request */
bufs.size = s->chunk_size + 14 /* + max header size */;
bufs.size = s->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER;
bufs.num = cscf->buffers;
s->free = ngx_create_chain_of_bufs(c->pool, &bufs);
@ -363,13 +362,15 @@ restart:
void
ngx_rtmp_recv(ngx_event_t *rev)
{
ngx_int_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b, *bb;
u_char h, *p, *pp;
ngx_chain_t *lin;
ngx_int_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
ngx_buf_t *b, *bb;
u_char *p, *pp;
ngx_chain_t *lin;
ngx_rtmp_packet_hdr_t *h;
uint32_t timestamp;
c = rev->data;
s = c->data;
@ -413,86 +414,106 @@ ngx_rtmp_recv(ngx_event_t *rev)
return;
}
/* first byte of a packet? */
if (b->last == b->start) {
h = *b->last;
s->in_hdr.hsize = hdrsizes[(h >> 6) & 0x03];
s->in_hdr.channel = h & 0x3f;
b->last += n;
h = &s->in_hdr;
ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP start %d hd=%d ch=%d",
(int)h,
(int)s->in_hdr.hsize,
(int)s->in_hdr.channel);
/* parse headers */
if (b->pos == b->start) {
p = b->pos;
timestamp = h->timestamp;
/* chunk basic header */
h->fmt = (*p >> 6) & 0x03;
h->csid = *p++ & 0x3f;
if (h->csid == 0) {
if (b->last - p < 1)
continue;
h->csid = 64;
h->csid += *(uint8_t*)p++;
} else if (h->csid == 1) {
if (b->last - p < 2)
continue;
h->csid = 64;
h->csid += *(uint8_t*)p++;
h->csid += (uint32_t)256 * (*(uint8_t*)p++);
}
ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP bheader fmt=%d csid=%D",
(int)h->fmt, h->csid);
if (h->fmt <= 2 ) {
if (b->last - p < 3)
continue;
/* timestamp:
* big-endian 3b -> little-endian 4b */
pp = (u_char*)&timestamp;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
if (h->fmt <= 1) {
if (b->last - p < 4)
continue;
/* size:
* big-endian 3b -> little-endian 4b
* type:
* 1b -> 1b*/
pp = (u_char*)&h->mlen;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
pp[3] = 0;
h->type = *(uint8_t*)p++;
if (h->fmt == 0) {
if (b->last - p < 4)
continue;
/* stream:
* little-endian 4b -> little-endian 4b */
pp = (u_char*)&h->msid;
pp[0] = *p++;
pp[1] = *p++;
pp[2] = *p++;
pp[3] = *p++;
}
}
/* extended header */
if (timestamp == 0x00ffffff) {
if (b->last - p < 4)
continue;
pp = (u_char*)&h->timestamp;
pp[3] = *p++;
pp[2] = *p++;
pp[1] = *p++;
pp[0] = *p++;
} else if (h->fmt) {
h->timestamp += timestamp;
} else {
h->timestamp = timestamp;
}
}
ngx_log_debug5(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP mheader %s (%d) "
"timestamp=%D mlen=%D msid=%D",
ngx_rtmp_packet_type(h->type), (int)h->type,
h->timestamp, h->mlen, h->msid);
/* header done */
b->pos = p;
}
b->last += n;
if (b->last - b->pos < s->in_hdr.hsize)
/* parse payload */
if (b->last - b->pos < (ngx_int_t)ngx_min(h->mlen, s->chunk_size))
continue;
p = b->start + 1;
/* basic header */
do {
if (s->in_hdr.hsize < 4)
break;
/* FIXME: is this fix really needed?
if (s->in_hdr.channel == 1) {
p += 2;
}*/
/* timer:
* big-endian 3b -> little-endian 4b */
pp = (u_char*)&s->in_hdr.timer;
pp[0] = p[2];
pp[1] = p[1];
pp[2] = p[0];
pp[3] = 0;
if (s->in_hdr.hsize < 8)
break;
/* size:
* big-endian 3b -> little-endian 4b
* type:
* 1b -> 1b*/
p += 3;
pp = (u_char*)&s->in_hdr.size;
pp[0] = p[2];
pp[1] = p[1];
pp[2] = p[0];
pp[3] = 0;
p += 3;
pp = &s->in_hdr.type;
*pp = *p;
if (s->in_hdr.hsize < 12)
break;
/* stream:
* little-endian 4b -> little-endian 4b */
++p;
ngx_memcpy(&s->in_hdr.stream, p, 4);
p += 4;
} while(0);
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP header %s (%d) ch=%d hd=%d "
"sz=%D tm=%D st=%D",
ngx_rtmp_packet_type(s->in_hdr.type),
(int)s->in_hdr.type,
(int)s->in_hdr.channel,
(int)s->in_hdr.hsize,
s->in_hdr.size,
s->in_hdr.timer,
s->in_hdr.stream);
if (b->last < p + ngx_min(s->in_hdr.size, s->chunk_size))
continue;
b->pos = p;
/* if fragmented then wait for more fragments */
if (s->in_hdr.size > s->chunk_size) {
if (h->mlen > s->chunk_size) {
if (s->free == NULL) {
ngx_log_error(NGX_LOG_INFO, c->log,
NGX_ERROR, "no free buffers");
@ -503,14 +524,14 @@ ngx_rtmp_recv(ngx_event_t *rev)
s->free = s->free->next;
lin = lin->next;
lin->next = NULL;
s->in_hdr.size -= s->chunk_size;
h->mlen -= s->chunk_size;
bb = lin->buf;
bb->pos = bb->last = bb->start;
continue;
}
/* handle packet! */
if (ngx_rtmp_receive_packet(s, &s->in_hdr, s->in) != NGX_OK) {
if (ngx_rtmp_receive_packet(s, h, s->in) != NGX_OK) {
ngx_rtmp_close_session(s);
return;
}
@ -518,10 +539,10 @@ ngx_rtmp_recv(ngx_event_t *rev)
bb->pos = bb->last = bb->start;
/* copy remained data to first buffer */
if (s->in_hdr.size < b->last - b->pos) {
if (h->mlen < b->last - b->pos) {
bb->last = ngx_movemem(bb->start,
b->pos + s->in_hdr.size,
b->last - b->pos - s->in_hdr.size);
b->pos + h->mlen,
b->last - b->pos - h->mlen);
}
/* free all but one input buffer */
@ -537,7 +558,6 @@ ngx_rtmp_recv(ngx_event_t *rev)
void
ngx_rtmp_send(ngx_event_t *wev)
{
ngx_int_t n;
ngx_connection_t *c;
ngx_rtmp_session_t *s;
ngx_rtmp_core_srv_conf_t *cscf;
@ -548,7 +568,8 @@ ngx_rtmp_send(ngx_event_t *wev)
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
if (wev->timedout) {
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out");
ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT,
"client timed out");
c->timedout = 1;
ngx_rtmp_close_session(s);
return;
@ -559,7 +580,6 @@ ngx_rtmp_send(ngx_event_t *wev)
}
while(s->out) {
l = c->send_chain(c, s->out, 0);
if (l == NGX_CHAIN_ERROR) {
@ -567,15 +587,7 @@ ngx_rtmp_send(ngx_event_t *wev)
return;
}
n = 0;
if (l != s->out) {
for(ll = s->out; ll->next && ll->next != l; ll = ll->next);
ll->next = s->free;
s->out = l;
}
if (l != NULL) {
if (l == NULL) {
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
ngx_add_timer(c->write, cscf->timeout);
if (ngx_handle_write_event(c->write, 0) != NGX_OK) {
@ -583,6 +595,14 @@ ngx_rtmp_send(ngx_event_t *wev)
}
return;
}
if (l != s->out) {
for(ll = s->out;
ll->next && ll->next != l;
ll = ll->next);
ll->next = s->free;
s->out = l;
}
}
ngx_del_event(wev, NGX_WRITE_EVENT, 0);
@ -658,100 +678,172 @@ ngx_rtmp_leave(ngx_rtmp_session_t *s)
void
ngx_rtmp_send_packet(ngx_rtmp_session_t *s, ngx_rtmp_packet_hdr_t *h,
ngx_chain_t *l)
ngx_chain_t *ll)
{
ngx_rtmp_packet_hdr_t *lh;
size_t hsel, hsize, size;
ngx_chain_t *ll, **pl;
ngx_int_t hsize, size, nbufs;
ngx_chain_t *l, **pl;
ngx_buf_t *b, *bb;
u_char *p, *pp;
uint8_t fmt;
uint32_t timestamp, ext_timestamp, mlen;
ngx_connection_t *c;
if (l == NULL)
if (ll == NULL) {
return;
}
b = l->buf;
p = b->pos;
/* detect packet size */
mlen = 0;
l = ll;
nbufs = 0;
while(l) {
mlen += (l->buf->last - l->buf->pos);
++nbufs;
l = l->next;
}
c = s->connection;
bb = ll->buf;
pp = bb->pos;
lh = &s->out_hdr;
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP send %s (%d) csid=%D timestamp=%D "
"mlen=%D msid=%D nbufs=%d",
ngx_rtmp_packet_type(h->type), (int)h->type,
h->csid, h->timestamp, mlen, h->msid, nbufs);
while(ll) {
if (s->free == NULL) {
/* TODO: implement proper packet dropper */
/* FIXME: implement proper packet dropper */
return;
}
/* add new output chunk */
ll = s->free;
/* append new output buffer */
l = s->free;
s->free = s->free->next;
ll->next = NULL;
l->next = NULL;
for(pl = &s->out; *pl; pl = &(*pl)->next);
*pl = ll;
*pl = l;
b = l->buf;
b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER;
/* put payload at the end; leave space for header */
bb = ll->buf;
bb->pos = bb->last = bb->end - s->chunk_size;
/* copy payload to new buffer leaving space for header */
while (b->last < b->end) {
size = b->end - b->last;
if (size < bb->last - pp) {
b->last = ngx_cpymem(b->last, pp, size);
pp += size;
break;
}
b->last = ngx_cpymem(b->last, pp, bb->last - pp);
/* fill new chunk payload */
while(l && bb->pos != bb->last) {
size = ngx_min(bb->last - bb->pos, b->last - p);
bb->last = ngx_cpymem(bb->last, p, size);
p += size;
if (p != b->last)
continue;
l = l->next;
if (l) {
b = l->buf;
p = b->pos;
ll = ll->next;
if (ll == NULL) {
break;
}
bb = ll->buf;
pp = bb->pos;
}
/* FIXME: there can be some occasional
* matches (h->msid == 0) on first out
* packet when we compare it
* against initially zeroed header;
* Though it maybe OK */
/* fill header
* we have
* h - new header
* lh - old header for diffs */
fmt = 0;
hsize = 12;
if (h->msid && lh->msid == h->msid) {
++fmt;
hsize -= 4;
if (lh->type == h->type && lh->mlen == mlen) {
++fmt;
hsize -= 4;
if (lh->timestamp == h->timestamp) {
++fmt;
hsize -= 3;
}
}
}
lh = &s->out_hdr;
/* message header */
timestamp = (fmt ? h->timestamp
: h->timestamp - lh->timestamp);
ext_timestamp = 0;
hsel = 0;
/* choose header size */
if (lh->hsize) {
if (lh->stream == h->stream)
++hsel;
if (lh->type == h->type && lh->size == h->size)
++hsel;
if (lh->timer == h->timer)
++hsel;
if (timestamp >= 0x00ffffff) {
ext_timestamp = timestamp;
timestamp = 0x00ffffff;
hsize += 4;
}
hsize = hdrsizes[hsel];
if (h->csid >= 64) {
++hsize;
if (h->csid >= 320) {
++hsize;
}
}
bb->pos -= hsize;
/* now we know header size */
b->pos -= hsize;
p = b->pos;
/* basic header */
*p = (fmt << 6);
if (h->csid >= 2 && h->csid <= 63) {
*p++ |= (((uint8_t)h->csid) & 0x3f);
} else if (h->csid >= 64 && h->csid < 320) {
++p;
*p++ = (uint8_t)(h->csid - 64);
} else {
*p++ |= 1;
*p++ = (uint8_t)(h->csid - 64);
*p++ = (uint8_t)((h->csid - 64) >> 8);
}
/* message header */
if (fmt <= 2) {
pp = (u_char*)&timestamp;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
if (fmt <= 1) {
pp = (u_char*)&mlen;
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
*p++ = h->type;
if (fmt == 0) {
pp = (u_char*)&h->msid;
*p++ = pp[0];
*p++ = pp[1];
*p++ = pp[2];
*p++ = pp[3];
}
}
}
/* extended header */
if (ext_timestamp) {
pp = (u_char*)&ext_timestamp;
*p++ = pp[3];
*p++ = pp[2];
*p++ = pp[1];
*p++ = pp[0];
}
*lh = *h;
/* fill header: bb->pos..bb->pos+hsize */
pp = bb->pos;
*pp++ = (((uint8_t)hsel & 0x03) << 6) | (h->channel & 0x3f);
if (hsize == 1)
continue;
/* TODO: watch endians */
pp = ngx_cpymem(pp, &h->timer, 3);
if (hsize == 4)
continue;
pp = ngx_cpymem(pp, &h->size, 3);
*pp++ = h->type;
if (hsize == 8)
continue;
ngx_memcpy(pp, &h->stream, 4);
}
ngx_rtmp_send(c->write);
}
@ -776,51 +868,49 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
};
ngx_rtmp_amf0_ctx_t amf_ctx;
c = s->connection;
if (l == NULL) {
return NGX_ERROR;
}
c = s->connection;
b = l->buf;
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
/* a session handles only one chunk stream
* but #2 is a special one for protocol messages */
if (h->csid != 2) {
s->csid = h->csid;
}
#ifdef NGX_DEBUG
{
int nch;
int nbufs;
ngx_chain_t *ch;
for(nch = 1, ch = l; ch->next; ch = ch->next, ++nch);
for(nbufs = 1, ch = l;
ch->next;
ch = ch->next, ++nbufs);
ngx_log_debug8(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP packet %s (%d) ch=%d hd=%d "
"sz=%d tm=%D st=%D nbfs=%d",
ngx_rtmp_packet_type(h->type),
(int)h->type,
(int)h->channel,
(int)h->hsize,
(int)h->size,
h->timer,
h->stream,
nch);
ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP recv %s (%d) csid=%D timestamp=%D "
"mlen=%D msid=%D nbufs=%d",
ngx_rtmp_packet_type(h->type), (int)h->type,
h->csid, h->timestamp, h->mlen, h->msid, nbufs);
}
#endif
cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module);
b = l->buf;
switch(h->type) {
case NGX_RTMP_PACKET_CHUNK_SIZE:
if (b->last - b->pos < 4)
return NGX_ERROR;
/*ngx_rtmp_set_chunk_size(s, *(uint32_t*)(b->pos));*/
break;
case NGX_RTMP_PACKET_BYTES_READ:
if (b->last - b->pos < 4)
return NGX_ERROR;
/*ngx_rtmp_set_bytes_read(s, *(uint32_t*)(b->pos));*/
case NGX_RTMP_PACKET_ABORT:
break;
case NGX_RTMP_PACKET_PING:
case NGX_RTMP_PACKET_ACK:
break;
case NGX_RTMP_PACKET_CTL:
if (b->last - b->pos < 6)
return NGX_ERROR;
@ -829,53 +919,39 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
ping.v3 = (uint16_t*)(b->pos + 4);
switch(*ping.v1) {
case NGX_RMTP_PING_CLEAR_STEAM:
case NGX_RTMP_CTL_STREAM_BEGIN:
break;
case NGX_RMTP_PING_CLEAR_BUFFER:
/*ngx_rtmp_clear_buffer(s);*/
case NGX_RTMP_CTL_STREAM_EOF:
break;
case NGX_RMTP_PING_CLIENT_TIME:
/*ngx_rtmp_set_client_buffer_time(s, *ping.v3);*/
case NGX_RTMP_CTL_STREAM_DRY:
break;
case NGX_RMTP_PING_RESET_STREAM:
case NGX_RTMP_CTL_SET_BUFLEN:
break;
case NGX_RMTP_PING_PING:
case NGX_RTMP_CTL_RECORDED:
break;
case NGX_RTMP_CTL_PING_REQUEST:
/* ping client from server */
*ping.v1 = NGX_RMTP_PING_PONG;
ngx_rtmp_send_packet(s, h, l);
/**ping.v1 = NGX_RTMP_PING_PONG;
ngx_rtmp_send_packet(s, h, l);*/
break;
case NGX_RMTP_PING_PONG:
/* TODO: wtf the arg? */
/*ngx_rtmp_set_ping_time(s, *ping.v2);*/
case NGX_RTMP_CTL_PING_RESPONSE:
break;
}
break;
case NGX_RTMP_PACKET_SERVER_BW:
if (b->last - b->pos < 4)
return NGX_ERROR;
/*ngx_rtmp_set_server_bw(s, *(uint32_t*)b->pos,
b->last - b->pos >= 5
? *(uint8_t*)(b->pos + 4)
: 0);*/
case NGX_RTMP_PACKET_ACK_SIZE:
break;
case NGX_RTMP_PACKET_CLIENT_BW:
if (b->last - b->pos < 4)
return NGX_ERROR;
case NGX_RTMP_PACKET_BANDWIDTH:
break;
/*ngx_rtmp_set_client_bw(s, *(uint32_t*)b->pos,
b->last - b->pos >= 5
? *(uint8_t*)(b->pos + 4)
: 0);*/
case NGX_RTMP_PACKET_EDGE:
break;
case NGX_RTMP_PACKET_AUDIO:
@ -899,22 +975,21 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
}
}
break;
case NGX_RTMP_PACKET_SO:
/* TODO: implement
* plain: name, version, persistent; + lots of amf key-values
* ignore so far */
case NGX_RTMP_PACKET_AMF3_META:
case NGX_RTMP_PACKET_AMF3_SHARED:
case NGX_RTMP_PACKET_AMF3_CMD:
/* FIXME: AMF3 it not yet supported */
break;
case NGX_RTMP_PACKET_NOTIFY:
/* TODO: Implement HTTP callbacks on such packets
* with AMF fields converted to HTTP
* GET vars*/
case NGX_RTMP_PACKET_AMF0_META:
break;
case NGX_RTMP_PACKET_INVOKE:
case NGX_RTMP_PACKET_AMF0_SHARED:
break;
case NGX_RTMP_PACKET_AMF0_CMD:
amf_ctx.link = &l;
amf_ctx.free = &s->free;
amf_ctx.log = c->log;
@ -922,51 +997,43 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s,
memset(invoke_name, 0, sizeof(invoke_name));
if (ngx_rtmp_amf0_read(&amf_ctx, &invoke_name_elt, 1) != NGX_OK) {
ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP invoke failed");
"AMF0 cmd failed");
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"RTMP invoke '%s'",
"AMF0 cmd '%s'",
invoke_name);
#define INVOKE_CALL(name) \
#define _CMD_CALL(name) \
if (!strcasecmp(invoke_name, #name)) { \
return ngx_rtmp_##name(s, &l); \
return ngx_rtmp_##name(s, l); \
}
/* NetConnection calls */
INVOKE_CALL(connect);
INVOKE_CALL(call);
INVOKE_CALL(close);
INVOKE_CALL(createstream);
_CMD_CALL(connect);
_CMD_CALL(call);
_CMD_CALL(close);
_CMD_CALL(createstream);
/* NetStream calls */
INVOKE_CALL(play);
INVOKE_CALL(play2);
INVOKE_CALL(deletestream);
INVOKE_CALL(closestream);
INVOKE_CALL(receiveaudio);
INVOKE_CALL(receivevideo);
INVOKE_CALL(publish);
INVOKE_CALL(seek);
INVOKE_CALL(pause);
_CMD_CALL(play);
_CMD_CALL(play2);
_CMD_CALL(deletestream);
_CMD_CALL(closestream);
_CMD_CALL(receiveaudio);
_CMD_CALL(receivevideo);
_CMD_CALL(publish);
_CMD_CALL(seek);
_CMD_CALL(pause);
#undef INVOKE_CALL
#undef _CMD_CALL
break;
case NGX_RTMP_PACKET_FLEX:
case NGX_RTMP_PACKET_FLEX_SO:
case NGX_RTMP_PACKET_FLEX_MSG:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"flex packets are not supported %d",
(int)h->type);
break;
default:
ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0,
"unsupported packet type %d",
"unexpected packet type %d",
(int)h->type);
}

View file

@ -3,27 +3,148 @@
*/
#include "ngx_rtmp.h"
#include "ngx_rtmp_amf0.h"
ngx_int_t
ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *li)
{
ngx_rtmp_packet_hdr_t h;
ngx_chain_t lo, *lo_amf;
ngx_buf_t bo;
u_char buf[6], *p;
uint16_t ctl_evt;
uint32_t msid;
uint32_t ack_size;
uint8_t limit_type;
ngx_rtmp_amf0_ctx_t amf_ctx;
static double trans;
static ngx_rtmp_amf0_elt_t inf[] = {
{ NGX_RTMP_AMF0_STRING, "code", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "level", NULL, 0 },
{ NGX_RTMP_AMF0_STRING, "description", NULL, 0 },
};
static ngx_rtmp_amf0_elt_t elts[] = {
{ NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 },
{ NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) },
{ NGX_RTMP_AMF0_NULL , 0, NULL, 0 },
{ NGX_RTMP_AMF0_OBJECT, 0, inf, sizeof(inf) },
};
/* 1) send 'Window Acknowledgement Size'
*
* 2) send 'Set Peer Bandwidth'
*
* 3*) receive 'Window Acknowledgement'
*
* 4) send 'User Control Message(StreamBegin)'
* '
* 5) AMF0 reply:
*
* "_ result"
* 1
* NULL
* { code : "NetConnection.Connect.Success",
* level : "status",
* description : "Connection succeeded." }
*/
memset(&h, 0, sizeof(h));
h.timestamp = 0;
h.csid = 2; /* standard */
h.msid = 0;
lo.buf = &bo;
lo.next = NULL;
/* send Window Acknowledgement Size*/
h.type = NGX_RTMP_PACKET_ACK_SIZE;
ack_size = 65536;
p = (u_char*)&ack_size;
buf[0] = p[3];
buf[1] = p[2];
buf[2] = p[1];
buf[3] = p[0];
bo.start = bo.pos = buf;
bo.end = bo.last = bo.start + 4;
ngx_rtmp_send_packet(s, &h, &lo);
/* send Set Peer Bandwidth */
h.type = NGX_RTMP_PACKET_BANDWIDTH;
ack_size = 65536;
limit_type = 1;
p = (u_char*)&ack_size;
buf[0] = p[3];
buf[1] = p[2];
buf[2] = p[1];
buf[3] = p[0];
buf[4] = limit_type;
bo.start = bo.pos = buf;
bo.end = bo.last = bo.start + 5;
ngx_rtmp_send_packet(s, &h, &lo);
/* send STREAM_BEGIN */
h.type = NGX_RTMP_PACKET_CTL;
msid = 1;
ctl_evt = NGX_RTMP_CTL_STREAM_BEGIN;
p = (u_char*)&ctl_evt;
buf[0] = p[1];
buf[1] = p[0];
p = (u_char*)&msid;
buf[2] = p[3];
buf[3] = p[2];
buf[4] = p[1];
buf[5] = p[0];
bo.start = bo.pos = buf;
bo.end = bo.last = bo.start + sizeof(buf);
ngx_rtmp_send_packet(s, &h, &lo);
/* send 'connect' reply */
h.type = NGX_RTMP_PACKET_AMF0_CMD;
h.csid = s->csid;
inf[0].data = "NetConnection.Connect.Success"; /* code */
inf[0].len = strlen(inf[0].data);
inf[1].data = "status"; /* level */
inf[1].len = strlen(inf[1].data);
inf[2].data = "Connection succeeded."; /* description */
inf[2].len = strlen(inf[2].data);
trans = 1;
lo_amf = NULL;
amf_ctx.link = &lo_amf;
amf_ctx.free = &s->free;
amf_ctx.log = s->connection->log;
if (ngx_rtmp_amf0_write(&amf_ctx, elts,
sizeof(elts) / sizeof(elts[0])) != NGX_OK)
{
return NGX_ERROR;
}
ngx_rtmp_send_packet(s, &h, lo_amf);
return NGX_OK;
}
ngx_int_t
ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t **l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}

View file

@ -5,63 +5,63 @@
#include "ngx_rtmp.h"
ngx_int_t
ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}
ngx_int_t
ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t **l)
ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l)
{
return NGX_OK;
}