]> git.saurik.com Git - apple/libdispatch.git/blob - examples/DispatchWebServer/DispatchWebServer.c
libdispatch-84.5.tar.gz
[apple/libdispatch.git] / examples / DispatchWebServer / DispatchWebServer.c
1 /*
2 * Copyright (c) 2008 Apple Inc. All rights reserved.
3 *
4 * @APPLE_DTS_LICENSE_HEADER_START@
5 *
6 * IMPORTANT: This Apple software is supplied to you by Apple Computer, Inc.
7 * ("Apple") in consideration of your agreement to the following terms, and your
8 * use, installation, modification or redistribution of this Apple software
9 * constitutes acceptance of these terms. If you do not agree with these terms,
10 * please do not use, install, modify or redistribute this Apple software.
11 *
12 * In consideration of your agreement to abide by the following terms, and
13 * subject to these terms, Apple grants you a personal, non-exclusive license,
14 * under Apple's copyrights in this original Apple software (the "Apple Software"),
15 * to use, reproduce, modify and redistribute the Apple Software, with or without
16 * modifications, in source and/or binary forms; provided that if you redistribute
17 * the Apple Software in its entirety and without modifications, you must retain
18 * this notice and the following text and disclaimers in all such redistributions
19 * of the Apple Software. Neither the name, trademarks, service marks or logos of
20 * Apple Computer, Inc. may be used to endorse or promote products derived from
21 * the Apple Software without specific prior written permission from Apple. Except
22 * as expressly stated in this notice, no other rights or licenses, express or
23 * implied, are granted by Apple herein, including but not limited to any patent
24 * rights that may be infringed by your derivative works or by other works in
25 * which the Apple Software may be incorporated.
26 *
27 * The Apple Software is provided by Apple on an "AS IS" basis. APPLE MAKES NO
28 * WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED
29 * WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR
30 * PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN
31 * COMBINATION WITH YOUR PRODUCTS.
32 *
33 * IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR
34 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
35 * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
36 * ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR
37 * DISTRIBUTION OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF
38 * CONTRACT, TORT (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF
39 * APPLE HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
40 *
41 * @APPLE_DTS_LICENSE_HEADER_END@
42 */
43
44 /* A tiny web server that does as much stuff the "dispatch way" as it can, like a queue per connection... */
45
46 /****************************************************************************
47 overview of dispatch related operations:
48
49 main() {
50 have dump_reqs() called every 5 to 6 seconds, and on every SIGINFO
51 and SIGPIPE
52
53 have accept_cb() called when there are new connections on our port
54
55 have reopen_logfile_when_needed() called whenever our logfile is
56 renamed, deleted, or forcibly closed
57 }
58
59 reopen_logfile_when_needed() {
60 call ourself whenever our logfile is renamed, deleted, or forcibly
61 closed
62 }
63
64 accept_cb() {
65 allocate a new queue to handle network and file I/O, and timers
66 for a series of HTTP requests coming from a new network connection
67
68 have read_req() called (on the new queue) when there
69 is network traffic for the new connection
70
71 have req_free(new_req) called when the connection is "done" (no
72 pending work to be executed on the queue, and no sources left to
73 generate new work for the queue)
74 }
75
76 req_free() {
77 uses dispatch_get_current_queue() and dispatch_async() to call itself
78 "on the right queue"
79 }
80
81 read_req() {
82 If there is a timeout source delete_source() it
83
84 if (we have a whole request) {
85 make a new dispatch source (req->fd_rd.ds) for the
86 content file
87
88 have clean up fd, req->fd and req->fd_rd (if
89 appropriate) when the content file source is canceled
90
91 have read_filedata called when the content file is
92 ready to be read
93
94 if we already have a dispatch source for "network
95 socket ready to be written", enable it. Otherwise
96 make one, and have write_filedata called when it is
97 time to write to it.
98
99 disable the call to read_req
100 }
101
102 close the connection if something goes wrong
103 }
104
105 write_filedata() {
106 close the connection if anything goes wrong
107
108 if (we have written the whole HTTP document) {
109 timeout in a little bit, closing the connection if we
110 haven't received a new command
111
112 enable the call to read_req
113 }
114
115 if (we have written all the buffered data) {
116 disable the call to write_filedata()
117 }
118 }
119
120 read_filedata() {
121 if (nothing left to read) {
122 delete the content file dispatch source
123 } else {
124 enable the call to write_filedata()
125 }
126 }
127
128 qprintf, qfprintf, qflush
129 schedule stdio calls on a single queue
130
131 disable_source, enable_source
132 implements a binary enable/disable on top of dispatch's
133 counted suspend/resume
134
135 delete_source
136 cancels the source (this example program uses source
137 cancelation to schedule any source cleanup it needs,
138 so "delete" needs a cancel).
139
140 ensure the source isn't suspended
141
142 release the reference, which _should_ be the last
143 reference (this example program never has more
144 than one reference to a source)
145
146 ****************************************************************************/
147
148 #include <stdio.h>
149 #include <signal.h>
150 #include <string.h>
151 #include <strings.h>
152 #include <fcntl.h>
153 #include <stdarg.h>
154 #include <assert.h>
155 #include <netinet/in.h>
156 #include <libgen.h>
157 #include <pwd.h>
158 #include <sys/socket.h>
159 #include <sys/uio.h>
160 #include <arpa/inet.h>
161 #include <netdb.h>
162 #include <stdlib.h>
163 #include <regex.h>
164 #include <time.h>
165 #include <malloc/malloc.h>
166 #include <sys/stat.h>
167 #include <unistd.h>
168 #include <zlib.h>
169 #include <dispatch/dispatch.h>
170 #include <Block.h>
171 #include <errno.h>
172
/* Directory prefix that request paths are appended to; main() sets it to "<home>/Sites/". */
char *DOC_BASE = NULL;

/* Path of the transfer log; main() builds it as "<home>/Library/Logs/<argv0>-transfer.log". */
char *log_name = NULL;

/* Stream the transfer log is written to; main() opens it from log_name. */
FILE *logfile = NULL;

/* Program name used in "Server:" headers; main() replaces it with basename(argv[0]). */
char *argv0 = "a.out";

/* Service/port string handed to getaddrinfo() in main(). */
char *server_port = "8080";

/* Number of regmatch_t slots passed to regexec() when matching requests. */
const int re_request_nmatch = 4;

/* Regular expressions compiled once in main(). */
regex_t re_first_request;
regex_t re_nth_request;
regex_t re_accept_deflate;
regex_t re_host;
180
181
182 // qpf is the queue that we schedule our "stdio file I/O", which serves as a lock,
183 // and orders the output, and also gets it "out of the way" of our main line execution
184 dispatch_queue_t qpf;
185
186 void qfprintf(FILE *f, const char *fmt, ...) __attribute__((format(printf, 2, 3)));
187
188 void qfprintf(FILE *f, const char *fmt, ...) {
189 va_list ap;
190 va_start(ap, fmt);
191 char *str;
192 /* We gennerate the formatted string on the same queue (or
193 thread) that calls qfprintf, that way the values can change
194 while the fputs call is being sent to the qpf queue, or waiting
195 for other work to complete ont he qpf queue. */
196
197 vasprintf(&str, fmt, ap);
198 dispatch_async(qpf, ^{ fputs(str, f); free(str); });
199 if ('*' == *fmt) {
200 dispatch_sync(qpf, ^{ fflush(f); });
201 }
202 va_end(ap);
203 }
204
205 void qfflush(FILE *f) {
206 dispatch_sync(qpf, ^{ fflush(f); });
207 }
208
209 void reopen_logfile_when_needed() {
210 // We don't want to use a fd with a lifetime managed by something else
211 // because we need to close it inside the cancel handler (see below)
212 int lf_dup = dup(fileno(logfile));
213 FILE **lf = &logfile;
214
215 // We register the vnode callback on the qpf queue since that is where
216 // we do all our logfile printing. (we set up to reopen the logfile
217 // if the "old one" has been deleted or renamed (or revoked). This
218 // makes it pretty safe to mv the file to a new name, delay breifly,
219 // then gzip it. Safer to move the file to a new name, wait for the
220 // "old" file to reappear, then gzip. Niftier then doing the move,
221 // sending a SIGHUP to the right process (somehow) and then doing
222 // as above. Well, maybe it'll never catch on as "the new right
223 /// thing", but it makes a nifty demo.
224 dispatch_source_t vn = dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE, lf_dup, DISPATCH_VNODE_REVOKE|DISPATCH_VNODE_RENAME|DISPATCH_VNODE_DELETE, qpf);
225
226 dispatch_source_set_event_handler(vn, ^{
227 printf("lf_dup is %d (logfile's fileno=%d), closing it\n", lf_dup, fileno(logfile));
228 fprintf(logfile, "# flush n' roll!\n");
229 dispatch_cancel(vn);
230 dispatch_release(vn);
231 fflush(logfile);
232 *lf = freopen(log_name, "a", logfile);
233
234 // The new logfile has (or may have) a diffrent fd from the old one, so
235 // we have to register it again
236 reopen_logfile_when_needed();
237 });
238
239 dispatch_source_set_cancel_handler(vn, ^{ close(lf_dup); });
240
241 dispatch_resume(vn);
242 }
243
// printf-style shorthand for qfprintf(stdout, ...).
// The expansion deliberately has no trailing semicolon, so that
// `if (x) qprintf(...); else ...` parses as a normal statement.
#define qprintf(fmt...) qfprintf(stdout, ## fmt)
245
/*
 * A growable byte buffer (currently sz bytes, realloc'ed when more room is
 * needed) with one region we read data INTO and one we write data OUT OF.
 *
 * A circular buffer driven by readv/writev and iovecs would use the space
 * better; this simpler layout is used instead:
 *
 *   buf   .. outof   : wasted (already consumed)
 *   outof .. into    : ready to write data OUT OF
 *   into  .. buf+sz  : ready to read data IN TO
 */
struct buffer {
    size_t sz;             // size of the allocation at buf
    unsigned char *buf;    // start of the allocation
    unsigned char *into;   // where the next incoming bytes are stored
    unsigned char *outof;  // where the next outgoing bytes are taken from
};
261
262 struct request_source {
263 // libdispatch gives suspension a counting behaviour, we want a simple on/off behaviour, so we use
264 // this struct to provide track suspensions
265 dispatch_source_t ds;
266 bool suspended;
267 };
268
269 // The request struct manages an actiave HTTP request/connection. It gets reused for pipelined HTTP clients.
270 // Every request has it's own queue where all of it's network traffic, and source file I/O as well as
271 // compression (when requested by the HTTP client) is done.
272 struct request {
273 struct sockaddr_in r_addr;
274 z_stream *deflate;
275 // cmd_buf holds the HTTP request
276 char cmd_buf[8196], *cb;
277 char chunk_num[13], *cnp; // Big enough for 8 digits plus \r\n\r\n\0
278 bool needs_zero_chunk;
279 bool reuse_guard;
280 short status_number;
281 size_t chunk_bytes_remaining;
282 char *q_name;
283 int req_num; // For debugging
284 int files_served; // For this socket
285 dispatch_queue_t q;
286 // "sd" is the socket descriptor, where the network I/O for this request goes. "fd" is the source file (or -1)
287 int sd, fd;
288 // fd_rd is for read events from the source file (say /Users/YOU/Sites/index.html for a GET /index.html request)
289 // sd_rd is for read events from the network socket (we suspend it after we read an HTTP request header, and
290 // resume it when we complete a request)
291 // sd_wr is for write events to the network socket (we suspend it when we have no buffered source data to send,
292 // and resume it when we have data ready to send)
293 // timeo is the timeout event waiting for a new client request header.
294 struct request_source fd_rd, sd_rd, sd_wr, timeo;
295 uint64_t timeout_at;
296 struct stat sb;
297
298 // file_b is where we read data from fd into.
299 // For compressed GET requests:
300 // - data is compressed from file_b into deflate_b
301 // - data is written to the network socket from deflate_b
302 // For uncompressed GET requests
303 // - data is written to the network socket from file_b
304 // - deflate_b is unused
305 struct buffer file_b, deflate_b;
306
307 ssize_t total_written;
308 };
309
310 void req_free(struct request *req);
311
312 void disable_source(struct request *req, struct request_source *rs) {
313 // we want a binary suspend state, not a counted state. Our
314 // suspend flag is "locked" by only being used on req->q, this
315 // assert makes sure we are in a valid context to write the new
316 // suspend value.
317 assert(req->q == dispatch_get_current_queue());
318 if (!rs->suspended) {
319 rs->suspended = true;
320 dispatch_suspend(rs->ds);
321 }
322 }
323
324 void enable_source(struct request *req, struct request_source *rs) {
325 assert(req->q == dispatch_get_current_queue());
326 if (rs->suspended) {
327 rs->suspended = false;
328 dispatch_resume(rs->ds);
329 }
330 }
331
332 void delete_source(struct request *req, struct request_source *rs) {
333 assert(req->q == dispatch_get_current_queue());
334 if (rs->ds) {
335 /* sources need to be resumed before they can be deleted
336 (otherwise an I/O and/or cancel block might be stranded
337 waiting for a resume that will never come, causing
338 leaks) */
339
340 enable_source(req, rs);
341 dispatch_cancel(rs->ds);
342 dispatch_release(rs->ds);
343 }
344 rs->ds = NULL;
345 rs->suspended = false;
346 }
347
348 size_t buf_into_sz(struct buffer *b) {
349 return (b->buf + b->sz) - b->into;
350 }
351
352 void buf_need_into(struct buffer *b, size_t cnt) {
353 // resize buf so into has at least cnt bytes ready to use
354 size_t sz = buf_into_sz(b);
355 if (cnt <= sz) {
356 return;
357 }
358 sz = malloc_good_size(cnt - sz + b->sz);
359 unsigned char *old = b->buf;
360 // We could special case b->buf == b->into && b->into == b->outof to
361 // do a free & malloc rather then realloc, but after testing it happens
362 // only for the 1st use of the buffer, where realloc is the same cost as
363 // malloc anyway.
364 b->buf = reallocf(b->buf, sz);
365 assert(b->buf);
366 b->sz = sz;
367 b->into = b->buf + (b->into - old);
368 b->outof = b->buf + (b->outof - old);
369 }
370
371 void buf_used_into(struct buffer *b, size_t used) {
372 b->into += used;
373 assert(b->into <= b->buf + b->sz);
374 }
375
376 size_t buf_outof_sz(struct buffer *b) {
377 return b->into - b->outof;
378 }
379
380 int buf_sprintf(struct buffer *b, char *fmt, ...) __attribute__((format(printf,2,3)));
381
382 int buf_sprintf(struct buffer *b, char *fmt, ...) {
383 va_list ap;
384 va_start(ap, fmt);
385 size_t s = buf_into_sz(b);
386 int l = vsnprintf((char *)(b->into), s, fmt, ap);
387 if (l < s) {
388 buf_used_into(b, l);
389 } else {
390 // Reset ap -- vsnprintf has already used it.
391 va_end(ap);
392 va_start(ap, fmt);
393 buf_need_into(b, l);
394 s = buf_into_sz(b);
395 l = vsnprintf((char *)(b->into), s, fmt, ap);
396 assert(l <= s);
397 buf_used_into(b, l);
398 }
399 va_end(ap);
400
401 return l;
402 }
403
404 void buf_used_outof(struct buffer *b, size_t used) {
405 b->outof += used;
406 //assert(b->into <= b->outof);
407 assert(b->outof <= b->into);
408 if (b->into == b->outof) {
409 b->into = b->outof = b->buf;
410 }
411 }
412
413 char *buf_debug_str(struct buffer *b) {
414 char *ret = NULL;
415 asprintf(&ret, "S%d i#%d o#%d", b->sz, buf_into_sz(b), buf_outof_sz(b));
416 return ret;
417 }
418
419 uint64_t getnanotime() {
420 struct timeval tv;
421 gettimeofday(&tv, NULL);
422
423 return tv.tv_sec * NSEC_PER_SEC + tv.tv_usec * NSEC_PER_USEC;
424 }
425
426 int n_req;
427 struct request **debug_req;
428
429 void dump_reqs() {
430 int i = 0;
431 static last_reported = -1;
432
433 // We want to see the transition into n_req == 0, but we don't need to
434 // keep seeing it.
435 if (n_req == 0 && n_req == last_reported) {
436 return;
437 } else {
438 last_reported = n_req;
439 }
440
441 qprintf("%d actiave requests to dump\n", n_req);
442 uint64_t now = getnanotime();
443 /* Because we iterate over the debug_req array in this queue
444 ("the main queue"), it has to "own" that array. All manipulation
445 of the array as a whole will have to be done on this queue. */
446
447 for(i = 0; i < n_req; i++) {
448 struct request *req = debug_req[i];
449 qprintf("%s sources: fd_rd %p%s, sd_rd %p%s, sd_wr %p%s, timeo %p%s\n", req->q_name, req->fd_rd.ds, req->fd_rd.suspended ? " (SUSPENDED)" : "", req->sd_rd.ds, req->sd_rd.suspended ? " (SUSPENDED)" : "", req->sd_wr.ds, req->sd_wr.suspended ? " (SUSPENDED)" : "", req->timeo.ds, req->timeo.suspended ? " (SUSPENDED)" : "");
450 if (req->timeout_at) {
451 double when = req->timeout_at - now;
452 when /= NSEC_PER_SEC;
453 if (when < 0) {
454 qprintf(" timeout %f seconds ago\n", -when);
455 } else {
456 qprintf(" timeout in %f seconds\n", when);
457 }
458 } else {
459 qprintf(" timeout_at not set\n");
460 }
461 char *file_bd = buf_debug_str(&req->file_b), *deflate_bd = buf_debug_str(&req->deflate_b);
462 qprintf(" file_b %s; deflate_b %s\n cmd_buf used %ld; fd#%d; files_served %d\n", file_bd, deflate_bd, (long)(req->cb - req->cmd_buf), req->fd, req->files_served);
463 if (req->deflate) {
464 qprintf(" deflate total in: %ld ", req->deflate->total_in);
465 }
466 qprintf("%s total_written %lu, file size %lld\n", req->deflate ? "" : " ", req->total_written, req->sb.st_size);
467 free(file_bd);
468 free(deflate_bd);
469 }
470 }
471
472 void req_free(struct request *req) {
473 assert(!req->reuse_guard);
474 if (dispatch_get_main_queue() != dispatch_get_current_queue()) {
475 /* dispatch_set_finalizer_f arranges to have us "invoked
476 asynchronously on req->q's target queue". However,
477 we want to manipulate the debug_req array in ways
478 that are unsafe anywhere except the same queue that
479 dump_reqs runs on (which happens to be the main queue).
480 So if we are running anywhere but the main queue, we
481 just arrange to be called there */
482
483 dispatch_async(dispatch_get_main_queue(), ^{ req_free(req); });
484 return;
485 }
486
487 req->reuse_guard = true;
488 *(req->cb) = '\0';
489 qprintf("$$$ req_free %s; fd#%d; buf: %s\n", dispatch_queue_get_label(req->q), req->fd, req->cmd_buf);
490 assert(req->sd_rd.ds == NULL && req->sd_wr.ds == NULL);
491 close(req->sd);
492 assert(req->fd_rd.ds == NULL);
493 if (req->fd >= 0) close(req->fd);
494 free(req->file_b.buf);
495 free(req->deflate_b.buf);
496 free(req->q_name);
497 free(req->deflate);
498 free(req);
499
500 int i;
501 bool found = false;
502 for(i = 0; i < n_req; i++) {
503 if (found) {
504 debug_req[i -1] = debug_req[i];
505 } else {
506 found = (debug_req[i] == req);
507 }
508 }
509 debug_req = reallocf(debug_req, sizeof(struct request *) * --n_req);
510 assert(n_req >= 0);
511 }
512
513 void close_connection(struct request *req) {
514 qprintf("$$$ close_connection %s, served %d files -- canceling all sources\n", dispatch_queue_get_label(req->q), req->files_served);
515 delete_source(req, &req->fd_rd);
516 delete_source(req, &req->sd_rd);
517 delete_source(req, &req->sd_wr);
518 delete_source(req, &req->timeo);
519 }
520
521 // We have some "content data" (either from the file, or from
522 // compressing the file), and the network socket is ready for us to
523 // write it
524 void write_filedata(struct request *req, size_t avail) {
525 /* We always attempt to write as much data as we have. This
526 is safe becuase we use non-blocking I/O. It is a good idea
527 becuase the amount of buffer space that dispatch tells us may
528 be stale (more space could have opened up, or memory presure
529 may have caused it to go down). */
530
531 struct buffer *w_buf = req->deflate ? &req->deflate_b : &req->file_b;
532 ssize_t sz = buf_outof_sz(w_buf);
533 if (req->deflate) {
534 struct iovec iov[2];
535 if (!req->chunk_bytes_remaining) {
536 req->chunk_bytes_remaining = sz;
537 req->needs_zero_chunk = sz != 0;
538 req->cnp = req->chunk_num;
539 int n = snprintf(req->chunk_num, sizeof(req->chunk_num), "\r\n%lx\r\n%s", sz, sz ? "" : "\r\n");
540 assert(n <= sizeof(req->chunk_num));
541 }
542 iov[0].iov_base = req->cnp;
543 iov[0].iov_len = req->cnp ? strlen(req->cnp) : 0;
544 iov[1].iov_base = w_buf->outof;
545 iov[1].iov_len = (req->chunk_bytes_remaining < sz) ? req->chunk_bytes_remaining : sz;
546 sz = writev(req->sd, iov, 2);
547 if (sz > 0) {
548 if (req->cnp) {
549 if (sz >= strlen(req->cnp)) {
550 req->cnp = NULL;
551 } else {
552 req->cnp += sz;
553 }
554 }
555 sz -= iov[0].iov_len;
556 sz = (sz < 0) ? 0 : sz;
557 req->chunk_bytes_remaining -= sz;
558 }
559 } else {
560 sz = write(req->sd, w_buf->outof, sz);
561 }
562 if (sz > 0) {
563 buf_used_outof(w_buf, sz);
564 } else if (sz < 0) {
565 int e = errno;
566 qprintf("write_filedata %s write error: %d %s\n", dispatch_queue_get_label(req->q), e, strerror(e));
567 close_connection(req);
568 return;
569 }
570
571 req->total_written += sz;
572 off_t bytes = req->total_written;
573 if (req->deflate) {
574 bytes = req->deflate->total_in - buf_outof_sz(w_buf);
575 if (req->deflate->total_in < buf_outof_sz(w_buf)) {
576 bytes = 0;
577 }
578 }
579 if (bytes == req->sb.st_size) {
580 if (req->needs_zero_chunk && req->deflate && (sz || req->cnp)) {
581 return;
582 }
583
584 // We have transfered the file, time to write the log entry.
585
586 // We don't deal with " in the request string, this is an example of how
587 // to use dispatch, not how to do C string manipulation, eh?
588 size_t rlen = strcspn(req->cmd_buf, "\r\n");
589 char tstr[45], astr[45];
590 struct tm tm;
591 time_t clock;
592 time(&clock);
593 strftime(tstr, sizeof(tstr), "%d/%b/%Y:%H:%M:%S +0", gmtime_r(&clock, &tm));
594 addr2ascii(AF_INET, &req->r_addr.sin_addr, sizeof(struct in_addr), astr);
595 qfprintf(logfile, "%s - - [%s] \"%.*s\" %hd %zd\n", astr, tstr, (int)rlen, req->cmd_buf, req->status_number, req->total_written);
596
597 int64_t t_offset = 5 * NSEC_PER_SEC + req->files_served * NSEC_PER_SEC / 10;
598 int64_t timeout_at = req->timeout_at = getnanotime() + t_offset;
599
600 req->timeo.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, req->q);
601 dispatch_source_set_timer(req->timeo.ds, dispatch_time(DISPATCH_TIME_NOW, t_offset), NSEC_PER_SEC, NSEC_PER_SEC);
602 dispatch_source_set_event_handler(req->timeo.ds, ^{
603 if (req->timeout_at == timeout_at) {
604 qfprintf(stderr, "$$$ -- timeo fire (delta=%f) -- close connection: q=%s\n", (getnanotime() - (double)timeout_at) / NSEC_PER_SEC, dispatch_queue_get_label(req->q));
605 close_connection(req);
606 } else {
607 // This happens if the timeout value has been updated, but a pending timeout event manages to race in before the cancel
608 }
609 });
610 dispatch_resume(req->timeo.ds);
611
612 req->files_served++;
613 qprintf("$$$ wrote whole file (%s); timeo %p, about to enable %p and close %d, total_written=%zd, this is the %d%s file served\n", dispatch_queue_get_label(req->q), req->timeo.ds, req->sd_rd.ds, req->fd, req->total_written, req->files_served, (1 == req->files_served) ? "st" : (2 == req->files_served) ? "nd" : "th");
614 enable_source(req, &req->sd_rd);
615 if (req->fd_rd.ds) {
616 delete_source(req, &req->fd_rd);
617 }
618 req->cb = req->cmd_buf;
619 } else {
620 assert(bytes <= req->sb.st_size);
621 }
622
623 if (0 == buf_outof_sz(w_buf)) {
624 // The write buffer is now empty, so we don't need to know when sd is ready for us to write to it.
625 disable_source(req, &req->sd_wr);
626 }
627 }
628
629 // Our "content file" has some data ready for us to read.
630 void read_filedata(struct request *req, size_t avail) {
631 if (avail == 0) {
632 delete_source(req, &req->fd_rd);
633 return;
634 }
635
636 /* We make sure we can read at least as many bytes as dispatch
637 says are avilable, but if our buffer is bigger we will read as
638 much as we have space for. We have the file opened in non-blocking
639 mode so this is safe. */
640
641 buf_need_into(&req->file_b, avail);
642 size_t rsz = buf_into_sz(&req->file_b);
643 ssize_t sz = read(req->fd, req->file_b.into, rsz);
644 if (sz >= 0) {
645 assert(req->sd_wr.ds);
646 size_t sz0 = buf_outof_sz(&req->file_b);
647 buf_used_into(&req->file_b, sz);
648 assert(sz == buf_outof_sz(&req->file_b) - sz0);
649 } else {
650 int e = errno;
651 qprintf("read_filedata %s read error: %d %s\n", dispatch_queue_get_label(req->q), e, strerror(e));
652 close_connection(req);
653 return;
654 }
655 if (req->deflate) {
656 // Note:: deflateBound is "worst case", we could try with any non-zero
657 // buffer, and alloc more if we get Z_BUF_ERROR...
658 buf_need_into(&req->deflate_b, deflateBound(req->deflate, buf_outof_sz(&req->file_b)));
659 req->deflate->next_in = (req->file_b.outof);
660 size_t o_sz = buf_outof_sz(&req->file_b);
661 req->deflate->avail_in = o_sz;
662 req->deflate->next_out = req->deflate_b.into;
663 size_t i_sz = buf_into_sz(&req->deflate_b);
664 req->deflate->avail_out = i_sz;
665 assert(req->deflate->avail_in + req->deflate->total_in <= req->sb.st_size);
666 // at EOF we want to use Z_FINISH, otherwise we pass Z_NO_FLUSH so we get maximum compression
667 int rc = deflate(req->deflate, (req->deflate->avail_in + req->deflate->total_in >= req->sb.st_size) ? Z_FINISH : Z_NO_FLUSH);
668 assert(rc == Z_OK || rc == Z_STREAM_END);
669 buf_used_outof(&req->file_b, o_sz - req->deflate->avail_in);
670 buf_used_into(&req->deflate_b, i_sz - req->deflate->avail_out);
671 if (i_sz != req->deflate->avail_out) {
672 enable_source(req, &req->sd_wr);
673 }
674 } else {
675 enable_source(req, &req->sd_wr);
676 }
677 }
678
679 // We are waiting to for an HTTP request (we eitther havn't gotten
680 // the first request, or pipelneing is on, and we finished a request),
681 // and there is data to read on the network socket.
682 void read_req(struct request *req, size_t avail) {
683 if (req->timeo.ds) {
684 delete_source(req, &req->timeo);
685 }
686
687 // -1 to account for the trailing NUL
688 int s = (sizeof(req->cmd_buf) - (req->cb - req->cmd_buf)) -1;
689 if (s == 0) {
690 qprintf("read_req fd#%d command overflow\n", req->sd);
691 close_connection(req);
692 return;
693 }
694 int rd = read(req->sd, req->cb, s);
695 if (rd > 0) {
696 req->cb += rd;
697 if (req->cb > req->cmd_buf + 4) {
698 int i;
699 for(i = -4; i != 0; i++) {
700 char ch = *(req->cb + i);
701 if (ch != '\n' && ch != '\r') {
702 break;
703 }
704 }
705 if (i == 0) {
706 *(req->cb) = '\0';
707
708 assert(buf_outof_sz(&req->file_b) == 0);
709 assert(buf_outof_sz(&req->deflate_b) == 0);
710 regmatch_t pmatch[re_request_nmatch];
711 regex_t *rex = req->files_served ? &re_first_request : &re_nth_request;
712 int rc = regexec(rex, req->cmd_buf, re_request_nmatch, pmatch, 0);
713 if (rc) {
714 char ebuf[1024];
715 regerror(rc, rex, ebuf, sizeof(ebuf));
716 qprintf("\n$$$ regexec error: %s, ditching request: '%s'\n", ebuf, req->cmd_buf);
717 close_connection(req);
718 return;
719 } else {
720 if (!strncmp("GET", req->cmd_buf + pmatch[1].rm_so, pmatch[1].rm_eo - pmatch[1].rm_so)) {
721 rc = regexec(&re_accept_deflate, req->cmd_buf, 0, NULL, 0);
722 assert(rc == 0 || rc == REG_NOMATCH);
723 // to disable deflate code:
724 // rc = REG_NOMATCH;
725 if (req->deflate) {
726 deflateEnd(req->deflate);
727 free(req->deflate);
728 }
729 req->deflate = (0 == rc) ? calloc(1, sizeof(z_stream)) : NULL;
730 char path_buf[4096];
731 strlcpy(path_buf, DOC_BASE, sizeof(path_buf));
732 // WARNING: this doesn't avoid use of .. in the path
733 // do get outside of DOC_ROOT, a real web server would
734 // really have to avoid that.
735 char ch = *(req->cmd_buf + pmatch[2].rm_eo);
736 *(req->cmd_buf + pmatch[2].rm_eo) = '\0';
737 strlcat(path_buf, req->cmd_buf + pmatch[2].rm_so, sizeof(path_buf));
738 *(req->cmd_buf + pmatch[2].rm_eo) = ch;
739 req->fd = open(path_buf, O_RDONLY|O_NONBLOCK);
740 qprintf("GET req for %s, path: %s, deflate: %p; fd#%d\n", dispatch_queue_get_label(req->q), path_buf, req->deflate, req->fd);
741 size_t n;
742 if (req->fd < 0) {
743 const char *msg = "<HTML><HEAD><TITLE>404 Page not here</TITLE></HEAD><BODY><P>You step in the stream,<BR>but the water has moved on.<BR>This <B>page is not here</B>.<BR></BODY></HTML>";
744 req->status_number = 404;
745 n = buf_sprintf(&req->file_b, "HTTP/1.1 404 Not Found\r\nContent-Length: %zu\r\nExpires: now\r\nServer: %s\r\n\r\n%s", strlen(msg), argv0, msg);
746 req->sb.st_size = 0;
747 } else {
748 rc = fstat(req->fd, &req->sb);
749 assert(rc >= 0);
750 if (req->sb.st_mode & S_IFDIR) {
751 req->status_number = 301;
752 regmatch_t hmatch[re_request_nmatch];
753 rc = regexec(&re_host, req->cmd_buf, re_request_nmatch, hmatch, 0);
754 assert(rc == 0 || rc == REG_NOMATCH);
755 if (rc == REG_NOMATCH) {
756 hmatch[1].rm_so = hmatch[1].rm_eo = 0;
757 }
758 n = buf_sprintf(&req->file_b, "HTTP/1.1 301 Redirect\r\nContent-Length: 0\r\nExpires: now\r\nServer: %s\r\nLocation: http://%*.0s/%*.0s/index.html\r\n\r\n", argv0, (int)(hmatch[1].rm_eo - hmatch[1].rm_so), req->cmd_buf + hmatch[1].rm_so, (int)(pmatch[2].rm_eo - pmatch[2].rm_so), req->cmd_buf + pmatch[2].rm_so);
759 req->sb.st_size = 0;
760 close(req->fd);
761 req->fd = -1;
762 } else {
763 req->status_number = 200;
764 if (req->deflate) {
765 n = buf_sprintf(&req->deflate_b, "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-Encoding: deflate\r\nExpires: now\r\nServer: %s\r\n", argv0);
766 req->chunk_bytes_remaining = buf_outof_sz(&req->deflate_b);
767 } else {
768 n = buf_sprintf(req->deflate ? &req->deflate_b : &req->file_b, "HTTP/1.1 200 OK\r\nContent-Length: %lld\r\nExpires: now\r\nServer: %s\r\n\r\n", req->sb.st_size, argv0);
769 }
770 }
771 }
772
773 if (req->status_number != 200) {
774 free(req->deflate);
775 req->deflate = NULL;
776 }
777
778 if (req->deflate) {
779 rc = deflateInit(req->deflate, Z_BEST_COMPRESSION);
780 assert(rc == Z_OK);
781 }
782
783 // Cheat: we don't count the header bytes as part of total_written
784 req->total_written = -buf_outof_sz(&req->file_b);
785 if (req->fd >= 0) {
786 req->fd_rd.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, req->fd, 0, req->q);
787 // Cancelation is async, so we capture the fd and read sources we will want to operate on as the req struct may have moved on to a new set of values
788 int fd = req->fd;
789 dispatch_source_t fd_rd = req->fd_rd.ds;
790 dispatch_source_set_cancel_handler(req->fd_rd.ds, ^{
791 close(fd);
792 if (req->fd == fd) {
793 req->fd = -1;
794 }
795 if (req->fd_rd.ds == fd_rd) {
796 req->fd_rd.ds = NULL;
797 }
798 });
799 dispatch_source_set_event_handler(req->fd_rd.ds, ^{
800 if (req->fd_rd.ds) {
801 read_filedata(req, dispatch_source_get_data(req->fd_rd.ds));
802 }
803 });
804 dispatch_resume(req->fd_rd.ds);
805 } else {
806 req->fd_rd.ds = NULL;
807 }
808
809 if (req->sd_wr.ds) {
810 enable_source(req, &req->sd_wr);
811 } else {
812 req->sd_wr.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE, req->sd, 0, req->q);
813 dispatch_source_set_event_handler(req->sd_wr.ds, ^{ write_filedata(req, dispatch_source_get_data(req->sd_wr.ds)); });
814 dispatch_resume(req->sd_wr.ds);
815 }
816 disable_source(req, &req->sd_rd);
817 }
818 }
819 }
820 }
821 } else if (rd == 0) {
822 qprintf("### (%s) read_req fd#%d rd=0 (%s); %d files served\n", dispatch_queue_get_label(req->q), req->sd, (req->cb == req->cmd_buf) ? "no final request" : "incomplete request", req->files_served);
823 close_connection(req);
824 return;
825 } else {
826 int e = errno;
827 qprintf("reqd_req fd#%d rd=%d err=%d %s\n", req->sd, rd, e, strerror(e));
828 close_connection(req);
829 return;
830 }
831 }
832
833 // We have a new connection, allocate a req struct & set up a read event handler
834 void accept_cb(int fd) {
835 static int req_num = 0;
836 struct request *new_req = calloc(1, sizeof(struct request));
837 assert(new_req);
838 new_req->cb = new_req->cmd_buf;
839 socklen_t r_len = sizeof(new_req->r_addr);
840 int s = accept(fd, (struct sockaddr *)&(new_req->r_addr), &r_len);
841 if (s < 0) {
842 qfprintf(stderr, "accept failure (rc=%d, errno=%d %s)\n", s, errno, strerror(errno));
843 return;
844 }
845 assert(s >= 0);
846 new_req->sd = s;
847 new_req->req_num = req_num;
848 asprintf(&(new_req->q_name), "req#%d s#%d", req_num++, s);
849 qprintf("accept_cb fd#%d; made: %s\n", fd, new_req->q_name);
850
851 // All further work for this request will happen "on" new_req->q,
852 // except the final tear down (see req_free())
853 new_req->q = dispatch_queue_create(new_req->q_name, NULL);
854 dispatch_set_context(new_req->q, new_req);
855 dispatch_set_finalizer_f(new_req->q, (dispatch_function_t)req_free);
856
857 debug_req = reallocf(debug_req, sizeof(struct request *) * ++n_req);
858 debug_req[n_req -1] = new_req;
859
860
861 new_req->sd_rd.ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, new_req->sd, 0, new_req->q);
862 dispatch_source_set_event_handler(new_req->sd_rd.ds, ^{
863 read_req(new_req, dispatch_source_get_data(new_req->sd_rd.ds));
864 });
865
866 // We want our queue to go away when all of it's sources do, so we
867 // drop the reference dispatch_queue_create gave us & rely on the
868 // references each source holds on the queue to keep it alive.
869 dispatch_release(new_req->q);
870 dispatch_resume(new_req->sd_rd.ds);
871 }
872
/*
 * Set up the server and hand control to dispatch:
 *   - serve files from "<home>/Sites/" on server_port
 *   - log transfers to "<home>/Library/Logs/<argv0>-transfer.log", reopening
 *     the log when it is renamed, deleted or revoked
 *   - call accept_cb() for new connections
 *   - call dump_reqs() on SIGINFO and SIGPIPE, and from a 5 second timer
 *     (with up to a second of leeway)
 * All of these handlers run on the main queue. Setup failures are fatal.
 */
int main(int argc, char *argv[]) {
    int sock = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    // 0 is a valid descriptor; only a negative value signals failure.
    assert(sock >= 0);
    int rc;
    struct addrinfo ai_hints, *my_addr;

    qpf = dispatch_queue_create("printf", NULL);

    argv0 = basename(argv[0]);
    struct passwd *pw = getpwuid(getuid());
    assert(pw);
    asprintf(&DOC_BASE, "%s/Sites/", pw->pw_dir);
    asprintf(&log_name, "%s/Library/Logs/%s-transfer.log", pw->pw_dir, argv0);
    logfile = fopen(log_name, "a");
    if (logfile == NULL) {
        // Everything that logs dereferences logfile, so don't start without it.
        fprintf(stderr, "%s: can't open %s: %s\n", argv0, log_name, strerror(errno));
        return 1;
    }
    // Takes no arguments; it works on the logfile and log_name globals.
    reopen_logfile_when_needed();

    bzero(&ai_hints, sizeof(ai_hints));
    ai_hints.ai_flags = AI_PASSIVE;
    ai_hints.ai_family = PF_INET;
    ai_hints.ai_socktype = SOCK_STREAM;
    ai_hints.ai_protocol = IPPROTO_TCP;
    rc = getaddrinfo(NULL, server_port, &ai_hints, &my_addr);
    assert(rc == 0);

    qprintf("Serving content from %s on port %s, logging transfers to %s\n", DOC_BASE, server_port, log_name);

    int yes = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    assert(rc == 0);
    yes = 1;
    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes));
    assert(rc == 0);

    rc = bind(sock, my_addr->ai_addr, my_addr->ai_addrlen);
    assert(rc >= 0);
    // The address list is not needed once the socket is bound.
    freeaddrinfo(my_addr);

    rc = listen(sock, 25);
    assert(rc >= 0);

    rc = regcomp(&re_first_request, "^([A-Z]+)[ \t]+([^ \t\n]+)[ \t]+HTTP/1\\.1[\r\n]+", REG_EXTENDED);
    assert(rc == 0);

    rc = regcomp(&re_nth_request, "^([A-Z]+)[ \t]+([^ \t\n]+)([ \t]+HTTP/1\\.1)?[\r\n]+", REG_EXTENDED);
    assert(rc == 0);

    rc = regcomp(&re_accept_deflate, "[\r\n]+Accept-Encoding:(.*,)? *deflate[,\r\n]+", REG_EXTENDED);
    assert(rc == 0);

    rc = regcomp(&re_host, "[\r\n]+Host: *([^ \r\n]+)[ \r\n]+", REG_EXTENDED);
    assert(rc == 0);

    dispatch_source_t accept_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_READ, sock, 0, dispatch_get_main_queue());
    // Check the source before it is used, not after.
    assert(accept_ds);
    dispatch_source_set_event_handler(accept_ds, ^{ accept_cb(sock); });
    dispatch_resume(accept_ds);

    sigset_t sigs;
    sigemptyset(&sigs);
    sigaddset(&sigs, SIGINFO);
    sigaddset(&sigs, SIGPIPE);

    // Signal numbers start at 1; 0 is not a signal and must not be handed to
    // sigismember().
    for (int s = 1; s < NSIG; s++) {
        if (sigismember(&sigs, s) == 1) {
            dispatch_source_t sig_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL, s, 0, dispatch_get_main_queue());
            assert(sig_ds);
            dispatch_source_set_event_handler(sig_ds, ^{ dump_reqs(); });
            dispatch_resume(sig_ds);
        }
    }

    rc = sigprocmask(SIG_BLOCK, &sigs, NULL);
    assert(rc == 0);

    dispatch_source_t dump_timer_ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, dispatch_get_main_queue());
    dispatch_source_set_timer(dump_timer_ds, DISPATCH_TIME_NOW, 5 * NSEC_PER_SEC, NSEC_PER_SEC);
    dispatch_source_set_event_handler(dump_timer_ds, ^{ dump_reqs(); });
    dispatch_resume(dump_timer_ds);

    dispatch_main();
    printf("dispatch_main returned\n");

    return 1;
}