diff --git a/TODO b/TODO index f22729c..b397e1c 100644 --- a/TODO +++ b/TODO @@ -1,7 +1,32 @@ -+1) string allocations inside ngx_chain_t -+2) AMF0 creation -3) add AMF0 command parsing & replies to RTMP handler -4) improve handshake -5) add session_buckets option to core srv conf +- RTMP replies -Do we need loc confs (=fms apps) ? +- standard-compliance + - improve handshake + - names + +- Input buffers: + allocate a continuous block & + allocate input buffers + in it. Upon chunk size change + re-split the same buffer for + new chunk size. Use temp + buffer as temporary + storage of overflow data. + Max chunk size if 65K which is + really a lot. + +- Output buffers: + implement shared (per-loc/srv) buffers + for output of a) header(max 18b) b) chunk (fixed-sized) + We never change output chunk size. + Chunk buffers should be ref-counted. + +- implement loc confs (=fms apps) + loc options: + - session buckets + - broadcast/file + - input buffer size per connection + - output chunk size + - output chunk buffer size + - output header buffer size + - HTTP callbacks on invoke calls diff --git a/ngx_rtmp.h b/ngx_rtmp.h index f4e1adb..8fdfc6b 100644 --- a/ngx_rtmp.h +++ b/ngx_rtmp.h @@ -102,18 +102,18 @@ typedef struct { typedef struct { - ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */ - ngx_array_t listen; /* ngx_rtmp_listen_t */ + ngx_array_t servers; /* ngx_rtmp_core_srv_conf_t */ + ngx_array_t listen; /* ngx_rtmp_listen_t */ } ngx_rtmp_core_main_conf_t; typedef struct { - uint8_t type; - uint8_t channel; - uint8_t hsize; - uint32_t size; - uint32_t timer; - uint32_t stream; + uint8_t fmt; /* header format */ + uint32_t csid; /* chunk stream id */ + uint32_t timestamp; + uint32_t mlen; /* message length */ + uint8_t type; /* message type id */ + uint32_t msid; /* message stream id */ } ngx_rtmp_packet_hdr_t; @@ -156,6 +156,7 @@ struct ngx_rtmp_session_s { struct ngx_rtmp_session_s *next; ngx_uint_t flags; + uint32_t csid; }; typedef struct ngx_rtmp_session_s ngx_rtmp_session_t; @@ -191,28 +192,40 @@ typedef struct { } ngx_rtmp_module_t; -/* RTMP packet types*/ -#define NGX_RTMP_PACKET_CHUNK_SIZE 0x01 -#define NGX_RTMP_PACKET_BYTES_READ 0x03 -#define NGX_RTMP_PACKET_PING 0x04 -#define NGX_RTMP_PACKET_SERVER_BW 0x05 -#define NGX_RTMP_PACKET_CLIENT_BW 0x06 -#define NGX_RTMP_PACKET_AUDIO 0x08 -#define NGX_RTMP_PACKET_VIDEO 0x09 -#define NGX_RTMP_PACKET_FLEX 0x0f -#define NGX_RTMP_PACKET_FLEX_SO 0x10 -#define NGX_RTMP_PACKET_FLEX_MSG 0x11 -#define NGX_RTMP_PACKET_NOTIFY 0x12 -#define NGX_RTMP_PACKET_SO 0x13 -#define NGX_RTMP_PACKET_INVOKE 0x14 +/* Chunk header: + * max 3 basic header + * + max 11 message header + * + max 4 extended header (timestamp) */ +#define NGX_RTMP_MAX_CHUNK_HEADER 18 -/* RMTP ping types */ -#define NGX_RMTP_PING_CLEAR_STEAM 0 -#define NGX_RMTP_PING_CLEAR_BUFFER 1 -#define NGX_RMTP_PING_CLIENT_TIME 3 -#define NGX_RMTP_PING_RESET_STREAM 4 -#define NGX_RMTP_PING_PING 6 -#define NGX_RMTP_PING_PONG 7 + +/* RTMP packet types*/ +#define NGX_RTMP_PACKET_CHUNK_SIZE 1 +#define NGX_RTMP_PACKET_ABORT 2 +#define NGX_RTMP_PACKET_ACK 3 +#define NGX_RTMP_PACKET_CTL 4 +#define NGX_RTMP_PACKET_ACK_SIZE 5 +#define NGX_RTMP_PACKET_BANDWIDTH 6 +#define NGX_RTMP_PACKET_EDGE 7 +#define NGX_RTMP_PACKET_AUDIO 8 +#define NGX_RTMP_PACKET_VIDEO 9 +#define NGX_RTMP_PACKET_AMF3_META 15 +#define NGX_RTMP_PACKET_AMF3_SHARED 16 +#define NGX_RTMP_PACKET_AMF3_CMD 17 +#define NGX_RTMP_PACKET_AMF0_META 18 +#define NGX_RTMP_PACKET_AMF0_SHARED 19 +#define NGX_RTMP_PACKET_AMF0_CMD 20 +#define NGX_RTMP_PACKET_AGGREGATE 22 + + +/* RMTP control message types */ +#define NGX_RTMP_CTL_STREAM_BEGIN 0 +#define NGX_RTMP_CTL_STREAM_EOF 1 +#define NGX_RTMP_CTL_STREAM_DRY 2 +#define NGX_RTMP_CTL_SET_BUFLEN 3 +#define NGX_RTMP_CTL_RECORDED 4 +#define NGX_RTMP_CTL_PING_REQUEST 6 +#define NGX_RTMP_CTL_PING_RESPONSE 7 #define NGX_RTMP_MODULE 0x504D5452 /* "RTMP" */ @@ -264,22 +277,22 @@ void ngx_rtmp_send_packet(ngx_rtmp_session_t *s, /* NetConnection methods */ -ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t **l); +ngx_int_t ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l); /* NetStream methods */ -ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t **l); -ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t **l); +ngx_int_t ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l); +ngx_int_t ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l); extern ngx_uint_t ngx_rtmp_max_module; diff --git a/ngx_rtmp_amf0.c b/ngx_rtmp_amf0.c index d63253f..627b0f1 100644 --- a/ngx_rtmp_amf0.c +++ b/ngx_rtmp_amf0.c @@ -5,12 +5,58 @@ #include "ngx_rtmp_amf0.h" #include "ngx_rtmp.h" #include - -#define NGX_RTMP_AMF0_SV(x, y) \ +/* +#define NGX_RTMP_AMF0_SWAP_VALUES(x, y) \ (x) ^= (y); (y) ^= (x); (x) ^= (y) -#define NGX_RTMP_AMF0_SB(x) \ - NGX_RTMP_AMF0_SV(*(uint8_t*)(&x), *((uint8_t*)(&x) + 1)) +#define NGX_RTMP_AMF0_REVERSE2(x) \ + NGX_RTMP_AMF0_SV(*(uint8_t*)(&x) , *((uint8_t*)(&x) + 1)) +*/ +static inline void* +ngx_rtmp_amf0_reverse_copy(void *dst, void* src, size_t len) +{ + size_t k; + + if (dst == NULL || src == NULL) { + return NULL; + } + + for(k = 0; k < len; ++k) { + ((u_char*)dst)[k] = ((u_char*)src)[len - 1 - k]; + } + + return dst; +} + +#define NGX_RTMP_AMF0_DEBUG_SIZE 16 + +#ifdef NGX_DEBUG +static void +ngx_rtmp_amf0_debug(const char* op, ngx_log_t *log, u_char *p, size_t n) +{ + u_char hstr[3 * NGX_RTMP_AMF0_DEBUG_SIZE + 1]; + u_char str[NGX_RTMP_AMF0_DEBUG_SIZE + 1]; + u_char *hp, *sp; + static u_char hex[] = "0123456789ABCDEF"; + size_t i; + + hp = hstr; + sp = str; + + for(i = 0; i < n && i < NGX_RTMP_AMF0_DEBUG_SIZE; ++i) { + *hp++ = ' '; + *hp++ = hex[(*p & 0xf0) >> 4]; + *hp++ = hex[*p & 0x0f]; + *sp++ = (*p >= 0x20 && *p <= 0x7e) ? + *p : (u_char)'?'; + ++p; + } + *hp = *sp = '\0'; + + ngx_log_debug4(NGX_LOG_DEBUG_RTMP, log, 0, + "AMF0 %s (%d)%s '%s'", op, n, hstr, str); +} +#endif static ngx_int_t ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) @@ -34,35 +80,11 @@ ngx_rtmp_amf0_get(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) p = ngx_cpymem(p, b->pos, n); } b->pos += n; - -#define NGX_RTMP_AMF0_DEBUG_SIZE 16 + #ifdef NGX_DEBUG - { - u_char hstr[3 * NGX_RTMP_AMF0_DEBUG_SIZE + 1]; - u_char str[NGX_RTMP_AMF0_DEBUG_SIZE + 1]; - u_char *hp, *pp, *sp; - static u_char hex[] = "0123456789ABCDEF"; - - hp = hstr; - sp = str; - pp = op; - - while (pp < (u_char*)p - && pp - (u_char*)op < NGX_RTMP_AMF0_DEBUG_SIZE) - { - *hp++ = ' '; - *hp++ = hex[(*pp & 0xf0) >> 4]; - *hp++ = hex[*pp & 0x0f]; - *sp++ = (*pp >= 0x20 && *pp <= 0x7e) ? - *pp : (u_char)'?'; - ++pp; - } - *hp = *sp = '\0'; - - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, ctx->log, 0, - "AMF0 read (%d)%s '%s'", n, hstr, str); - } + ngx_rtmp_amf0_debug("read", ctx->log, (u_char*)op, n); #endif + return NGX_OK; } @@ -88,6 +110,10 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) size_t size; ngx_chain_t **l, **free; +#ifdef NGX_DEBUG + ngx_rtmp_amf0_debug("write", ctx->log, (u_char*)p, n); +#endif + l = ctx->link; free = ctx->free; @@ -113,7 +139,7 @@ ngx_rtmp_amf0_put(ngx_rtmp_amf0_ctx_t *ctx, void *p, size_t n) size = b->end - b->last; - if (size <= n) { + if (size >= n) { b->last = ngx_cpymem(b->last, p, n); return NGX_OK; } @@ -197,6 +223,7 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n uint16_t len; ngx_int_t rc; int till_end; + u_char buf[8]; if (nelts & NGX_RTMP_AMF0_TILL_END_FLAG) { till_end = 1; @@ -216,9 +243,10 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n switch(type) { case NGX_RTMP_AMF0_NUMBER: - if (ngx_rtmp_amf0_get(ctx, data, 8) != NGX_OK) { + if (ngx_rtmp_amf0_get(ctx, buf, 8) != NGX_OK) { return NGX_ERROR; } + ngx_rtmp_amf0_reverse_copy(data, buf, 0); break; case NGX_RTMP_AMF0_BOOLEAN: @@ -228,11 +256,10 @@ ngx_rtmp_amf0_read(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t n break; case NGX_RTMP_AMF0_STRING: - if (ngx_rtmp_amf0_get(ctx, &len, sizeof(len)) != NGX_OK) { + if (ngx_rtmp_amf0_get(ctx, buf, 2) != NGX_OK) { return NGX_ERROR; } - - NGX_RTMP_AMF0_SB(len); + ngx_rtmp_amf0_reverse_copy(&len, buf, 2); if (data == NULL) { rc = ngx_rtmp_amf0_get(ctx, data, len); @@ -297,16 +324,24 @@ static ngx_int_t ngx_rtmp_amf0_write_object(ngx_rtmp_amf0_ctx_t *ctx, ngx_rtmp_amf0_elt_t *elts, size_t nelts) { - uint16_t len; + uint16_t len, len_sb; size_t n; char *name; + u_char buf[2]; for(n = 0; n < nelts; ++n) { name = elts[n].name; - len = strlen(name); + len_sb = len = strlen(name); - if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) { + if (ngx_rtmp_amf0_put(ctx, + ngx_rtmp_amf0_reverse_copy(buf, + &len, 2), 2) != NGX_OK) + { + return NGX_ERROR; + } + + if (ngx_rtmp_amf0_put(ctx, name, len) != NGX_OK) { return NGX_ERROR; } @@ -317,7 +352,7 @@ ngx_rtmp_amf0_write_object(ngx_rtmp_amf0_ctx_t *ctx, len = 0; - if (ngx_rtmp_amf0_put(ctx, &name, len) != NGX_OK) { + if (ngx_rtmp_amf0_put(ctx, "\00\00", 2) != NGX_OK) { return NGX_ERROR; } @@ -333,6 +368,7 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, uint8_t type; void *data; uint16_t len; + u_char buf[8]; for(n = 0; n < nelts; ++n) { @@ -345,7 +381,10 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, switch(type) { case NGX_RTMP_AMF0_NUMBER: - if (ngx_rtmp_amf0_put(ctx, data, 8) != NGX_OK) { + if (ngx_rtmp_amf0_put(ctx, + ngx_rtmp_amf0_reverse_copy(buf, + data, 8), 8) != NGX_OK) + { return NGX_ERROR; } break; @@ -357,12 +396,13 @@ ngx_rtmp_amf0_write(ngx_rtmp_amf0_ctx_t *ctx, break; case NGX_RTMP_AMF0_STRING: - if (ngx_rtmp_amf0_put(ctx, &len, sizeof(len)) != NGX_OK) { + if (ngx_rtmp_amf0_put(ctx, + ngx_rtmp_amf0_reverse_copy(buf, + &len, 2), 2) != NGX_OK) + { return NGX_ERROR; } - NGX_RTMP_AMF0_SB(len); - if (ngx_rtmp_amf0_put(ctx, data, len) != NGX_OK) { return NGX_ERROR; } diff --git a/ngx_rtmp_handler.c b/ngx_rtmp_handler.c index 1cb2e43..b7b6033 100644 --- a/ngx_rtmp_handler.c +++ b/ngx_rtmp_handler.c @@ -23,20 +23,18 @@ static void ngx_rtmp_send(ngx_event_t *rev); static void ngx_rtmp_close_connection(ngx_connection_t *c); -static size_t hdrsizes[] = { 12, 8, 4, 1 }; - #ifdef NGX_DEBUG static char* ngx_rtmp_packet_type(uint8_t type) { static char* types[] = { "?", "chunk_size", - "?", - "bytes_read", - "ping", - "server_bw", - "client_bw", - "?", + "abort", + "ack", + "ctl", + "ack_size", + "bandwidth", + "edge", "audio", "video", "?", @@ -44,12 +42,14 @@ ngx_rtmp_packet_type(uint8_t type) { "?", "?", "?", - "flex", - "flex_so", - "flex_msg", - "notify", - "so", - "invoke" + "amf3_meta", + "amf3_shared", + "amd3_cmd", + "amf0_meta", + "amf0_shared", + "amf0_cmd", + "?", + "aggregate" }; return type < sizeof(types) / sizeof(types[0]) @@ -212,8 +212,7 @@ ngx_rtmp_init_session(ngx_connection_t *c) s->chunk_size = NGX_RTMP_DEFAULT_CHUNK_SIZE; - /* TODO: we should move preallcation to a func to call on user request */ - bufs.size = s->chunk_size + 14 /* + max header size */; + bufs.size = s->chunk_size + NGX_RTMP_MAX_CHUNK_HEADER; bufs.num = cscf->buffers; s->free = ngx_create_chain_of_bufs(c->pool, &bufs); @@ -363,13 +362,15 @@ restart: void ngx_rtmp_recv(ngx_event_t *rev) { - ngx_int_t n; - ngx_connection_t *c; - ngx_rtmp_session_t *s; - ngx_rtmp_core_srv_conf_t *cscf; - ngx_buf_t *b, *bb; - u_char h, *p, *pp; - ngx_chain_t *lin; + ngx_int_t n; + ngx_connection_t *c; + ngx_rtmp_session_t *s; + ngx_rtmp_core_srv_conf_t *cscf; + ngx_buf_t *b, *bb; + u_char *p, *pp; + ngx_chain_t *lin; + ngx_rtmp_packet_hdr_t *h; + uint32_t timestamp; c = rev->data; s = c->data; @@ -413,86 +414,106 @@ ngx_rtmp_recv(ngx_event_t *rev) return; } - /* first byte of a packet? */ - if (b->last == b->start) { - h = *b->last; - s->in_hdr.hsize = hdrsizes[(h >> 6) & 0x03]; - s->in_hdr.channel = h & 0x3f; + b->last += n; + h = &s->in_hdr; - ngx_log_debug3(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP start %d hd=%d ch=%d", - (int)h, - (int)s->in_hdr.hsize, - (int)s->in_hdr.channel); + /* parse headers */ + if (b->pos == b->start) { + p = b->pos; + timestamp = h->timestamp; + + /* chunk basic header */ + h->fmt = (*p >> 6) & 0x03; + h->csid = *p++ & 0x3f; + + if (h->csid == 0) { + if (b->last - p < 1) + continue; + h->csid = 64; + h->csid += *(uint8_t*)p++; + + } else if (h->csid == 1) { + if (b->last - p < 2) + continue; + h->csid = 64; + h->csid += *(uint8_t*)p++; + h->csid += (uint32_t)256 * (*(uint8_t*)p++); + } + + ngx_log_debug2(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP bheader fmt=%d csid=%D", + (int)h->fmt, h->csid); + + if (h->fmt <= 2 ) { + if (b->last - p < 3) + continue; + /* timestamp: + * big-endian 3b -> little-endian 4b */ + pp = (u_char*)×tamp; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + pp[3] = 0; + + if (h->fmt <= 1) { + if (b->last - p < 4) + continue; + /* size: + * big-endian 3b -> little-endian 4b + * type: + * 1b -> 1b*/ + pp = (u_char*)&h->mlen; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + pp[3] = 0; + h->type = *(uint8_t*)p++; + + if (h->fmt == 0) { + if (b->last - p < 4) + continue; + /* stream: + * little-endian 4b -> little-endian 4b */ + pp = (u_char*)&h->msid; + pp[0] = *p++; + pp[1] = *p++; + pp[2] = *p++; + pp[3] = *p++; + } + } + + /* extended header */ + if (timestamp == 0x00ffffff) { + if (b->last - p < 4) + continue; + pp = (u_char*)&h->timestamp; + pp[3] = *p++; + pp[2] = *p++; + pp[1] = *p++; + pp[0] = *p++; + } else if (h->fmt) { + h->timestamp += timestamp; + } else { + h->timestamp = timestamp; + } + } + + ngx_log_debug5(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP mheader %s (%d) " + "timestamp=%D mlen=%D msid=%D", + ngx_rtmp_packet_type(h->type), (int)h->type, + h->timestamp, h->mlen, h->msid); + + /* header done */ + b->pos = p; } - b->last += n; - if (b->last - b->pos < s->in_hdr.hsize) + /* parse payload */ + if (b->last - b->pos < (ngx_int_t)ngx_min(h->mlen, s->chunk_size)) continue; - p = b->start + 1; - - /* basic header */ - do { - if (s->in_hdr.hsize < 4) - break; - - /* FIXME: is this fix really needed? - if (s->in_hdr.channel == 1) { - p += 2; - }*/ - - /* timer: - * big-endian 3b -> little-endian 4b */ - pp = (u_char*)&s->in_hdr.timer; - pp[0] = p[2]; - pp[1] = p[1]; - pp[2] = p[0]; - pp[3] = 0; - if (s->in_hdr.hsize < 8) - break; - - /* size: - * big-endian 3b -> little-endian 4b - * type: - * 1b -> 1b*/ - p += 3; - pp = (u_char*)&s->in_hdr.size; - pp[0] = p[2]; - pp[1] = p[1]; - pp[2] = p[0]; - pp[3] = 0; - p += 3; - pp = &s->in_hdr.type; - *pp = *p; - if (s->in_hdr.hsize < 12) - break; - - /* stream: - * little-endian 4b -> little-endian 4b */ - ++p; - ngx_memcpy(&s->in_hdr.stream, p, 4); - p += 4; - - } while(0); - - ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP header %s (%d) ch=%d hd=%d " - "sz=%D tm=%D st=%D", - ngx_rtmp_packet_type(s->in_hdr.type), - (int)s->in_hdr.type, - (int)s->in_hdr.channel, - (int)s->in_hdr.hsize, - s->in_hdr.size, - s->in_hdr.timer, - s->in_hdr.stream); - - if (b->last < p + ngx_min(s->in_hdr.size, s->chunk_size)) - continue; - - b->pos = p; /* if fragmented then wait for more fragments */ - if (s->in_hdr.size > s->chunk_size) { + if (h->mlen > s->chunk_size) { if (s->free == NULL) { ngx_log_error(NGX_LOG_INFO, c->log, NGX_ERROR, "no free buffers"); @@ -503,14 +524,14 @@ ngx_rtmp_recv(ngx_event_t *rev) s->free = s->free->next; lin = lin->next; lin->next = NULL; - s->in_hdr.size -= s->chunk_size; + h->mlen -= s->chunk_size; bb = lin->buf; bb->pos = bb->last = bb->start; continue; } /* handle packet! */ - if (ngx_rtmp_receive_packet(s, &s->in_hdr, s->in) != NGX_OK) { + if (ngx_rtmp_receive_packet(s, h, s->in) != NGX_OK) { ngx_rtmp_close_session(s); return; } @@ -518,10 +539,10 @@ ngx_rtmp_recv(ngx_event_t *rev) bb->pos = bb->last = bb->start; /* copy remained data to first buffer */ - if (s->in_hdr.size < b->last - b->pos) { + if (h->mlen < b->last - b->pos) { bb->last = ngx_movemem(bb->start, - b->pos + s->in_hdr.size, - b->last - b->pos - s->in_hdr.size); + b->pos + h->mlen, + b->last - b->pos - h->mlen); } /* free all but one input buffer */ @@ -537,7 +558,6 @@ ngx_rtmp_recv(ngx_event_t *rev) void ngx_rtmp_send(ngx_event_t *wev) { - ngx_int_t n; ngx_connection_t *c; ngx_rtmp_session_t *s; ngx_rtmp_core_srv_conf_t *cscf; @@ -548,7 +568,8 @@ ngx_rtmp_send(ngx_event_t *wev) cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); if (wev->timedout) { - ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, "client timed out"); + ngx_log_error(NGX_LOG_INFO, c->log, NGX_ETIMEDOUT, + "client timed out"); c->timedout = 1; ngx_rtmp_close_session(s); return; @@ -559,7 +580,6 @@ ngx_rtmp_send(ngx_event_t *wev) } while(s->out) { - l = c->send_chain(c, s->out, 0); if (l == NGX_CHAIN_ERROR) { @@ -567,15 +587,7 @@ ngx_rtmp_send(ngx_event_t *wev) return; } - n = 0; - - if (l != s->out) { - for(ll = s->out; ll->next && ll->next != l; ll = ll->next); - ll->next = s->free; - s->out = l; - } - - if (l != NULL) { + if (l == NULL) { cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); ngx_add_timer(c->write, cscf->timeout); if (ngx_handle_write_event(c->write, 0) != NGX_OK) { @@ -583,6 +595,14 @@ ngx_rtmp_send(ngx_event_t *wev) } return; } + + if (l != s->out) { + for(ll = s->out; + ll->next && ll->next != l; + ll = ll->next); + ll->next = s->free; + s->out = l; + } } ngx_del_event(wev, NGX_WRITE_EVENT, 0); @@ -658,100 +678,172 @@ ngx_rtmp_leave(ngx_rtmp_session_t *s) void ngx_rtmp_send_packet(ngx_rtmp_session_t *s, ngx_rtmp_packet_hdr_t *h, - ngx_chain_t *l) + ngx_chain_t *ll) { ngx_rtmp_packet_hdr_t *lh; - size_t hsel, hsize, size; - ngx_chain_t *ll, **pl; + ngx_int_t hsize, size, nbufs; + ngx_chain_t *l, **pl; ngx_buf_t *b, *bb; u_char *p, *pp; + uint8_t fmt; + uint32_t timestamp, ext_timestamp, mlen; + ngx_connection_t *c; - if (l == NULL) + if (ll == NULL) { return; + } - b = l->buf; - p = b->pos; - + /* detect packet size */ + mlen = 0; + l = ll; + nbufs = 0; while(l) { + mlen += (l->buf->last - l->buf->pos); + ++nbufs; + l = l->next; + } + c = s->connection; + bb = ll->buf; + pp = bb->pos; + lh = &s->out_hdr; + + ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP send %s (%d) csid=%D timestamp=%D " + "mlen=%D msid=%D nbufs=%d", + ngx_rtmp_packet_type(h->type), (int)h->type, + h->csid, h->timestamp, mlen, h->msid, nbufs); + + while(ll) { if (s->free == NULL) { - /* TODO: implement proper packet dropper */ + /* FIXME: implement proper packet dropper */ return; } - /* add new output chunk */ - ll = s->free; + /* append new output buffer */ + l = s->free; s->free = s->free->next; - ll->next = NULL; - + l->next = NULL; for(pl = &s->out; *pl; pl = &(*pl)->next); - *pl = ll; + *pl = l; + b = l->buf; + b->pos = b->last = b->start + NGX_RTMP_MAX_CHUNK_HEADER; - /* put payload at the end; leave space for header */ - bb = ll->buf; - bb->pos = bb->last = bb->end - s->chunk_size; + /* copy payload to new buffer leaving space for header */ + while (b->last < b->end) { + size = b->end - b->last; + if (size < bb->last - pp) { + b->last = ngx_cpymem(b->last, pp, size); + pp += size; + break; + } + b->last = ngx_cpymem(b->last, pp, bb->last - pp); - /* fill new chunk payload */ - while(l && bb->pos != bb->last) { - size = ngx_min(bb->last - bb->pos, b->last - p); - bb->last = ngx_cpymem(bb->last, p, size); - p += size; - if (p != b->last) - continue; - l = l->next; - if (l) { - b = l->buf; - p = b->pos; + ll = ll->next; + if (ll == NULL) { + break; + } + + bb = ll->buf; + pp = bb->pos; + } + + /* FIXME: there can be some occasional + * matches (h->msid == 0) on first out + * packet when we compare it + * against initially zeroed header; + * Though it maybe OK */ + + /* fill header + * we have + * h - new header + * lh - old header for diffs */ + fmt = 0; + hsize = 12; + + if (h->msid && lh->msid == h->msid) { + ++fmt; + hsize -= 4; + if (lh->type == h->type && lh->mlen == mlen) { + ++fmt; + hsize -= 4; + if (lh->timestamp == h->timestamp) { + ++fmt; + hsize -= 3; + } } } - lh = &s->out_hdr; + /* message header */ + timestamp = (fmt ? h->timestamp + : h->timestamp - lh->timestamp); + ext_timestamp = 0; - hsel = 0; - - /* choose header size */ - if (lh->hsize) { - - if (lh->stream == h->stream) - ++hsel; - - if (lh->type == h->type && lh->size == h->size) - ++hsel; - - if (lh->timer == h->timer) - ++hsel; + if (timestamp >= 0x00ffffff) { + ext_timestamp = timestamp; + timestamp = 0x00ffffff; + hsize += 4; } - hsize = hdrsizes[hsel]; + if (h->csid >= 64) { + ++hsize; + if (h->csid >= 320) { + ++hsize; + } + } - bb->pos -= hsize; + /* now we know header size */ + b->pos -= hsize; + p = b->pos; + + /* basic header */ + *p = (fmt << 6); + if (h->csid >= 2 && h->csid <= 63) { + *p++ |= (((uint8_t)h->csid) & 0x3f); + } else if (h->csid >= 64 && h->csid < 320) { + ++p; + *p++ = (uint8_t)(h->csid - 64); + } else { + *p++ |= 1; + *p++ = (uint8_t)(h->csid - 64); + *p++ = (uint8_t)((h->csid - 64) >> 8); + } + + /* message header */ + if (fmt <= 2) { + pp = (u_char*)×tamp; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + if (fmt <= 1) { + pp = (u_char*)&mlen; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + *p++ = h->type; + if (fmt == 0) { + pp = (u_char*)&h->msid; + *p++ = pp[0]; + *p++ = pp[1]; + *p++ = pp[2]; + *p++ = pp[3]; + } + } + } + + /* extended header */ + if (ext_timestamp) { + pp = (u_char*)&ext_timestamp; + *p++ = pp[3]; + *p++ = pp[2]; + *p++ = pp[1]; + *p++ = pp[0]; + } *lh = *h; - - /* fill header: bb->pos..bb->pos+hsize */ - - pp = bb->pos; - - *pp++ = (((uint8_t)hsel & 0x03) << 6) | (h->channel & 0x3f); - - if (hsize == 1) - continue; - - /* TODO: watch endians */ - - pp = ngx_cpymem(pp, &h->timer, 3); - - if (hsize == 4) - continue; - - pp = ngx_cpymem(pp, &h->size, 3); - *pp++ = h->type; - - if (hsize == 8) - continue; - - ngx_memcpy(pp, &h->stream, 4); } + + ngx_rtmp_send(c->write); } @@ -776,51 +868,49 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, }; ngx_rtmp_amf0_ctx_t amf_ctx; - c = s->connection; - if (l == NULL) { return NGX_ERROR; } + c = s->connection; + b = l->buf; + cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); + + /* a session handles only one chunk stream + * but #2 is a special one for protocol messages */ + if (h->csid != 2) { + s->csid = h->csid; + } + + #ifdef NGX_DEBUG { - int nch; + int nbufs; ngx_chain_t *ch; - for(nch = 1, ch = l; ch->next; ch = ch->next, ++nch); + for(nbufs = 1, ch = l; + ch->next; + ch = ch->next, ++nbufs); - ngx_log_debug8(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP packet %s (%d) ch=%d hd=%d " - "sz=%d tm=%D st=%D nbfs=%d", - ngx_rtmp_packet_type(h->type), - (int)h->type, - (int)h->channel, - (int)h->hsize, - (int)h->size, - h->timer, - h->stream, - nch); + ngx_log_debug7(NGX_LOG_DEBUG_RTMP, c->log, 0, + "RTMP recv %s (%d) csid=%D timestamp=%D " + "mlen=%D msid=%D nbufs=%d", + ngx_rtmp_packet_type(h->type), (int)h->type, + h->csid, h->timestamp, h->mlen, h->msid, nbufs); } #endif - cscf = ngx_rtmp_get_module_srv_conf(s, ngx_rtmp_core_module); - - b = l->buf; - switch(h->type) { case NGX_RTMP_PACKET_CHUNK_SIZE: - if (b->last - b->pos < 4) - return NGX_ERROR; - /*ngx_rtmp_set_chunk_size(s, *(uint32_t*)(b->pos));*/ break; - case NGX_RTMP_PACKET_BYTES_READ: - if (b->last - b->pos < 4) - return NGX_ERROR; - /*ngx_rtmp_set_bytes_read(s, *(uint32_t*)(b->pos));*/ + case NGX_RTMP_PACKET_ABORT: break; - case NGX_RTMP_PACKET_PING: + case NGX_RTMP_PACKET_ACK: + break; + + case NGX_RTMP_PACKET_CTL: if (b->last - b->pos < 6) return NGX_ERROR; @@ -829,53 +919,39 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, ping.v3 = (uint16_t*)(b->pos + 4); switch(*ping.v1) { - - case NGX_RMTP_PING_CLEAR_STEAM: + case NGX_RTMP_CTL_STREAM_BEGIN: break; - case NGX_RMTP_PING_CLEAR_BUFFER: - /*ngx_rtmp_clear_buffer(s);*/ + case NGX_RTMP_CTL_STREAM_EOF: break; - case NGX_RMTP_PING_CLIENT_TIME: - /*ngx_rtmp_set_client_buffer_time(s, *ping.v3);*/ + case NGX_RTMP_CTL_STREAM_DRY: break; - case NGX_RMTP_PING_RESET_STREAM: + case NGX_RTMP_CTL_SET_BUFLEN: break; - case NGX_RMTP_PING_PING: + case NGX_RTMP_CTL_RECORDED: + break; + + case NGX_RTMP_CTL_PING_REQUEST: /* ping client from server */ - *ping.v1 = NGX_RMTP_PING_PONG; - ngx_rtmp_send_packet(s, h, l); + /**ping.v1 = NGX_RTMP_PING_PONG; + ngx_rtmp_send_packet(s, h, l);*/ break; - case NGX_RMTP_PING_PONG: - /* TODO: wtf the arg? */ - /*ngx_rtmp_set_ping_time(s, *ping.v2);*/ + case NGX_RTMP_CTL_PING_RESPONSE: break; } - break; - case NGX_RTMP_PACKET_SERVER_BW: - if (b->last - b->pos < 4) - return NGX_ERROR; - - /*ngx_rtmp_set_server_bw(s, *(uint32_t*)b->pos, - b->last - b->pos >= 5 - ? *(uint8_t*)(b->pos + 4) - : 0);*/ + case NGX_RTMP_PACKET_ACK_SIZE: break; - case NGX_RTMP_PACKET_CLIENT_BW: - if (b->last - b->pos < 4) - return NGX_ERROR; + case NGX_RTMP_PACKET_BANDWIDTH: + break; - /*ngx_rtmp_set_client_bw(s, *(uint32_t*)b->pos, - b->last - b->pos >= 5 - ? *(uint8_t*)(b->pos + 4) - : 0);*/ + case NGX_RTMP_PACKET_EDGE: break; case NGX_RTMP_PACKET_AUDIO: @@ -899,22 +975,21 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, } } - break; - case NGX_RTMP_PACKET_SO: - /* TODO: implement - * plain: name, version, persistent; + lots of amf key-values - * ignore so far */ + case NGX_RTMP_PACKET_AMF3_META: + case NGX_RTMP_PACKET_AMF3_SHARED: + case NGX_RTMP_PACKET_AMF3_CMD: + /* FIXME: AMF3 it not yet supported */ break; - case NGX_RTMP_PACKET_NOTIFY: - /* TODO: Implement HTTP callbacks on such packets - * with AMF fields converted to HTTP - * GET vars*/ + case NGX_RTMP_PACKET_AMF0_META: break; - case NGX_RTMP_PACKET_INVOKE: + case NGX_RTMP_PACKET_AMF0_SHARED: + break; + + case NGX_RTMP_PACKET_AMF0_CMD: amf_ctx.link = &l; amf_ctx.free = &s->free; amf_ctx.log = c->log; @@ -922,51 +997,43 @@ ngx_int_t ngx_rtmp_receive_packet(ngx_rtmp_session_t *s, memset(invoke_name, 0, sizeof(invoke_name)); if (ngx_rtmp_amf0_read(&amf_ctx, &invoke_name_elt, 1) != NGX_OK) { ngx_log_debug0(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP invoke failed"); + "AMF0 cmd failed"); return NGX_ERROR; } ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "RTMP invoke '%s'", + "AMF0 cmd '%s'", invoke_name); -#define INVOKE_CALL(name) \ +#define _CMD_CALL(name) \ if (!strcasecmp(invoke_name, #name)) { \ - return ngx_rtmp_##name(s, &l); \ + return ngx_rtmp_##name(s, l); \ } /* NetConnection calls */ - INVOKE_CALL(connect); - INVOKE_CALL(call); - INVOKE_CALL(close); - INVOKE_CALL(createstream); + _CMD_CALL(connect); + _CMD_CALL(call); + _CMD_CALL(close); + _CMD_CALL(createstream); /* NetStream calls */ - INVOKE_CALL(play); - INVOKE_CALL(play2); - INVOKE_CALL(deletestream); - INVOKE_CALL(closestream); - INVOKE_CALL(receiveaudio); - INVOKE_CALL(receivevideo); - INVOKE_CALL(publish); - INVOKE_CALL(seek); - INVOKE_CALL(pause); + _CMD_CALL(play); + _CMD_CALL(play2); + _CMD_CALL(deletestream); + _CMD_CALL(closestream); + _CMD_CALL(receiveaudio); + _CMD_CALL(receivevideo); + _CMD_CALL(publish); + _CMD_CALL(seek); + _CMD_CALL(pause); -#undef INVOKE_CALL +#undef _CMD_CALL break; - case NGX_RTMP_PACKET_FLEX: - case NGX_RTMP_PACKET_FLEX_SO: - case NGX_RTMP_PACKET_FLEX_MSG: - ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "flex packets are not supported %d", - (int)h->type); - break; - default: ngx_log_debug1(NGX_LOG_DEBUG_RTMP, c->log, 0, - "unsupported packet type %d", + "unexpected packet type %d", (int)h->type); } diff --git a/ngx_rtmp_netconn.c b/ngx_rtmp_netconn.c index 5a8f53b..39c871b 100644 --- a/ngx_rtmp_netconn.c +++ b/ngx_rtmp_netconn.c @@ -3,27 +3,148 @@ */ #include "ngx_rtmp.h" +#include "ngx_rtmp_amf0.h" ngx_int_t -ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_connect(ngx_rtmp_session_t *s, ngx_chain_t *li) +{ + ngx_rtmp_packet_hdr_t h; + ngx_chain_t lo, *lo_amf; + ngx_buf_t bo; + u_char buf[6], *p; + uint16_t ctl_evt; + uint32_t msid; + uint32_t ack_size; + uint8_t limit_type; + ngx_rtmp_amf0_ctx_t amf_ctx; + + static double trans; + + static ngx_rtmp_amf0_elt_t inf[] = { + { NGX_RTMP_AMF0_STRING, "code", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "level", NULL, 0 }, + { NGX_RTMP_AMF0_STRING, "description", NULL, 0 }, + }; + + static ngx_rtmp_amf0_elt_t elts[] = { + { NGX_RTMP_AMF0_STRING, 0, "_result", sizeof("_result") - 1 }, + { NGX_RTMP_AMF0_NUMBER, 0, &trans, sizeof(trans) }, + { NGX_RTMP_AMF0_NULL , 0, NULL, 0 }, + { NGX_RTMP_AMF0_OBJECT, 0, inf, sizeof(inf) }, + }; + + /* 1) send 'Window Acknowledgement Size' + * + * 2) send 'Set Peer Bandwidth' + * + * 3*) receive 'Window Acknowledgement' + * + * 4) send 'User Control Message(StreamBegin)' + * ' + * 5) AMF0 reply: + * + * "_ result" + * 1 + * NULL + * { code : "NetConnection.Connect.Success", + * level : "status", + * description : "Connection succeeded." } + */ + memset(&h, 0, sizeof(h)); + h.timestamp = 0; + h.csid = 2; /* standard */ + h.msid = 0; + lo.buf = &bo; + lo.next = NULL; + + /* send Window Acknowledgement Size*/ + h.type = NGX_RTMP_PACKET_ACK_SIZE; + ack_size = 65536; + p = (u_char*)&ack_size; + buf[0] = p[3]; + buf[1] = p[2]; + buf[2] = p[1]; + buf[3] = p[0]; + bo.start = bo.pos = buf; + bo.end = bo.last = bo.start + 4; + ngx_rtmp_send_packet(s, &h, &lo); + + /* send Set Peer Bandwidth */ + h.type = NGX_RTMP_PACKET_BANDWIDTH; + ack_size = 65536; + limit_type = 1; + p = (u_char*)&ack_size; + buf[0] = p[3]; + buf[1] = p[2]; + buf[2] = p[1]; + buf[3] = p[0]; + buf[4] = limit_type; + bo.start = bo.pos = buf; + bo.end = bo.last = bo.start + 5; + ngx_rtmp_send_packet(s, &h, &lo); + + /* send STREAM_BEGIN */ + + h.type = NGX_RTMP_PACKET_CTL; + + msid = 1; + ctl_evt = NGX_RTMP_CTL_STREAM_BEGIN; + + p = (u_char*)&ctl_evt; + buf[0] = p[1]; + buf[1] = p[0]; + + p = (u_char*)&msid; + buf[2] = p[3]; + buf[3] = p[2]; + buf[4] = p[1]; + buf[5] = p[0]; + + bo.start = bo.pos = buf; + bo.end = bo.last = bo.start + sizeof(buf); + + ngx_rtmp_send_packet(s, &h, &lo); + + /* send 'connect' reply */ + h.type = NGX_RTMP_PACKET_AMF0_CMD; + h.csid = s->csid; + inf[0].data = "NetConnection.Connect.Success"; /* code */ + inf[0].len = strlen(inf[0].data); + inf[1].data = "status"; /* level */ + inf[1].len = strlen(inf[1].data); + inf[2].data = "Connection succeeded."; /* description */ + inf[2].len = strlen(inf[2].data); + trans = 1; + + lo_amf = NULL; + amf_ctx.link = &lo_amf; + amf_ctx.free = &s->free; + amf_ctx.log = s->connection->log; + if (ngx_rtmp_amf0_write(&amf_ctx, elts, + sizeof(elts) / sizeof(elts[0])) != NGX_OK) + { + return NGX_ERROR; + } + + ngx_rtmp_send_packet(s, &h, lo_amf); + + return NGX_OK; +} + +ngx_int_t +ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_call(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_close(ngx_rtmp_session_t *s, ngx_chain_t **l) -{ - return NGX_OK; -} - -ngx_int_t -ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_createstream(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } diff --git a/ngx_rtmp_netstream.c b/ngx_rtmp_netstream.c index 90c4ab0..1628ffc 100644 --- a/ngx_rtmp_netstream.c +++ b/ngx_rtmp_netstream.c @@ -5,63 +5,63 @@ #include "ngx_rtmp.h" ngx_int_t -ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_play(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_play2(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_deletestream(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_closestream(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_receiveaudio(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_receivevideo(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_publish(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_seek(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; } ngx_int_t -ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t **l) +ngx_rtmp_pause(ngx_rtmp_session_t *s, ngx_chain_t *l) { return NGX_OK; }