]> git.saurik.com Git - apple/mdnsresponder.git/blob - ServiceRegistration/ioloop.c
mDNSResponder-1096.0.2.tar.gz
[apple/mdnsresponder.git] / ServiceRegistration / ioloop.c
1 /* dispatch.c
2 *
3 * Copyright (c) 2018 Apple Computer, Inc. All rights reserved.
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 * Simple event dispatcher for DNS.
18 */
19
20 #define __APPLE_USE_RFC_3542
21
22 #include <stdlib.h>
23 #include <string.h>
24 #include <stdio.h>
25 #include <unistd.h>
26 #include <sys/uio.h>
27 #include <sys/errno.h>
28 #include <sys/socket.h>
29 #include <netinet/in.h>
30 #include <arpa/inet.h>
31 #include <sys/event.h>
32 #include <fcntl.h>
33 #include <sys/time.h>
34
35 #include "srp.h"
36 #include "dns-msg.h"
37 #include "srp-crypto.h"
38 #include "ioloop.h"
39 #include "dnssd-proxy.h"
40
// Compile in the select()-based event backend.  USE_KQUEUE is not defined in this file; the
// kqueue code paths below are only built if it is defined elsewhere.
#define USE_SELECT
#pragma mark Globals
// Head of the singly linked list of registered ios.  add_reader() pushes onto the front;
// ioloop_events() unlinks ios whose sock has been set to -1.
io_t *ios;
// Time in milliseconds on the ioloop_timenow() clock, refreshed by ioloop_events().
int64_t ioloop_now;

#ifdef USE_KQUEUE
// kqueue descriptor, created by ioloop_init().
int kq;
#endif
49
50 int
51 getipaddr(addr_t *addr, const char *p)
52 {
53 if (inet_pton(AF_INET, p, &addr->sin.sin_addr)) {
54 addr->sa.sa_family = AF_INET;
55 return sizeof addr->sin;
56 } else if (inet_pton(AF_INET6, p, &addr->sin6.sin6_addr)) {
57 addr->sa.sa_family = AF_INET6;
58 return sizeof addr->sin6;
59 } else {
60 return 0;
61 }
62 }
63
/*
 * Return the current wall-clock time in milliseconds since the Unix epoch.
 *
 * This is the clock behind ioloop_now and the io wakeup_time deadlines that ioloop_events()
 * compares against it.  Because it comes from gettimeofday(), it follows adjustments to the
 * system clock.
 */
int64_t
ioloop_timenow(void)
{
    struct timeval tv;

    gettimeofday(&tv, NULL);
    return (int64_t)tv.tv_sec * 1000 + (int64_t)tv.tv_usec / 1000;
}
73
74 message_t *
75 message_allocate(size_t message_size)
76 {
77 message_t *message = (message_t *)malloc(message_size + (sizeof (message_t)) - (sizeof (dns_wire_t)));
78 if (message)
79 memset(message, 0, (sizeof (message_t)) - (sizeof (dns_wire_t)));
80 return message;
81 }
82
83 void
84 message_free(message_t *message)
85 {
86 free(message);
87 }
88
89 void
90 comm_free(comm_t *comm)
91 {
92 if (comm->name) {
93 free(comm->name);
94 comm->name = NULL;
95 }
96 if (comm->message) {
97 message_free(comm->message);
98 comm->message = NULL;
99 comm->buf = NULL;
100 }
101 free(comm);
102 }
103
104 void
105 ioloop_close(io_t *io)
106 {
107 close(io->sock);
108 io->sock = -1;
109 }
110
111 void
112 add_reader(io_t *io, io_callback_t callback, io_callback_t finalize)
113 {
114 io->next = ios;
115 ios = io;
116 io->read_callback = callback;
117 io->finalize = finalize;
118 #ifdef USE_SELECT
119 io->want_read = true;
120 #endif
121 #ifdef USE_EPOLL
122 #endif
123 #ifdef USE_KQUEUE
124 struct kevent ev;
125 int rv;
126 EV_SET(&ev, io->sock, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, io);
127 rv = kevent(kq, &ev, 1, NULL, 0, NULL);
128 if (rv < 0) {
129 ERROR("kevent add: %s", strerror(errno));
130 return;
131 }
132 #endif // USE_EPOLL
133 }
134
/*
 * One-time event loop setup.
 *
 * SIGPIPE is ignored so that writing to a socket whose peer has gone away fails with an
 * error instead of killing the process.  When the kqueue backend is compiled in, the kqueue
 * descriptor is created as well.  Returns false only if kqueue() fails.
 */
bool
ioloop_init(void)
{
    // Broken connections must surface as write errors, not as a fatal signal.
    signal(SIGPIPE, SIG_IGN);

#ifdef USE_KQUEUE
    if ((kq = kqueue()) < 0) {
        ERROR("kqueue(): %s", strerror(errno));
        return false;
    }
#endif

    return true;
}
148
149 int
150 ioloop_events(int64_t timeout_when)
151 {
152 io_t *io, **iop;
153 int nev = 0, rv;
154 int64_t now = ioloop_timenow();
155 int64_t next_event = timeout_when;
156 int64_t timeout = 0;
157
158 INFO("%qd.%03qd seconds have passed on entry to ioloop_events", (now - ioloop_now) / 1000, (now - ioloop_now) % 1000);
159 ioloop_now = now;
160
161 // A timeout of zero means don't time out.
162 if (timeout_when == 0) {
163 next_event = INT64_MAX;
164 } else {
165 next_event = timeout_when;
166 }
167
168 #ifdef USE_SELECT
169 int nfds = 0;
170 fd_set reads, writes, errors;
171 struct timeval tv;
172
173 FD_ZERO(&reads);
174 FD_ZERO(&writes);
175 FD_ZERO(&errors);
176
177 #endif
178 iop = &ios;
179 while (*iop) {
180 io = *iop;
181 if (io->sock != -1 && io->wakeup_time != 0) {
182 if (io->wakeup_time <= ioloop_now) {
183 io->wakeup_time = 0;
184 io->wakeup(io);
185 ++nev;
186 } else if (io->wakeup_time < next_event) {
187 next_event = io->wakeup_time;
188 }
189 }
190
191 if (io->sock == -1) {
192 *iop = io->next;
193 if (io->finalize) {
194 io->finalize(io);
195 } else {
196 free(io);
197 }
198 continue;
199 }
200
201 // INFO("now: %qd io %d wakeup_time %qd next_event %qd", ioloop_now, io->sock, io->wakeup_time, next_event);
202
203 // If we were given a timeout in the future, or told to wait indefinitely, wait until the next event.
204 if (timeout_when == 0 || timeout_when > ioloop_now) {
205 timeout = next_event - ioloop_now;
206 // Don't choose a time so far in the future that it might overflow some math in the kernel.
207 if (timeout > IOLOOP_DAY * 100) {
208 timeout = IOLOOP_DAY * 100;
209 }
210 #ifdef USE_SELECT
211 tv.tv_sec = timeout / 1000;
212 tv.tv_usec = (timeout % 1000) * 1000;
213 #endif
214 #ifdef USE_KQUEUE
215 ts.tv_sec = timeout / 1000;
216 ts.tv_nsec = (timeout % 1000) * 1000 * 1000;
217 #endif
218 }
219 iop = &io->next;
220 }
221
222 #ifdef USE_SELECT
223 for (io = ios; io; io = io->next) {
224 if (io->sock != -1 && (io->want_read || io->want_write)) {
225 if (io->sock >= nfds) {
226 nfds = io->sock + 1;
227 }
228 if (io->want_read) {
229 FD_SET(io->sock, &reads);
230 }
231 if (io->want_write) {
232 FD_SET(io->sock, &writes);
233 }
234 }
235 }
236 #endif
237
238 #ifdef USE_SELECT
239 INFO("waiting %ld %d seconds", tv.tv_sec, tv.tv_usec);
240 rv = select(nfds, &reads, &writes, &writes, &tv);
241 if (rv < 0) {
242 ERROR("select: %s", strerror(errno));
243 exit(1);
244 }
245 now = ioloop_timenow();
246 INFO("%qd.%03qd seconds passed waiting, got %d events", (now - ioloop_now) / 1000, (now - ioloop_now) % 1000, rv);
247 ioloop_now = now;
248 for (io = ios; io; io = io->next) {
249 if (io->sock != -1) {
250 if (FD_ISSET(io->sock, &reads)) {
251 io->read_callback(io);
252 } else if (FD_ISSET(io->sock, &writes)) {
253 io->write_callback(io);
254 }
255 }
256 }
257 nev += rv;
258 #endif // USE_SELECT
259 #ifdef USE_KQUEUE
260 #define KEV_MAX 20
261 struct kevent evs[KEV_MAX];
262 int i, rv;
263 struct timespec ts;
264
265 INFO("waiting %qd/%qd seconds", ts.tv_sec, ts.tv_nsec);
266 do {
267 rv = kevent(kq, NULL, 0, evs, KEV_MAX, &ts);
268 now = ioloop_timenow();
269 INFO("%qd.%03qd seconds passed waiting, got %d events", (now - ioloop_now) / 1000, (now - ioloop_now) % 1000, rv);
270 ioloop_now = now;
271 ts.tv_sec = 0;
272 ts.tv_nsec = 0;
273 if (rv < 0) {
274 ERROR("kevent poll: %s", strerror(errno));
275 exit(1);
276 }
277 for (i = 0; i < nev; i++) {
278 io = evs[i].udata;
279 if (evs[i].filter == EVFILT_WRITE) {
280 io->write_callback(io);
281 } else if (evs[i].filter == EVFILT_READ) {
282 io->read_callback(io);
283 }
284 }
285 nev += rv;
286 } while (rv == KEV_MAX);
287 #endif
288 return nev;
289 }
290
291 static void
292 udp_read_callback(io_t *io)
293 {
294 comm_t *connection = (comm_t *)io;
295 addr_t src;
296 int rv;
297 struct msghdr msg;
298 struct iovec bufp;
299 uint8_t msgbuf[DNS_MAX_UDP_PAYLOAD];
300 char cmsgbuf[128];
301 struct cmsghdr *cmh;
302 message_t *message;
303
304 bufp.iov_base = msgbuf;
305 bufp.iov_len = DNS_MAX_UDP_PAYLOAD;
306 msg.msg_iov = &bufp;
307 msg.msg_iovlen = 1;
308 msg.msg_name = &src;
309 msg.msg_namelen = sizeof src;
310 msg.msg_control = cmsgbuf;
311 msg.msg_controllen = sizeof cmsgbuf;
312
313 rv = recvmsg(connection->io.sock, &msg, 0);
314 if (rv < 0) {
315 ERROR("udp_read_callback: %s", strerror(errno));
316 return;
317 }
318 message = message_allocate(rv);
319 if (!message) {
320 ERROR("udp_read_callback: out of memory");
321 return;
322 }
323 memcpy(&message->src, &src, sizeof src);
324 message->length = rv;
325 memcpy(&message->wire, msgbuf, rv);
326
327 // For UDP, we use the interface index as part of the validation strategy, so go get
328 // the interface index.
329 for (cmh = CMSG_FIRSTHDR(&msg); cmh; cmh = CMSG_NXTHDR(&msg, cmh)) {
330 if (cmh->cmsg_level == IPPROTO_IPV6 && cmh->cmsg_type == IPV6_PKTINFO) {
331 struct in6_pktinfo pktinfo;
332
333 memcpy(&pktinfo, CMSG_DATA(cmh), sizeof pktinfo);
334 message->ifindex = pktinfo.ipi6_ifindex;
335 } else if (cmh->cmsg_level == IPPROTO_IP && cmh->cmsg_type == IP_PKTINFO) {
336 struct in_pktinfo pktinfo;
337
338 memcpy(&pktinfo, CMSG_DATA(cmh), sizeof pktinfo);
339 message->ifindex = pktinfo.ipi_ifindex;
340 }
341 }
342 connection->message = message;
343 connection->datagram_callback(connection);
344 }
345
346 static void
347 tcp_read_callback(io_t *context)
348 {
349 comm_t *connection = (comm_t *)context;
350 int rv;
351 if (connection->message_length_len < 2) {
352 rv = read(connection->io.sock, &connection->message_length_bytes[connection->message_length_len],
353 2 - connection->message_length_len);
354 if (rv < 0) {
355 read_error:
356 ERROR("tcp_read_callback: %s", strerror(errno));
357 close(connection->io.sock);
358 connection->io.sock = -1;
359 // connection->io.finalize() will be called from the io loop.
360 return;
361 }
362 // If we read zero here, the remote endpoint has closed or shutdown the connection. Either case is
363 // effectively the same--if we are sensitive to read events, that means that we are done processing
364 // the previous message.
365 if (rv == 0) {
366 eof:
367 ERROR("tcp_read_callback: remote end (%s) closed connection on %d", connection->name, connection->io.sock);
368 close(connection->io.sock);
369 connection->io.sock = -1;
370 // connection->io.finalize() will be called from the io loop.
371 return;
372 }
373 connection->message_length_len += rv;
374 if (connection->message_length_len == 2) {
375 connection->message_length = (((uint16_t)connection->message_length_bytes[0] << 8) |
376 ((uint16_t)connection->message_length_bytes[1]));
377 }
378 return;
379 }
380
381 // If we only just got the length, we need to allocate a message
382 if (connection->message == NULL) {
383 connection->message = message_allocate(connection->message_length);
384 if (!connection->message) {
385 ERROR("udp_read_callback: out of memory");
386 return;
387 }
388 connection->buf = (uint8_t *)&connection->message->wire;
389 connection->message->length = connection->message_length;
390 memset(&connection->message->src, 0, sizeof connection->message->src);
391 }
392
393 rv = read(connection->io.sock, &connection->buf[connection->message_cur],
394 connection->message_length - connection->message_cur);
395 if (rv < 0) {
396 goto read_error;
397 }
398 if (rv == 0) {
399 goto eof;
400 }
401
402 connection->message_cur += rv;
403 if (connection->message_cur == connection->message_length) {
404 connection->datagram_callback(connection);
405 // Caller is expected to consume the message, we are immediately ready for the next read.
406 connection->message_length = connection->message_length_len = 0;
407 }
408 }
409
410 static void
411 tcp_send_response(comm_t *comm, message_t *responding_to, struct iovec *iov, int iov_len)
412 {
413 struct msghdr mh;
414 struct iovec iovec[4];
415 char lenbuf[2];
416 ssize_t status;
417 size_t payload_length = 0;
418 int i;
419
420 // We don't anticipate ever needing more than four hunks, but if we get more, handle then?
421 if (iov_len > 3) {
422 ERROR("tcp_send_response: too many io buffers");
423 close(comm->io.sock);
424 comm->io.sock = -1;
425 return;
426 }
427
428 iovec[0].iov_base = &lenbuf[0];
429 iovec[0].iov_len = 2;
430 for (i = 0; i < iov_len; i++) {
431 iovec[i + 1] = iov[i];
432 payload_length += iov[i].iov_len;
433 }
434 lenbuf[0] = payload_length / 256;
435 lenbuf[1] = payload_length & 0xff;
436 payload_length += 2;
437
438 memset(&mh, 0, sizeof mh);
439 mh.msg_iov = &iovec[0];
440 mh.msg_iovlen = iov_len + 1;
441 mh.msg_name = 0;
442
443 #ifndef MSG_NOSIGNAL
444 #define MSG_NOSIGNAL 0
445 #endif
446 status = sendmsg(comm->io.sock, &mh, MSG_NOSIGNAL);
447 if (status < 0 || status != payload_length) {
448 if (status < 0) {
449 ERROR("tcp_send_response: write failed: %s", strerror(errno));
450 } else {
451 ERROR("tcp_send_response: short write (%zd out of %zu bytes)", status, payload_length);
452 }
453 close(comm->io.sock);
454 comm->io.sock = -1;
455 }
456 }
457
458 static void
459 udp_send_response(comm_t *comm, message_t *responding_to, struct iovec *iov, int iov_len)
460 {
461 struct msghdr mh;
462 memset(&mh, 0, sizeof mh);
463 mh.msg_iov = iov;
464 mh.msg_iovlen = iov_len;
465 mh.msg_name = &responding_to->src;
466 if (responding_to->src.sa.sa_family == AF_INET) {
467 mh.msg_namelen = sizeof (struct sockaddr_in);
468 } else if (responding_to->src.sa.sa_family == AF_INET6) {
469 mh.msg_namelen = sizeof (struct sockaddr_in6);
470 } else {
471 ERROR("send_udp_response: unknown family %d", responding_to->src.sa.sa_family);
472 abort();
473 }
474 sendmsg(comm->io.sock, &mh, 0);
475 }
476
477 // When a communication is closed, scan the io event list to see if any other ios are referencing this one.
478 void
479 comm_finalize(io_t *io_in) {
480 io_t *io;
481
482 for (io = ios; io; io = io->next) {
483 if (io->cancel_on_close == io_in && io->cancel != NULL) {
484 io->cancel(io);
485 }
486 }
487 }
488
489 static void
490 listen_callback(io_t *context)
491 {
492 comm_t *listener = (comm_t *)context;
493 int rv;
494 addr_t addr;
495 socklen_t addr_len = sizeof addr;
496 comm_t *comm;
497 char addrbuf[INET6_ADDRSTRLEN + 7];
498 int addrlen;
499
500 rv = accept(listener->io.sock, &addr.sa, &addr_len);
501 if (rv < 0) {
502 ERROR("accept: %s", strerror(errno));
503 close(listener->io.sock);
504 listener->io.sock = -1;
505 return;
506 }
507 inet_ntop(addr.sa.sa_family, (addr.sa.sa_family == AF_INET
508 ? (void *)&addr.sin.sin_addr
509 : (void *)&addr.sin6.sin6_addr), addrbuf, sizeof addrbuf);
510 addrlen = strlen(addrbuf);
511 snprintf(&addrbuf[addrlen], (sizeof addrbuf) - addrlen, "%%%d",
512 (addr.sa.sa_family == AF_INET ? addr.sin.sin_port : addr.sin6.sin6_port));
513 comm = calloc(1, sizeof *comm);
514 comm->name = strdup(addrbuf);
515 comm->io.sock = rv;
516 comm->address = addr;
517 comm->datagram_callback = listener->datagram_callback;
518 comm->send_response = tcp_send_response;
519 comm->tcp_stream = true;
520
521 if (listener->connected) {
522 listener->connected(comm);
523 }
524 add_reader(&comm->io, tcp_read_callback, NULL);
525 comm->io.finalize = comm_finalize;
526
527 #ifdef SO_NOSIGPIPE
528 int one = 1;
529 rv = setsockopt(comm->io.sock, SOL_SOCKET, SO_NOSIGPIPE, &one, sizeof one);
530 if (rv < 0) {
531 ERROR("SO_NOSIGPIPE failed: %s", strerror(errno));
532 }
533 #endif
534 }
535
536 comm_t *
537 setup_listener_socket(int family, int protocol, uint16_t port, const char *name,
538 comm_callback_t datagram_callback,
539 comm_callback_t connected, void *context)
540 {
541 comm_t *listener;
542 socklen_t sl;
543 int rv;
544 int flag = 1;
545
546 listener = calloc(1, sizeof *listener);
547 if (listener == NULL) {
548 return NULL;
549 }
550 listener->name = strdup(name);
551 if (!listener->name) {
552 free(listener);
553 return NULL;
554 }
555 listener->io.sock = socket(family, protocol == IPPROTO_UDP ? SOCK_DGRAM : SOCK_STREAM, protocol);
556 if (listener->io.sock < 0) {
557 ERROR("Can't get socket: %s", strerror(errno));
558 comm_free(listener);
559 return NULL;
560 }
561 rv = setsockopt(listener->io.sock, SOL_SOCKET, SO_REUSEPORT, &flag, sizeof flag);
562 if (rv < 0) {
563 ERROR("SO_REUSEPORT failed: %s", strerror(errno));
564 comm_free(listener);
565 return NULL;
566 }
567
568 if (family == AF_INET) {
569 sl = sizeof listener->address.sin;
570 listener->address.sin.sin_port = port ? port : htons(53);
571 } else {
572 sl = sizeof listener->address.sin6;
573 listener->address.sin6.sin6_port = port ? port : htons(53);
574 }
575 listener->address.sa.sa_family = family;
576 listener->address.sa.sa_len = sl;
577 if (bind(listener->io.sock, &listener->address.sa, sl) < 0) {
578 ERROR("Can't bind to 0#53/%s%s: %s",
579 protocol == IPPROTO_UDP ? "udp" : "tcp", family == AF_INET ? "v4" : "v6",
580 strerror(errno));
581 out:
582 close(listener->io.sock);
583 free(listener);
584 return NULL;
585 }
586
587 if (protocol == IPPROTO_TCP) {
588 if (listen(listener->io.sock, 5 /* xxx */) < 0) {
589 ERROR("Can't listen on 0#53/%s%s: %s.",
590 protocol == IPPROTO_UDP ? "udp" : "tcp", family == AF_INET ? "v4" : "v6",
591 strerror(errno));
592 goto out;
593 }
594 add_reader(&listener->io, listen_callback, NULL);
595 } else {
596 rv = setsockopt(listener->io.sock, family == AF_INET ? IPPROTO_IP : IPPROTO_IPV6,
597 family == AF_INET ? IP_PKTINFO : IPV6_RECVPKTINFO, &flag, sizeof flag);
598 if (rv < 0) {
599 ERROR("Can't set %s: %s.", family == AF_INET ? "IP_PKTINFO" : "IPV6_RECVPKTINFO",
600 strerror(errno));
601 goto out;
602 }
603 add_reader(&listener->io, udp_read_callback, NULL);
604 listener->send_response = udp_send_response;
605 }
606 listener->datagram_callback = datagram_callback;
607 listener->connected = connected;
608 return listener;
609 }
610
611 // Local Variables:
612 // mode: C
613 // tab-width: 4
614 // c-file-style: "bsd"
615 // c-basic-offset: 4
616 // fill-column: 108
617 // indent-tabs-mode: nil
618 // End: