2  * Copyright (c) 2008 Apple Inc.  All rights reserved. 
   4  * @APPLE_DTS_LICENSE_HEADER_START@ 
   6  * IMPORTANT:  This Apple software is supplied to you by Apple Computer, Inc. 
   7  * ("Apple") in consideration of your agreement to the following terms, and your 
   8  * use, installation, modification or redistribution of this Apple software 
   9  * constitutes acceptance of these terms.  If you do not agree with these terms, 
  10  * please do not use, install, modify or redistribute this Apple software. 
  12  * In consideration of your agreement to abide by the following terms, and 
  13  * subject to these terms, Apple grants you a personal, non-exclusive license, 
  14  * under Apple's copyrights in this original Apple software (the "Apple Software"), 
  15  * to use, reproduce, modify and redistribute the Apple Software, with or without 
  16  * modifications, in source and/or binary forms; provided that if you redistribute 
  17  * the Apple Software in its entirety and without modifications, you must retain 
  18  * this notice and the following text and disclaimers in all such redistributions 
  19  * of the Apple Software.  Neither the name, trademarks, service marks or logos of 
  20  * Apple Computer, Inc. may be used to endorse or promote products derived from 
  21  * the Apple Software without specific prior written permission from Apple.  Except 
  22  * as expressly stated in this notice, no other rights or licenses, express or 
  23  * implied, are granted by Apple herein, including but not limited to any patent 
  24  * rights that may be infringed by your derivative works or by other works in 
  25  * which the Apple Software may be incorporated. 
  27  * The Apple Software is provided by Apple on an "AS IS" basis.  APPLE MAKES NO 
  28  * WARRANTIES, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION THE IMPLIED 
  29  * WARRANTIES OF NON-INFRINGEMENT, MERCHANTABILITY AND FITNESS FOR A PARTICULAR 
  30  * PURPOSE, REGARDING THE APPLE SOFTWARE OR ITS USE AND OPERATION ALONE OR IN 
  31  * COMBINATION WITH YOUR PRODUCTS.  
  33  * IN NO EVENT SHALL APPLE BE LIABLE FOR ANY SPECIAL, INDIRECT, INCIDENTAL OR 
  34  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE 
  35  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 
  36  * ARISING IN ANY WAY OUT OF THE USE, REPRODUCTION, MODIFICATION AND/OR 
  37  * DISTRIBUTION OF THE APPLE SOFTWARE, HOWEVER CAUSED AND WHETHER UNDER THEORY OF 
  38  * CONTRACT, TORT (INCLUDING NEGLIGENCE), STRICT LIABILITY OR OTHERWISE, EVEN IF 
  39  * APPLE HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
  41  * @APPLE_DTS_LICENSE_HEADER_END@ 
  44 /* A tiny web server that does as much stuff the "dispatch way" as it can, like a queue per connection... */ 
  46 /**************************************************************************** 
  47 overview of dispatch related operations: 
  50         have dump_reqs() called every 5 to 6 seconds, and on every SIGINFO  
  53         have accept_cb() called when there are new connections on our port 
  55         have reopen_logfile_when_needed() called whenever our logfile is 
  56         renamed, deleted, or forcibly closed 
  59 reopen_logfile_when_needed() { 
  60         call ourself whenever our logfile is renamed, deleted, or forcibly  
  65         allocate a new queue to handle network and file I/O, and timers 
  66         for a series of HTTP requests coming from a new network connection 
  68         have read_req() called (on the new queue) when there 
  69         is network traffic for the new connection 
  71         have req_free(new_req) called when the connection is "done" (no 
  72         pending work to be executed on the queue, and no sources left to 
  73         generate new work for the queue) 
  77         uses dispatch_get_current_queue() and dispatch_async() to call itself 
  82         If there is a timeout source delete_source() it 
  84         if (we have a whole request) { 
  85                 make a new dispatch source (req->fd_rd.ds) for the 
  88                 have clean up fd, req->fd and req->fd_rd (if 
  89                 appropriate) when the content file source is canceled 
  91                 have read_filedata called when the content file is 
  94                 if we already have a dispatch source for "network 
  95                 socket ready to be written", enable it.  Otherwise 
  96                 make one, and have write_filedata called when it 
  99                 disable the call to read_req 
 102         close the connection if something goes wrong 
 106         close the connection if anything goes wrong 
 108         if (we have written the whole HTTP document) { 
 109                 timeout in a little bit, closing the connection if we  
 110                 haven't received a new command 
 112                 enable the call to read_req 
 115         if (we have written all the buffered data) { 
 116                 disable the call to write_filedata() 
 121         if (nothing left to read) { 
 122                 delete the content file dispatch source 
 124                 enable the call to write_filedata() 
 128 qprintf, qfprintf, qflush 
 129         schedule stdio calls on a single queue 
 131 disable_source, enable_source 
 132         implements a binary enable/disable on top of dispatch's 
 133         counted suspend/resume 
 136         cancels the source (this example program uses source 
 137         cancelation to schedule any source cleanup it needs, 
 138         so "delete" needs a cancel). 
 140         ensure the source isn't suspended 
 142         release the reference, which _should_ be the last 
 143         reference (this example program never has more 
 144         than one reference to a source) 
 146 ****************************************************************************/ 
 155 #include <netinet/in.h> 
 158 #include <sys/socket.h> 
 160 #include <arpa/inet.h> 
 165 #include <malloc/malloc.h> 
 166 #include <sys/stat.h> 
 169 #include <dispatch/dispatch.h> 
 173 char *DOC_BASE 
= NULL
; 
 174 char *log_name 
= NULL
; 
 175 FILE *logfile 
= NULL
; 
 176 char *argv0 
= "a.out"; 
 177 char *server_port 
= "8080"; 
 178 const int re_request_nmatch 
= 4; 
 179 regex_t re_first_request
, re_nth_request
, re_accept_deflate
, re_host
; 
 182 // qpf is the queue that we schedule our "stdio file I/O", which serves as a lock, 
 183 // and orders the output, and also gets it "out of the way" of our main line execution 
 184 dispatch_queue_t qpf
; 
 186 void qfprintf(FILE *f
, const char *fmt
, ...) __attribute__((format(printf
, 2, 3))); 
 188 void qfprintf(FILE *f
, const char *fmt
, ...) { 
 192     /* We generate the formatted string on the same queue (or 
 193       thread) that calls qfprintf, that way the values can change 
 194       while the fputs call is being sent to the qpf queue, or waiting 
 195       for other work to complete on the qpf queue. */ 
 197     vasprintf(&str
, fmt
, ap
); 
 198     dispatch_async(qpf
, ^{ fputs(str
, f
); free(str
); }); 
 200             dispatch_sync(qpf
, ^{ fflush(f
); }); 
 205 void qfflush(FILE *f
) { 
 206         dispatch_sync(qpf
, ^{ fflush(f
); }); 
 209 void reopen_logfile_when_needed() { 
 210     // We don't want to use a fd with a lifetime managed by something else 
 211     // because we need to close it inside the cancel handler (see below) 
 212     int lf_dup 
= dup(fileno(logfile
)); 
 213     FILE **lf 
= &logfile
; 
 215     // We register the vnode callback on the qpf queue since that is where 
 216     // we do all our logfile printing.   (we set up to reopen the logfile 
 217     // if the "old one" has been deleted or renamed (or revoked).  This 
 218     // makes it pretty safe to mv the file to a new name, delay briefly, 
 219     // then gzip it.   Safer to move the file to a new name, wait for the 
 220     // "old" file to reappear, then gzip.   Niftier than doing the move, 
 221     // sending a SIGHUP to the right process (somehow) and then doing 
 222     // as above.    Well, maybe it'll never catch on as "the new right  
 223     /// thing", but it makes a nifty demo. 
 224     dispatch_source_t vn 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_VNODE
, lf_dup
, DISPATCH_VNODE_REVOKE
|DISPATCH_VNODE_RENAME
|DISPATCH_VNODE_DELETE
, qpf
); 
 226     dispatch_source_set_event_handler(vn
, ^{ 
 227         printf("lf_dup is %d (logfile's fileno=%d), closing it\n", lf_dup
, fileno(logfile
)); 
 228         fprintf(logfile
, "# flush n' roll!\n"); 
 230         dispatch_release(vn
); 
 232         *lf 
= freopen(log_name
, "a", logfile
); 
 234         // The new logfile has (or may have) a different fd from the old one, so 
 235         // we have to register it again 
 236         reopen_logfile_when_needed(); 
 239     dispatch_source_set_cancel_handler(vn
, ^{ close(lf_dup
); }); 
 244 #define qprintf(fmt...) qfprintf(stdout, ## fmt); 
 247     // Manage a buffer, currently at sz bytes, but will realloc if needed 
 248     // The buffer has a part that we read data INTO, and a part that we 
 249     // write data OUT OF. 
 251     // Best use of the space would be a circular buffer (and we would 
 252     // use readv/writev and pass around iovec structs), but we use a 
 254     //   data from buf to outof is wasted.   From outof to into is 
 255     //   "ready to write data OUT OF", from into until buf+sz is 
 256     //   "ready to read data IN TO". 
 259     unsigned char *into
, *outof
; 
 262 struct request_source 
{ 
 263         // libdispatch gives suspension a counting behaviour, we want a simple on/off behaviour, so we use 
 264         // this struct to track suspensions 
 265         dispatch_source_t ds
; 
 269 // The request struct manages an active HTTP request/connection.   It gets reused for pipelined HTTP clients. 
 270 // Every request has its own queue where all of its network traffic, and source file I/O as well as 
 271 // compression (when requested by the HTTP client) is done. 
 273     struct sockaddr_in r_addr
; 
 275     // cmd_buf holds the HTTP request 
 276     char cmd_buf
[8196], *cb
; 
 277     char chunk_num
[13], *cnp
;  // Big enough for 8 digits plus \r\n\r\n\0 
 278     bool needs_zero_chunk
; 
 281     size_t chunk_bytes_remaining
; 
 283     int req_num
;    // For debugging 
 284     int files_served
;   // For this socket 
 286     // "sd" is the socket descriptor, where the network I/O for this request goes.   "fd" is the source file (or -1) 
 288     // fd_rd is for read events from the source file (say /Users/YOU/Sites/index.html for a GET /index.html request) 
 289     // sd_rd is for read events from the network socket (we suspend it after we read an HTTP request header, and 
 290     // resume it when we complete a request) 
 291     // sd_wr is for write events to the network socket (we suspend it when we have no buffered source data to send, 
 292     // and resume it when we have data ready to send) 
 293     // timeo is the timeout event waiting for a new client request header. 
 294     struct request_source fd_rd
, sd_rd
, sd_wr
, timeo
; 
 298     // file_b is where we read data from fd into. 
 299     // For compressed GET requests: 
 300     //  - data is compressed from file_b into deflate_b 
 301     //  - data is written to the network socket from deflate_b 
 302     // For uncompressed GET requests 
 303     //  - data is written to the network socket from file_b 
 304     //  - deflate_b is unused 
 305     struct buffer file_b
, deflate_b
; 
 307     ssize_t total_written
; 
 310 void req_free(struct request 
*req
); 
 312 void disable_source(struct request 
*req
, struct request_source 
*rs
) { 
 313     // we want a binary suspend state, not a counted state.   Our 
 314     // suspend flag is "locked" by only being used on req->q, this 
 315     // assert makes sure we are in a valid context to write the new 
 317     assert(req
->q 
== dispatch_get_current_queue()); 
 318     if (!rs
->suspended
) { 
 319             rs
->suspended 
= true; 
 320             dispatch_suspend(rs
->ds
); 
 // Counterpart of disable_source(): clears rs->suspended and resumes rs->ds.
 // Must be called on req->q (checked by the assert), because rs->suspended is
 // only ever read or written on that queue.
 // NOTE(review): listing line 326 is absent from this extract; by symmetry with
 // disable_source() it presumably guards the two statements below with
 // `if (rs->suspended) {` -- confirm against the full file.
 324 void enable_source(struct request 
*req
, struct request_source 
*rs
) { 
 325     assert(req
->q 
== dispatch_get_current_queue()); 
 327             rs
->suspended 
= false; 
 328             dispatch_resume(rs
->ds
); 
 332 void delete_source(struct request 
*req
, struct request_source 
*rs
) { 
 333     assert(req
->q 
== dispatch_get_current_queue()); 
 335             /* sources need to be resumed before they can be deleted 
 336               (otherwise an I/O and/or cancel block might be stranded 
 337               waiting for a resume that will never come, causing 
 340             enable_source(req
, rs
); 
 341             dispatch_cancel(rs
->ds
); 
 342             dispatch_release(rs
->ds
); 
 345     rs
->suspended 
= false; 
 348 size_t buf_into_sz(struct buffer 
*b
) { 
 349     return (b
->buf 
+ b
->sz
) - b
->into
; 
 352 void buf_need_into(struct buffer 
*b
, size_t cnt
) { 
 353     // resize buf so into has at least cnt bytes ready to use 
 354     size_t sz 
= buf_into_sz(b
); 
 358     sz 
= malloc_good_size(cnt 
- sz 
+ b
->sz
); 
 359     unsigned char *old 
= b
->buf
; 
 360     // We could special case b->buf == b->into && b->into == b->outof to 
 361     // do a free & malloc rather than realloc, but after testing it happens 
 362     // only for the 1st use of the buffer, where realloc is the same cost as 
 364     b
->buf 
= reallocf(b
->buf
, sz
); 
 367     b
->into 
= b
->buf 
+ (b
->into 
- old
); 
 368     b
->outof 
= b
->buf 
+ (b
->outof 
- old
); 
 // Bookkeeping call made after `used` bytes have been stored at b->into
 // (for example by read_filedata() after a read()).  The visible statement
 // checks that the "into" cursor has not moved past the end of the buffer.
 // NOTE(review): listing line 372 is absent from this extract; it presumably
 // advances b->into by `used` -- confirm against the full file.
 371 void buf_used_into(struct buffer 
*b
, size_t used
) { 
 373     assert(b
->into 
<= b
->buf 
+ b
->sz
); 
 376 size_t buf_outof_sz(struct buffer 
*b
) { 
 377     return b
->into 
- b
->outof
; 
 380 int buf_sprintf(struct buffer 
*b
, char *fmt
, ...) __attribute__((format(printf
,2,3))); 
 382 int buf_sprintf(struct buffer 
*b
, char *fmt
, ...) { 
 385     size_t s 
= buf_into_sz(b
); 
 386     int l 
= vsnprintf((char *)(b
->into
), s
, fmt
, ap
); 
 390         // Reset ap -- vsnprintf has already used it. 
 395         l 
= vsnprintf((char *)(b
->into
), s
, fmt
, ap
); 
 // Bookkeeping call made after `used` bytes have been consumed from b->outof.
 // Checks that "outof" has not overtaken "into"; once the two cursors meet
 // (nothing left to write) both are rewound to b->buf so the whole
 // allocation is available again.
 // NOTE(review): listing line 405 is absent from this extract; it presumably
 // advances b->outof by `used` -- confirm against the full file.
 404 void buf_used_outof(struct buffer 
*b
, size_t used
) { 
 406     //assert(b->into <= b->outof); 
 407     assert(b
->outof 
<= b
->into
); 
 408     if (b
->into 
== b
->outof
) { 
 409         b
->into 
= b
->outof 
= b
->buf
; 
 413 char *buf_debug_str(struct buffer 
*b
) { 
 415     asprintf(&ret
, "S%d i#%d o#%d", b
->sz
, buf_into_sz(b
), buf_outof_sz(b
)); 
 419 uint64_t getnanotime() { 
 421         gettimeofday(&tv
, NULL
); 
 423         return tv
.tv_sec 
* NSEC_PER_SEC 
+ tv
.tv_usec 
* NSEC_PER_USEC
; 
 427 struct request 
**debug_req
; 
 431     static last_reported 
= -1; 
 433     // We want to see the transition into n_req == 0, but we don't need to 
 435     if (n_req 
== 0 && n_req 
== last_reported
) { 
 438             last_reported 
= n_req
; 
 441     qprintf("%d actiave requests to dump\n", n_req
); 
 442     uint64_t now 
= getnanotime(); 
 443     /* Because we iterate over the debug_req array in this queue 
 444       ("the main queue"), it has to "own" that array.   All manipulation 
 445       of the array as a whole will have to be done on this queue.  */ 
 447     for(i 
= 0; i 
< n_req
; i
++) { 
 448         struct request 
*req 
= debug_req
[i
]; 
 449         qprintf("%s sources: fd_rd %p%s, sd_rd %p%s, sd_wr %p%s, timeo %p%s\n", req
->q_name
, req
->fd_rd
.ds
, req
->fd_rd
.suspended 
? " (SUSPENDED)" : "", req
->sd_rd
.ds
, req
->sd_rd
.suspended 
? " (SUSPENDED)" : "", req
->sd_wr
.ds
, req
->sd_wr
.suspended 
? " (SUSPENDED)" : "", req
->timeo
.ds
, req
->timeo
.suspended 
? " (SUSPENDED)" : ""); 
 450         if (req
->timeout_at
) { 
 451                 double when 
= req
->timeout_at 
- now
; 
 452                 when 
/= NSEC_PER_SEC
; 
 454                         qprintf("  timeout %f seconds ago\n", -when
); 
 456                         qprintf("  timeout in %f seconds\n", when
); 
 459                 qprintf("  timeout_at not set\n"); 
 461         char *file_bd 
= buf_debug_str(&req
->file_b
), *deflate_bd 
= buf_debug_str(&req
->deflate_b
); 
 462         qprintf("  file_b %s; deflate_b %s\n  cmd_buf used %ld; fd#%d; files_served %d\n", file_bd
, deflate_bd
, (long)(req
->cb 
- req
->cmd_buf
), req
->fd
, req
->files_served
); 
 464                 qprintf("  deflate total in: %ld ", req
->deflate
->total_in
); 
 466         qprintf("%s total_written %lu, file size %lld\n", req
->deflate 
? "" : " ", req
->total_written
, req
->sb
.st_size
); 
 472 void req_free(struct request 
*req
) { 
 473     assert(!req
->reuse_guard
); 
 474     if (dispatch_get_main_queue() != dispatch_get_current_queue()) { 
 475             /* dispatch_set_finalizer_f arranges to have us "invoked 
 476               asynchronously on req->q's target queue".  However, 
 477               we want to manipulate the debug_req array in ways 
 478               that are unsafe anywhere except the same queue that 
 479               dump_reqs runs on (which happens to be the main queue). 
 480               So if we are running anywhere but the main queue, we 
 481               just arrange to be called there */ 
 483             dispatch_async(dispatch_get_main_queue(), ^{ req_free(req
); }); 
 487     req
->reuse_guard 
= true; 
 489     qprintf("$$$ req_free %s; fd#%d; buf: %s\n", dispatch_queue_get_label(req
->q
), req
->fd
, req
->cmd_buf
); 
 490     assert(req
->sd_rd
.ds 
== NULL 
&& req
->sd_wr
.ds 
== NULL
); 
 492     assert(req
->fd_rd
.ds 
== NULL
); 
 493     if (req
->fd 
>= 0) close(req
->fd
); 
 494     free(req
->file_b
.buf
); 
 495     free(req
->deflate_b
.buf
); 
 502     for(i 
= 0; i 
< n_req
; i
++) { 
 504             debug_req
[i 
-1] = debug_req
[i
]; 
 506             found 
= (debug_req
[i
] == req
); 
 509     debug_req 
= reallocf(debug_req
, sizeof(struct request 
*) * --n_req
); 
 513 void close_connection(struct request 
*req
) { 
 514     qprintf("$$$ close_connection %s, served %d files -- canceling all sources\n", dispatch_queue_get_label(req
->q
), req
->files_served
); 
 515     delete_source(req
, &req
->fd_rd
); 
 516     delete_source(req
, &req
->sd_rd
); 
 517     delete_source(req
, &req
->sd_wr
); 
 518     delete_source(req
, &req
->timeo
); 
 521 // We have some "content data" (either from the file, or from 
 522 // compressing the file), and the network socket is ready for us to 
 524 void write_filedata(struct request 
*req
, size_t avail
) { 
 525     /* We always attempt to write as much data as we have.   This 
 526      is safe because we use non-blocking I/O.   It is a good idea 
 527      because the amount of buffer space that dispatch tells us may 
 528      be stale (more space could have opened up, or memory pressure 
 529      may have caused it to go down). */ 
 531     struct buffer 
*w_buf 
= req
->deflate 
? &req
->deflate_b 
: &req
->file_b
; 
 532     ssize_t sz 
= buf_outof_sz(w_buf
); 
 535         if (!req
->chunk_bytes_remaining
) { 
 536             req
->chunk_bytes_remaining 
= sz
; 
 537             req
->needs_zero_chunk 
= sz 
!= 0; 
 538             req
->cnp 
= req
->chunk_num
; 
 539             int n 
= snprintf(req
->chunk_num
, sizeof(req
->chunk_num
), "\r\n%lx\r\n%s", sz
, sz 
? "" : "\r\n"); 
 540             assert(n 
<= sizeof(req
->chunk_num
)); 
 542         iov
[0].iov_base 
= req
->cnp
; 
 543         iov
[0].iov_len 
= req
->cnp 
? strlen(req
->cnp
) : 0; 
 544         iov
[1].iov_base 
= w_buf
->outof
; 
 545         iov
[1].iov_len 
= (req
->chunk_bytes_remaining 
< sz
) ? req
->chunk_bytes_remaining 
: sz
; 
 546         sz 
= writev(req
->sd
, iov
, 2); 
 549                 if (sz 
>= strlen(req
->cnp
)) { 
 555             sz 
-= iov
[0].iov_len
; 
 556             sz 
= (sz 
< 0) ? 0 : sz
; 
 557             req
->chunk_bytes_remaining 
-= sz
; 
 560         sz 
= write(req
->sd
, w_buf
->outof
, sz
); 
 563         buf_used_outof(w_buf
, sz
); 
 566         qprintf("write_filedata %s write error: %d %s\n", dispatch_queue_get_label(req
->q
), e
, strerror(e
)); 
 567         close_connection(req
); 
 571     req
->total_written 
+= sz
; 
 572     off_t bytes 
= req
->total_written
; 
 574         bytes 
= req
->deflate
->total_in 
- buf_outof_sz(w_buf
); 
 575         if (req
->deflate
->total_in 
< buf_outof_sz(w_buf
)) { 
 579     if (bytes 
== req
->sb
.st_size
) { 
 580         if (req
->needs_zero_chunk 
&& req
->deflate 
&& (sz 
|| req
->cnp
)) { 
 584         // We have transfered the file, time to write the log entry. 
 586         // We don't deal with " in the request string, this is an example of how 
 587         // to use dispatch, not how to do C string manipulation, eh? 
 588         size_t rlen 
= strcspn(req
->cmd_buf
, "\r\n"); 
 589         char tstr
[45], astr
[45]; 
 593         strftime(tstr
, sizeof(tstr
), "%d/%b/%Y:%H:%M:%S +0", gmtime_r(&clock
, &tm
)); 
 594         addr2ascii(AF_INET
, &req
->r_addr
.sin_addr
, sizeof(struct in_addr
), astr
); 
 595         qfprintf(logfile
, "%s - - [%s] \"%.*s\" %hd %zd\n", astr
, tstr
, (int)rlen
, req
->cmd_buf
, req
->status_number
, req
->total_written
); 
 597         int64_t t_offset 
= 5 * NSEC_PER_SEC 
+ req
->files_served 
* NSEC_PER_SEC 
/ 10; 
 598         int64_t timeout_at 
= req
->timeout_at 
= getnanotime() + t_offset
; 
 600         req
->timeo
.ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER
, 0, 0, req
->q
); 
 601         dispatch_source_set_timer(req
->timeo
.ds
, dispatch_time(DISPATCH_TIME_NOW
, t_offset
), NSEC_PER_SEC
, NSEC_PER_SEC
); 
 602         dispatch_source_set_event_handler(req
->timeo
.ds
, ^{ 
 603                         if (req
->timeout_at 
== timeout_at
) { 
 604                                 qfprintf(stderr
, "$$$ -- timeo fire (delta=%f) -- close connection: q=%s\n", (getnanotime() - (double)timeout_at
) / NSEC_PER_SEC
, dispatch_queue_get_label(req
->q
)); 
 605                                 close_connection(req
); 
 607                                 // This happens if the timeout value has been updated, but a pending timeout event manages to race in before the cancel 
 610         dispatch_resume(req
->timeo
.ds
); 
 613         qprintf("$$$ wrote whole file (%s); timeo %p, about to enable %p and close %d, total_written=%zd, this is the %d%s file served\n", dispatch_queue_get_label(req
->q
), req
->timeo
.ds
, req
->sd_rd
.ds
, req
->fd
, req
->total_written
, req
->files_served
, (1 == req
->files_served
) ? "st" : (2 == req
->files_served
) ? "nd" : "th"); 
 614         enable_source(req
, &req
->sd_rd
); 
 616                 delete_source(req
, &req
->fd_rd
); 
 618         req
->cb 
= req
->cmd_buf
; 
 620         assert(bytes 
<= req
->sb
.st_size
); 
 623     if (0 == buf_outof_sz(w_buf
)) { 
 624         // The write buffer is now empty, so we don't need to know when sd is ready for us to write to it. 
 625         disable_source(req
, &req
->sd_wr
); 
 629 // Our "content file" has some data ready for us to read. 
 630 void read_filedata(struct request 
*req
, size_t avail
) { 
 632             delete_source(req
, &req
->fd_rd
); 
 636     /* We make sure we can read at least as many bytes as dispatch 
 637       says are available, but if our buffer is bigger we will read as 
 638       much as we have space for.  We have the file opened in non-blocking 
 639       mode so this is safe. */ 
 641     buf_need_into(&req
->file_b
, avail
); 
 642     size_t rsz 
= buf_into_sz(&req
->file_b
); 
 643     ssize_t sz 
= read(req
->fd
, req
->file_b
.into
, rsz
); 
 645         assert(req
->sd_wr
.ds
); 
 646         size_t sz0 
= buf_outof_sz(&req
->file_b
); 
 647         buf_used_into(&req
->file_b
, sz
); 
 648         assert(sz 
== buf_outof_sz(&req
->file_b
) - sz0
); 
 651         qprintf("read_filedata %s read error: %d %s\n", dispatch_queue_get_label(req
->q
), e
, strerror(e
)); 
 652         close_connection(req
); 
 656         // Note:: deflateBound is "worst case", we could try with any non-zero 
 657         // buffer, and alloc more if we get Z_BUF_ERROR... 
 658         buf_need_into(&req
->deflate_b
, deflateBound(req
->deflate
, buf_outof_sz(&req
->file_b
))); 
 659         req
->deflate
->next_in 
= (req
->file_b
.outof
); 
 660         size_t o_sz 
= buf_outof_sz(&req
->file_b
); 
 661         req
->deflate
->avail_in 
= o_sz
; 
 662         req
->deflate
->next_out 
= req
->deflate_b
.into
; 
 663         size_t i_sz 
= buf_into_sz(&req
->deflate_b
); 
 664         req
->deflate
->avail_out 
= i_sz
; 
 665         assert(req
->deflate
->avail_in 
+ req
->deflate
->total_in 
<= req
->sb
.st_size
); 
 666         // at EOF we want to use Z_FINISH, otherwise we pass Z_NO_FLUSH so we get maximum compression 
 667         int rc 
= deflate(req
->deflate
, (req
->deflate
->avail_in 
+ req
->deflate
->total_in 
>= req
->sb
.st_size
) ? Z_FINISH 
: Z_NO_FLUSH
); 
 668         assert(rc 
== Z_OK 
|| rc 
== Z_STREAM_END
); 
 669         buf_used_outof(&req
->file_b
, o_sz 
- req
->deflate
->avail_in
); 
 670         buf_used_into(&req
->deflate_b
, i_sz 
- req
->deflate
->avail_out
); 
 671         if (i_sz 
!= req
->deflate
->avail_out
) { 
 672             enable_source(req
, &req
->sd_wr
); 
 675         enable_source(req
, &req
->sd_wr
); 
 679 // We are waiting for an HTTP request (we either haven't gotten 
 680 // the first request, or pipelining is on, and we finished a request), 
 681 // and there is data to read on the network socket. 
 682 void read_req(struct request 
*req
, size_t avail
) { 
 684         delete_source(req
, &req
->timeo
); 
 687     // -1 to account for the trailing NUL 
 688     int s 
= (sizeof(req
->cmd_buf
) - (req
->cb 
- req
->cmd_buf
)) -1; 
 690         qprintf("read_req fd#%d command overflow\n", req
->sd
); 
 691         close_connection(req
); 
 694     int rd 
= read(req
->sd
, req
->cb
, s
); 
 697         if (req
->cb 
> req
->cmd_buf 
+ 4) { 
 699             for(i 
= -4; i 
!= 0; i
++) { 
 700                 char ch 
= *(req
->cb 
+ i
); 
 701                 if (ch 
!= '\n' && ch 
!= '\r') { 
 708                 assert(buf_outof_sz(&req
->file_b
) == 0); 
 709                 assert(buf_outof_sz(&req
->deflate_b
) == 0); 
 710                 regmatch_t pmatch
[re_request_nmatch
]; 
 711                 regex_t 
*rex 
= req
->files_served 
? &re_first_request 
: &re_nth_request
; 
 712                 int rc 
= regexec(rex
, req
->cmd_buf
, re_request_nmatch
, pmatch
, 0); 
 715                     regerror(rc
, rex
, ebuf
, sizeof(ebuf
)); 
 716                     qprintf("\n$$$ regexec error: %s, ditching request: '%s'\n", ebuf
, req
->cmd_buf
); 
 717                     close_connection(req
); 
 720                     if (!strncmp("GET", req
->cmd_buf 
+ pmatch
[1].rm_so
, pmatch
[1].rm_eo 
- pmatch
[1].rm_so
)) { 
 721                         rc 
= regexec(&re_accept_deflate
, req
->cmd_buf
, 0, NULL
, 0); 
 722                         assert(rc 
== 0 || rc 
== REG_NOMATCH
); 
 723                         // to disable deflate code: 
 726                             deflateEnd(req
->deflate
); 
 729                         req
->deflate 
= (0 == rc
) ? calloc(1, sizeof(z_stream
)) : NULL
; 
 731                         strlcpy(path_buf
, DOC_BASE
, sizeof(path_buf
)); 
 732                         // WARNING: this doesn't avoid use of .. in the path 
 733                         // to get outside of DOC_ROOT, a real web server would 
 734                         // really have to avoid that. 
 735                         char ch 
= *(req
->cmd_buf 
+ pmatch
[2].rm_eo
); 
 736                         *(req
->cmd_buf 
+ pmatch
[2].rm_eo
) = '\0'; 
 737                         strlcat(path_buf
, req
->cmd_buf 
+ pmatch
[2].rm_so
, sizeof(path_buf
)); 
 738                         *(req
->cmd_buf 
+ pmatch
[2].rm_eo
) = ch
; 
 739                         req
->fd 
= open(path_buf
, O_RDONLY
|O_NONBLOCK
); 
 740                         qprintf("GET req for %s, path: %s, deflate: %p; fd#%d\n", dispatch_queue_get_label(req
->q
), path_buf
, req
->deflate
, req
->fd
); 
 743                             const char *msg 
= "<HTML><HEAD><TITLE>404 Page not here</TITLE></HEAD><BODY><P>You step in the stream,<BR>but the water has moved on.<BR>This <B>page is not here</B>.<BR></BODY></HTML>"; 
 744                             req
->status_number 
= 404; 
 745                             n 
= buf_sprintf(&req
->file_b
, "HTTP/1.1 404 Not Found\r\nContent-Length: %zu\r\nExpires: now\r\nServer: %s\r\n\r\n%s", strlen(msg
), argv0
, msg
); 
 748                             rc 
= fstat(req
->fd
, &req
->sb
); 
 750                             if (req
->sb
.st_mode 
& S_IFDIR
) { 
 751                                 req
->status_number 
= 301; 
 752                                 regmatch_t hmatch
[re_request_nmatch
]; 
 753                                 rc 
= regexec(&re_host
, req
->cmd_buf
, re_request_nmatch
, hmatch
, 0); 
 754                                 assert(rc 
== 0 || rc 
== REG_NOMATCH
); 
 755                                 if (rc 
== REG_NOMATCH
) { 
 756                                     hmatch
[1].rm_so 
= hmatch
[1].rm_eo 
= 0; 
 758                                 n 
= buf_sprintf(&req
->file_b
, "HTTP/1.1 301 Redirect\r\nContent-Length: 0\r\nExpires: now\r\nServer: %s\r\nLocation: http://%*.0s/%*.0s/index.html\r\n\r\n", argv0
, (int)(hmatch
[1].rm_eo 
- hmatch
[1].rm_so
), req
->cmd_buf 
+ hmatch
[1].rm_so
, (int)(pmatch
[2].rm_eo 
- pmatch
[2].rm_so
), req
->cmd_buf 
+ pmatch
[2].rm_so
); 
 763                                 req
->status_number 
= 200; 
 765                                     n 
= buf_sprintf(&req
->deflate_b
, "HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\nContent-Encoding: deflate\r\nExpires: now\r\nServer: %s\r\n", argv0
); 
 766                                     req
->chunk_bytes_remaining 
= buf_outof_sz(&req
->deflate_b
); 
 768                                     n 
= buf_sprintf(req
->deflate 
? &req
->deflate_b 
: &req
->file_b
, "HTTP/1.1 200 OK\r\nContent-Length: %lld\r\nExpires: now\r\nServer: %s\r\n\r\n", req
->sb
.st_size
, argv0
); 
 773                         if (req
->status_number 
!= 200) { 
 779                             rc 
= deflateInit(req
->deflate
, Z_BEST_COMPRESSION
); 
 783                         // Cheat: we don't count the header bytes as part of total_written 
 784                         req
->total_written 
= -buf_outof_sz(&req
->file_b
); 
 786                             req
->fd_rd
.ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, req
->fd
, 0, req
->q
); 
 787                             // Cancelation is async, so we capture the fd and read sources we will want to operate on as the req struct may have moved on to a new set of values 
 789                             dispatch_source_t fd_rd 
= req
->fd_rd
.ds
; 
 790                             dispatch_source_set_cancel_handler(req
->fd_rd
.ds
, ^{ 
 795                                     if (req
->fd_rd
.ds 
== fd_rd
) { 
 796                                             req
->fd_rd
.ds 
= NULL
; 
 799                             dispatch_source_set_event_handler(req
->fd_rd
.ds
, ^{ 
 801                                             read_filedata(req
, dispatch_source_get_data(req
->fd_rd
.ds
));  
 804                             dispatch_resume(req
->fd_rd
.ds
); 
 806                             req
->fd_rd
.ds 
= NULL
; 
 810                             enable_source(req
, &req
->sd_wr
); 
 812                             req
->sd_wr
.ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_WRITE
, req
->sd
, 0, req
->q
); 
 813                             dispatch_source_set_event_handler(req
->sd_wr
.ds
, ^{ write_filedata(req
, dispatch_source_get_data(req
->sd_wr
.ds
)); }); 
 814                             dispatch_resume(req
->sd_wr
.ds
); 
 816                         disable_source(req
, &req
->sd_rd
); 
 821     } else if (rd 
== 0) { 
 822         qprintf("### (%s) read_req fd#%d rd=0 (%s); %d files served\n", dispatch_queue_get_label(req
->q
), req
->sd
, (req
->cb 
== req
->cmd_buf
) ? "no final request" : "incomplete request", req
->files_served
); 
 823         close_connection(req
); 
 827         qprintf("reqd_req fd#%d rd=%d err=%d %s\n", req
->sd
, rd
, e
, strerror(e
)); 
 828         close_connection(req
); 
 833 // We have a new connection, allocate a req struct & set up a read event handler 
 834 void accept_cb(int fd
) { 
 835     static int req_num 
= 0; 
 836     struct request 
*new_req 
= calloc(1, sizeof(struct request
)); 
 838     new_req
->cb 
= new_req
->cmd_buf
; 
 839     socklen_t r_len 
= sizeof(new_req
->r_addr
); 
 840     int s 
= accept(fd
, (struct sockaddr 
*)&(new_req
->r_addr
), &r_len
); 
 842             qfprintf(stderr
, "accept failure (rc=%d, errno=%d %s)\n", s
, errno
, strerror(errno
)); 
 847     new_req
->req_num 
= req_num
; 
 848     asprintf(&(new_req
->q_name
), "req#%d s#%d", req_num
++, s
); 
 849     qprintf("accept_cb fd#%d; made: %s\n", fd
, new_req
->q_name
); 
 851     // All further work for this request will happen "on" new_req->q, 
 852     // except the final tear down (see req_free()) 
 853     new_req
->q 
= dispatch_queue_create(new_req
->q_name
, NULL
); 
 854     dispatch_set_context(new_req
->q
, new_req
); 
 855     dispatch_set_finalizer_f(new_req
->q
, (dispatch_function_t
)req_free
); 
 857     debug_req 
= reallocf(debug_req
, sizeof(struct request 
*) * ++n_req
); 
 858     debug_req
[n_req 
-1] = new_req
; 
 861     new_req
->sd_rd
.ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, new_req
->sd
, 0, new_req
->q
); 
 862     dispatch_source_set_event_handler(new_req
->sd_rd
.ds
, ^{ 
 863             read_req(new_req
, dispatch_source_get_data(new_req
->sd_rd
.ds
)); 
 866     // We want our queue to go away when all of it's sources do, so we 
 867     // drop the reference dispatch_queue_create gave us & rely on the 
 868     // references each source holds on the queue to keep it alive. 
 869     dispatch_release(new_req
->q
); 
 870     dispatch_resume(new_req
->sd_rd
.ds
); 
 873 int main(int argc
, char *argv
[]) { 
 874     int sock 
= socket(AF_INET
, SOCK_STREAM
, IPPROTO_TCP
); 
 877     struct addrinfo ai_hints
, *my_addr
; 
 879     qpf 
= dispatch_queue_create("printf", NULL
); 
 881     argv0 
= basename(argv
[0]); 
 882     struct passwd 
*pw 
= getpwuid(getuid()); 
 884     asprintf(&DOC_BASE
, "%s/Sites/", pw
->pw_dir
); 
 885     asprintf(&log_name
, "%s/Library/Logs/%s-transfer.log", pw
->pw_dir
, argv0
); 
 886     logfile 
= fopen(log_name
, "a"); 
 887     reopen_logfile_when_needed(logfile
, log_name
); 
 889     bzero(&ai_hints
, sizeof(ai_hints
)); 
 890     ai_hints
.ai_flags 
= AI_PASSIVE
; 
 891     ai_hints
.ai_family 
= PF_INET
; 
 892     ai_hints
.ai_socktype 
= SOCK_STREAM
; 
 893     ai_hints
.ai_protocol 
= IPPROTO_TCP
; 
 894     rc 
= getaddrinfo(NULL
, server_port
, &ai_hints
, &my_addr
); 
 897     qprintf("Serving content from %s on port %s, logging transfers to %s\n", DOC_BASE
, server_port
, log_name
); 
 900     rc 
= setsockopt(sock
, SOL_SOCKET
, SO_REUSEADDR
, &yes
, sizeof(yes
)); 
 903     rc 
= setsockopt(sock
, SOL_SOCKET
, SO_REUSEPORT
, &yes
, sizeof(yes
)); 
 906     rc 
= bind(sock
, my_addr
->ai_addr
, my_addr
->ai_addr
->sa_len
); 
 909     rc 
= listen(sock
, 25); 
 912     rc 
= regcomp(&re_first_request
, "^([A-Z]+)[ \t]+([^ \t\n]+)[ \t]+HTTP/1\\.1[\r\n]+", REG_EXTENDED
); 
 915     rc 
= regcomp(&re_nth_request
, "^([A-Z]+)[ \t]+([^ \t\n]+)([ \t]+HTTP/1\\.1)?[\r\n]+", REG_EXTENDED
); 
 918     rc 
= regcomp(&re_accept_deflate
, "[\r\n]+Accept-Encoding:(.*,)? *deflate[,\r\n]+", REG_EXTENDED
); 
 921     rc 
= regcomp(&re_host
, "[\r\n]+Host: *([^ \r\n]+)[ \r\n]+", REG_EXTENDED
); 
 924     dispatch_source_t accept_ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_READ
, sock
, 0, dispatch_get_main_queue()); 
 925     dispatch_source_set_event_handler(accept_ds
, ^{ accept_cb(sock
); }); 
 927     dispatch_resume(accept_ds
); 
 931     sigaddset(&sigs
, SIGINFO
); 
 932     sigaddset(&sigs
, SIGPIPE
); 
 935     for(s 
= 0; s 
< NSIG
; s
++) { 
 936             if (sigismember(&sigs
, s
)) { 
 937                     dispatch_source_t sig_ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_SIGNAL
, s
, 0, dispatch_get_main_queue()); 
 939                     dispatch_source_set_event_handler(sig_ds
, ^{ dump_reqs(); }); 
 940                     dispatch_resume(sig_ds
); 
 944     rc 
= sigprocmask(SIG_BLOCK
, &sigs
, NULL
); 
 947     dispatch_source_t dump_timer_ds 
= dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER
, 0, 0, dispatch_get_main_queue()); 
 948     dispatch_source_set_timer(dump_timer_ds
, DISPATCH_TIME_NOW
, 5 * NSEC_PER_SEC
, NSEC_PER_SEC
); 
 949     dispatch_source_set_event_handler(dump_timer_ds
, ^{ dump_reqs(); }); 
 950     dispatch_resume(dump_timer_ds
); 
 953     printf("dispatch_main returned\n");