FreeBSD kernel kern code
kern_alq.c
1/*-
2 * SPDX-License-Identifier: BSD-2-Clause-FreeBSD
3 *
4 * Copyright (c) 2002, Jeffrey Roberson <jeff@freebsd.org>
5 * Copyright (c) 2008-2009, Lawrence Stewart <lstewart@freebsd.org>
6 * Copyright (c) 2009-2010, The FreeBSD Foundation
7 * All rights reserved.
8 *
9 * Portions of this software were developed at the Centre for Advanced
10 * Internet Architectures, Swinburne University of Technology, Melbourne,
11 * Australia by Lawrence Stewart under sponsorship from the FreeBSD Foundation.
12 *
13 * Redistribution and use in source and binary forms, with or without
14 * modification, are permitted provided that the following conditions
15 * are met:
16 * 1. Redistributions of source code must retain the above copyright
17 * notice unmodified, this list of conditions, and the following
18 * disclaimer.
19 * 2. Redistributions in binary form must reproduce the above copyright
20 * notice, this list of conditions and the following disclaimer in the
21 * documentation and/or other materials provided with the distribution.
22 *
23 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
24 * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
25 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
26 * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
28 * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
29 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
30 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
31 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
32 * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
33 */
34
35#include <sys/cdefs.h>
36__FBSDID("$FreeBSD$");
37
38#include "opt_mac.h"
39
40#include <sys/param.h>
41#include <sys/systm.h>
42#include <sys/kernel.h>
43#include <sys/kthread.h>
44#include <sys/lock.h>
45#include <sys/mount.h>
46#include <sys/mutex.h>
47#include <sys/namei.h>
48#include <sys/proc.h>
49#include <sys/vnode.h>
50#include <sys/alq.h>
51#include <sys/malloc.h>
52#include <sys/unistd.h>
53#include <sys/fcntl.h>
54#include <sys/eventhandler.h>
55
56#include <security/mac/mac_framework.h>
57
58/* Async. Logging Queue */
59struct alq {
60 char *aq_entbuf; /* Buffer for stored entries */
61 int aq_entmax; /* Max entries */
62 int aq_entlen; /* Entry length */
63 int aq_freebytes; /* Bytes available in buffer */
64 int aq_buflen; /* Total length of our buffer */
65 int aq_writehead; /* Location for next write */
66 int aq_writetail; /* Flush starts at this location */
67 int aq_wrapearly; /* # bytes left blank at end of buf */
68 int aq_flags; /* Queue flags */
69 int aq_waiters; /* Num threads waiting for resources
70 * NB: Used as a wait channel so must
71 * not be first field in the alq struct
72 */
73 struct ale aq_getpost; /* ALE for use by get/post */
74 struct mtx aq_mtx; /* Queue lock */
75 struct vnode *aq_vp; /* Open vnode handle */
76 struct ucred *aq_cred; /* Credentials of the opening thread */
77 LIST_ENTRY(alq) aq_act; /* List of active queues */
78 LIST_ENTRY(alq) aq_link; /* List of all queues */
79};
80
81#define AQ_WANTED 0x0001 /* Wakeup sleeper when io is done */
82#define AQ_ACTIVE 0x0002 /* on the active list */
83#define AQ_FLUSHING 0x0004 /* doing IO */
84#define AQ_SHUTDOWN 0x0008 /* Queue no longer valid */
85#define AQ_ORDERED 0x0010 /* Queue enforces ordered writes */
86#define AQ_LEGACY 0x0020 /* Legacy queue (fixed length writes) */
87
88#define ALQ_LOCK(alq) mtx_lock_spin(&(alq)->aq_mtx)
89#define ALQ_UNLOCK(alq) mtx_unlock_spin(&(alq)->aq_mtx)
90
91#define HAS_PENDING_DATA(alq) ((alq)->aq_freebytes != (alq)->aq_buflen)
92
93static MALLOC_DEFINE(M_ALD, "ALD", "ALD");
94
95/*
96 * The ald_mtx protects the ald_queues list and the ald_active list.
97 */
98static struct mtx ald_mtx;
99static LIST_HEAD(, alq) ald_queues;
100static LIST_HEAD(, alq) ald_active;
101static int ald_shutingdown = 0;
102struct thread *ald_thread;
103static struct proc *ald_proc;
104static eventhandler_tag alq_eventhandler_tag = NULL;
105
106#define ALD_LOCK() mtx_lock(&ald_mtx)
107#define ALD_UNLOCK() mtx_unlock(&ald_mtx)
108
109/* Daemon functions */
110static int ald_add(struct alq *);
111static int ald_rem(struct alq *);
112static void ald_startup(void *);
113static void ald_daemon(void);
114static void ald_shutdown(void *, int);
115static void ald_activate(struct alq *);
116static void ald_deactivate(struct alq *);
117
118/* Internal queue functions */
119static void alq_shutdown(struct alq *);
120static void alq_destroy(struct alq *);
121static int alq_doio(struct alq *);
122
123/*
124 * Add a new queue to the global list. Fail if we're shutting down.
125 */
126static int
127ald_add(struct alq *alq)
128{
129 int error;
130
131 error = 0;
132
133 ALD_LOCK();
134 if (ald_shutingdown) {
135 error = EBUSY;
136 goto done;
137 }
138 LIST_INSERT_HEAD(&ald_queues, alq, aq_link);
139done:
140 ALD_UNLOCK();
141 return (error);
142}
143
144/*
145 * Remove a queue from the global list unless we're shutting down. If so,
146 * the ald will take care of cleaning up its resources.
147 */
148static int
149ald_rem(struct alq *alq)
150{
151 int error;
152
153 error = 0;
154
155 ALD_LOCK();
156 if (ald_shutingdown) {
157 error = EBUSY;
158 goto done;
159 }
160 LIST_REMOVE(alq, aq_link);
161done:
162 ALD_UNLOCK();
163 return (error);
164}
165
166/*
167 * Put a queue on the active list. This will schedule it for writing.
168 */
169static void
170ald_activate(struct alq *alq)
171{
172 LIST_INSERT_HEAD(&ald_active, alq, aq_act);
173 wakeup(&ald_active);
174}
175
176static void
177ald_deactivate(struct alq *alq)
178{
179 LIST_REMOVE(alq, aq_act);
180 alq->aq_flags &= ~AQ_ACTIVE;
181}
182
183static void
184ald_startup(void *unused)
185{
186 mtx_init(&ald_mtx, "ALDmtx", NULL, MTX_DEF|MTX_QUIET);
187 LIST_INIT(&ald_queues);
188 LIST_INIT(&ald_active);
189}
190
191static void
192ald_daemon(void)
193{
194 int needwakeup;
195 struct alq *alq;
196
197 ald_thread = FIRST_THREAD_IN_PROC(ald_proc);
198
199 alq_eventhandler_tag = EVENTHANDLER_REGISTER(shutdown_pre_sync,
200 ald_shutdown, NULL, SHUTDOWN_PRI_FIRST);
201
202 ALD_LOCK();
203
204 for (;;) {
205 while ((alq = LIST_FIRST(&ald_active)) == NULL &&
206 !ald_shutingdown)
207 mtx_sleep(&ald_active, &ald_mtx, PWAIT, "aldslp", 0);
208
209 /* Don't shutdown until all active ALQs are flushed. */
210 if (ald_shutingdown && alq == NULL) {
211 ALD_UNLOCK();
212 break;
213 }
214
215 ALQ_LOCK(alq);
216 ald_deactivate(alq);
217 ALD_UNLOCK();
218 needwakeup = alq_doio(alq);
219 ALQ_UNLOCK(alq);
220 if (needwakeup)
221 wakeup_one(alq);
222 ALD_LOCK();
223 }
224
225 kproc_exit(0);
226}
227
228static void
229ald_shutdown(void *arg, int howto)
230{
231 struct alq *alq;
232
233 ALD_LOCK();
234
235 /* Ensure no new queues can be created. */
236 ald_shutingdown = 1;
237
238 /* Shutdown all ALQs prior to terminating the ald_daemon. */
239 while ((alq = LIST_FIRST(&ald_queues)) != NULL) {
240 LIST_REMOVE(alq, aq_link);
241 ALD_UNLOCK();
242 alq_shutdown(alq);
243 ALD_LOCK();
244 }
245
246 /* At this point, all ALQs are flushed and shutdown. */
247
248 /*
249 * Wake ald_daemon so that it exits. It won't be able to do
250 * anything until we mtx_sleep because we hold the ald_mtx.
251 */
252 wakeup(&ald_active);
253
254 /* Wait for ald_daemon to exit. */
255 mtx_sleep(ald_proc, &ald_mtx, PWAIT, "aldslp", 0);
256
257 ALD_UNLOCK();
258}
259
260static void
261alq_shutdown(struct alq *alq)
262{
263 ALQ_LOCK(alq);
264
265 /* Stop any new writers. */
266 alq->aq_flags |= AQ_SHUTDOWN;
267
268 /*
269 * If the ALQ isn't active but has unwritten data (possible if
270 * the ALQ_NOACTIVATE flag has been used), explicitly activate the
271 * ALQ here so that the pending data gets flushed by the ald_daemon.
272 */
273 if (!(alq->aq_flags & AQ_ACTIVE) && HAS_PENDING_DATA(alq)) {
274 alq->aq_flags |= AQ_ACTIVE;
275 ALQ_UNLOCK(alq);
276 ALD_LOCK();
277 ald_activate(alq);
278 ALD_UNLOCK();
279 ALQ_LOCK(alq);
280 }
281
282 /* Drain IO */
283 while (alq->aq_flags & AQ_ACTIVE) {
284 alq->aq_flags |= AQ_WANTED;
285 msleep_spin(alq, &alq->aq_mtx, "aldclose", 0);
286 }
287 ALQ_UNLOCK(alq);
288
289 vn_close(alq->aq_vp, FWRITE, alq->aq_cred,
290 curthread);
291 crfree(alq->aq_cred);
292}
293
294void
295alq_destroy(struct alq *alq)
296{
297 /* Drain all pending IO. */
298 alq_shutdown(alq);
299
300 mtx_destroy(&alq->aq_mtx);
301 free(alq->aq_entbuf, M_ALD);
302 free(alq, M_ALD);
303}
304
305/*
306 * Flush all pending data to disk. This operation will block.
307 */
308static int
309alq_doio(struct alq *alq)
310{
311 struct thread *td;
312 struct mount *mp;
313 struct vnode *vp;
314 struct uio auio;
315 struct iovec aiov[2];
316 int totlen;
317 int iov;
318 int wrapearly;
319
320 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
321
322 vp = alq->aq_vp;
323 td = curthread;
324 totlen = 0;
325 iov = 1;
326 wrapearly = alq->aq_wrapearly;
327
328 bzero(&aiov, sizeof(aiov));
329 bzero(&auio, sizeof(auio));
330
331 /* Start the write from the location of our buffer tail pointer. */
332 aiov[0].iov_base = alq->aq_entbuf + alq->aq_writetail;
333
334 if (alq->aq_writetail < alq->aq_writehead) {
335 /* Buffer not wrapped. */
336 totlen = aiov[0].iov_len = alq->aq_writehead - alq->aq_writetail;
337 } else if (alq->aq_writehead == 0) {
338 /* Buffer not wrapped (special case to avoid an empty iov). */
339 totlen = aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
340 wrapearly;
341 } else {
342 /*
343 * Buffer wrapped, requires 2 aiov entries:
344 * - first is from writetail to end of buffer
345 * - second is from start of buffer to writehead
346 */
347 aiov[0].iov_len = alq->aq_buflen - alq->aq_writetail -
348 wrapearly;
349 iov++;
350 aiov[1].iov_base = alq->aq_entbuf;
351 aiov[1].iov_len = alq->aq_writehead;
352 totlen = aiov[0].iov_len + aiov[1].iov_len;
353 }
354
355 alq->aq_flags |= AQ_FLUSHING;
356 ALQ_UNLOCK(alq);
357
358 auio.uio_iov = &aiov[0];
359 auio.uio_offset = 0;
360 auio.uio_segflg = UIO_SYSSPACE;
361 auio.uio_rw = UIO_WRITE;
362 auio.uio_iovcnt = iov;
363 auio.uio_resid = totlen;
364 auio.uio_td = td;
365
366 /*
367 * Do all of the junk required to write now.
368 */
369 vn_start_write(vp, &mp, V_WAIT);
370 vn_lock(vp, LK_EXCLUSIVE | LK_RETRY);
371 /*
372 * XXX: VOP_WRITE error checks are ignored.
373 */
374#ifdef MAC
375 if (mac_vnode_check_write(alq->aq_cred, NOCRED, vp) == 0)
376#endif
377 VOP_WRITE(vp, &auio, IO_UNIT | IO_APPEND, alq->aq_cred);
378 VOP_UNLOCK(vp);
379 vn_finished_write(mp);
380
381 ALQ_LOCK(alq);
382 alq->aq_flags &= ~AQ_FLUSHING;
383
384 /* Adjust writetail as required, taking into account wrapping. */
385 alq->aq_writetail = (alq->aq_writetail + totlen + wrapearly) %
386 alq->aq_buflen;
387 alq->aq_freebytes += totlen + wrapearly;
388
389 /*
390 * If we just flushed part of the buffer which wrapped, reset the
391 * wrapearly indicator.
392 */
393 if (wrapearly)
394 alq->aq_wrapearly = 0;
395
396 /*
397 * If we just flushed the buffer completely, reset indexes to 0 to
398 * minimise buffer wraps.
399 * This is also required to ensure alq_getn() can't wedge itself.
400 */
401 if (!HAS_PENDING_DATA(alq))
402 alq->aq_writehead = alq->aq_writetail = 0;
403
404 KASSERT((alq->aq_writetail >= 0 && alq->aq_writetail < alq->aq_buflen),
405 ("%s: aq_writetail < 0 || aq_writetail >= aq_buflen", __func__));
406
407 if (alq->aq_flags & AQ_WANTED) {
408 alq->aq_flags &= ~AQ_WANTED;
409 return (1);
410 }
411
412 return(0);
413}
414
415static struct kproc_desc ald_kp = {
416 "ALQ Daemon",
417 ald_daemon,
418 &ald_proc
419};
420
421SYSINIT(aldthread, SI_SUB_KTHREAD_IDLE, SI_ORDER_ANY, kproc_start, &ald_kp);
422SYSINIT(ald, SI_SUB_LOCK, SI_ORDER_ANY, ald_startup, NULL);
423
424/* User visible queue functions */
425
426/*
427 * Create the queue data structure, allocate the buffer, and open the file.
428 */
429
430int
431alq_open_flags(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
432 int size, int flags)
433{
434 struct nameidata nd;
435 struct alq *alq;
436 int oflags;
437 int error;
438
439 KASSERT((size > 0), ("%s: size <= 0", __func__));
440
441 *alqp = NULL;
442
443 NDINIT(&nd, LOOKUP, NOFOLLOW, UIO_SYSSPACE, file);
444 oflags = FWRITE | O_NOFOLLOW | O_CREAT;
445
446 error = vn_open_cred(&nd, &oflags, cmode, 0, cred, NULL);
447 if (error)
448 return (error);
449
450 NDFREE(&nd, NDF_ONLY_PNBUF);
451 /* We just unlock so we hold a reference */
452 VOP_UNLOCK(nd.ni_vp);
453
454 alq = malloc(sizeof(*alq), M_ALD, M_WAITOK|M_ZERO);
455 alq->aq_vp = nd.ni_vp;
456 alq->aq_cred = crhold(cred);
457
458 mtx_init(&alq->aq_mtx, "ALD Queue", NULL, MTX_SPIN|MTX_QUIET);
459
460 alq->aq_buflen = size;
461 alq->aq_entmax = 0;
462 alq->aq_entlen = 0;
463
464 alq->aq_freebytes = alq->aq_buflen;
465 alq->aq_entbuf = malloc(alq->aq_buflen, M_ALD, M_WAITOK|M_ZERO);
466 alq->aq_writehead = alq->aq_writetail = 0;
467 if (flags & ALQ_ORDERED)
468 alq->aq_flags |= AQ_ORDERED;
469
470 if ((error = ald_add(alq)) != 0) {
471 alq_destroy(alq);
472 return (error);
473 }
474
475 *alqp = alq;
476
477 return (0);
478}
479
480int
481alq_open(struct alq **alqp, const char *file, struct ucred *cred, int cmode,
482 int size, int count)
483{
484 int ret;
485
486 KASSERT((count >= 0), ("%s: count < 0", __func__));
487
488 if (count > 0) {
489 if ((ret = alq_open_flags(alqp, file, cred, cmode,
490 size*count, 0)) == 0) {
491 (*alqp)->aq_flags |= AQ_LEGACY;
492 (*alqp)->aq_entmax = count;
493 (*alqp)->aq_entlen = size;
494 }
495 } else
496 ret = alq_open_flags(alqp, file, cred, cmode, size, 0);
497
498 return (ret);
499}
500
501/*
502 * Copy a new entry into the queue. If the operation would block either
503 * wait or return an error depending on the value of waitok.
504 */
505int
506alq_writen(struct alq *alq, void *data, int len, int flags)
507{
508 int activate, copy, ret;
509 void *waitchan;
510
511 KASSERT((len > 0 && len <= alq->aq_buflen),
512 ("%s: len <= 0 || len > aq_buflen", __func__));
513
514 activate = ret = 0;
515 copy = len;
516 waitchan = NULL;
517
518 ALQ_LOCK(alq);
519
520 /*
521 * Fail to perform the write and return EWOULDBLOCK if:
522 * - The message is larger than our underlying buffer.
523 * - The ALQ is being shutdown.
524 * - There is insufficient free space in our underlying buffer
525 * to accept the message and the user can't wait for space.
526 * - There is insufficient free space in our underlying buffer
527 * to accept the message and the alq is inactive due to prior
528 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
529 */
530 if (len > alq->aq_buflen ||
531 alq->aq_flags & AQ_SHUTDOWN ||
532 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
533 HAS_PENDING_DATA(alq))) && alq->aq_freebytes < len)) {
534 ALQ_UNLOCK(alq);
535 return (EWOULDBLOCK);
536 }
537
538 /*
539 * If we want ordered writes and there is already at least one thread
540 * waiting for resources to become available, sleep until we're woken.
541 */
542 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
543 KASSERT(!(flags & ALQ_NOWAIT),
544 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
545 alq->aq_waiters++;
546 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqwnord", 0);
547 alq->aq_waiters--;
548 }
549
550 /*
551 * (ALQ_WAITOK && aq_freebytes < len) or aq_freebytes >= len, either
552 * enter while loop and sleep until we have enough free bytes (former)
553 * or skip (latter). If AQ_ORDERED is set, only 1 thread at a time will
554 * be in this loop. Otherwise, multiple threads may be sleeping here
555 * competing for ALQ resources.
556 */
557 while (alq->aq_freebytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
558 KASSERT(!(flags & ALQ_NOWAIT),
559 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
560 alq->aq_flags |= AQ_WANTED;
561 alq->aq_waiters++;
562 if (waitchan)
563 wakeup(waitchan);
564 msleep_spin(alq, &alq->aq_mtx, "alqwnres", 0);
565 alq->aq_waiters--;
566
567 /*
568 * If we're the first thread to wake after an AQ_WANTED wakeup
569 * but there isn't enough free space for us, we're going to loop
570 * and sleep again. If there are other threads waiting in this
571 * loop, schedule a wakeup so that they can see if the space
572 * they require is available.
573 */
574 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
575 alq->aq_freebytes < len && !(alq->aq_flags & AQ_WANTED))
576 waitchan = alq;
577 else
578 waitchan = NULL;
579 }
580
581 /*
582 * If there are waiters, we need to signal the waiting threads after we
583 * complete our work. The alq ptr is used as a wait channel for threads
584 * requiring resources to be freed up. In the AQ_ORDERED case, threads
585 * are not allowed to concurrently compete for resources in the above
586 * while loop, so we use a different wait channel in this case.
587 */
588 if (alq->aq_waiters > 0) {
589 if (alq->aq_flags & AQ_ORDERED)
590 waitchan = &alq->aq_waiters;
591 else
592 waitchan = alq;
593 } else
594 waitchan = NULL;
595
596 /* Bail if we're shutting down. */
597 if (alq->aq_flags & AQ_SHUTDOWN) {
598 ret = EWOULDBLOCK;
599 goto unlock;
600 }
601
602 /*
603 * If we need to wrap the buffer to accommodate the write,
604 * we'll need 2 calls to bcopy.
605 */
606 if ((alq->aq_buflen - alq->aq_writehead) < len)
607 copy = alq->aq_buflen - alq->aq_writehead;
608
609 /* Copy message (or part thereof if wrap required) to the buffer. */
610 bcopy(data, alq->aq_entbuf + alq->aq_writehead, copy);
611 alq->aq_writehead += copy;
612
613 if (alq->aq_writehead >= alq->aq_buflen) {
614 KASSERT((alq->aq_writehead == alq->aq_buflen),
615 ("%s: alq->aq_writehead (%d) > alq->aq_buflen (%d)",
616 __func__,
617 alq->aq_writehead,
618 alq->aq_buflen));
619 alq->aq_writehead = 0;
620 }
621
622 if (copy != len) {
623 /*
624 * Wrap the buffer by copying the remainder of our message
625 * to the start of the buffer and resetting aq_writehead.
626 */
627 bcopy(((uint8_t *)data)+copy, alq->aq_entbuf, len - copy);
628 alq->aq_writehead = len - copy;
629 }
630
631 KASSERT((alq->aq_writehead >= 0 && alq->aq_writehead < alq->aq_buflen),
632 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen", __func__));
633
634 alq->aq_freebytes -= len;
635
636 if (!(alq->aq_flags & AQ_ACTIVE) && !(flags & ALQ_NOACTIVATE)) {
637 alq->aq_flags |= AQ_ACTIVE;
638 activate = 1;
639 }
640
641 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
642
643unlock:
644 ALQ_UNLOCK(alq);
645
646 if (activate) {
647 ALD_LOCK();
648 ald_activate(alq);
649 ALD_UNLOCK();
650 }
651
652 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
653 if (waitchan != NULL)
654 wakeup_one(waitchan);
655
656 return (ret);
657}
658
659int
660alq_write(struct alq *alq, void *data, int flags)
661{
662 /* Should only be called in fixed length message (legacy) mode. */
663 KASSERT((alq->aq_flags & AQ_LEGACY),
664 ("%s: fixed length write on variable length queue", __func__));
665 return (alq_writen(alq, data, alq->aq_entlen, flags));
666}
667
668/*
669 * Retrieve a pointer for the ALQ to write directly into, avoiding bcopy.
670 */
671struct ale *
672alq_getn(struct alq *alq, int len, int flags)
673{
674 int contigbytes;
675 void *waitchan;
676
677 KASSERT((len > 0 && len <= alq->aq_buflen),
678 ("%s: len <= 0 || len > alq->aq_buflen", __func__));
679
680 waitchan = NULL;
681
682 ALQ_LOCK(alq);
683
684 /*
685 * Determine the number of free contiguous bytes.
686 * We ensure elsewhere that if aq_writehead == aq_writetail because
687 * the buffer is empty, they will both be set to 0 and therefore
688 * aq_freebytes == aq_buflen and is fully contiguous.
689 * If they are equal and the buffer is not empty, aq_freebytes will
690 * be 0 indicating the buffer is full.
691 */
692 if (alq->aq_writehead <= alq->aq_writetail)
693 contigbytes = alq->aq_freebytes;
694 else {
695 contigbytes = alq->aq_buflen - alq->aq_writehead;
696
697 if (contigbytes < len) {
698 /*
699 * Insufficient space at end of buffer to handle a
700 * contiguous write. Wrap early if there's space at
701 * the beginning. This will leave a hole at the end
702 * of the buffer which we will have to skip over when
703 * flushing the buffer to disk.
704 */
705 if (alq->aq_writetail >= len || flags & ALQ_WAITOK) {
706 /* Keep track of # bytes left blank. */
707 alq->aq_wrapearly = contigbytes;
708 /* Do the wrap and adjust counters. */
709 contigbytes = alq->aq_freebytes =
710 alq->aq_writetail;
711 alq->aq_writehead = 0;
712 }
713 }
714 }
715
716 /*
717 * Return a NULL ALE if:
718 * - The message is larger than our underlying buffer.
719 * - The ALQ is being shutdown.
720 * - There is insufficient free space in our underlying buffer
721 * to accept the message and the user can't wait for space.
722 * - There is insufficient free space in our underlying buffer
723 * to accept the message and the alq is inactive due to prior
724 * use of the ALQ_NOACTIVATE flag (which would lead to deadlock).
725 */
726 if (len > alq->aq_buflen ||
727 alq->aq_flags & AQ_SHUTDOWN ||
728 (((flags & ALQ_NOWAIT) || (!(alq->aq_flags & AQ_ACTIVE) &&
729 HAS_PENDING_DATA(alq))) && contigbytes < len)) {
730 ALQ_UNLOCK(alq);
731 return (NULL);
732 }
733
734 /*
735 * If we want ordered writes and there is already at least one thread
736 * waiting for resources to become available, sleep until we're woken.
737 */
738 if (alq->aq_flags & AQ_ORDERED && alq->aq_waiters > 0) {
739 KASSERT(!(flags & ALQ_NOWAIT),
740 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
741 alq->aq_waiters++;
742 msleep_spin(&alq->aq_waiters, &alq->aq_mtx, "alqgnord", 0);
743 alq->aq_waiters--;
744 }
745
746 /*
747 * (ALQ_WAITOK && contigbytes < len) or contigbytes >= len, either enter
748 * while loop and sleep until we have enough contiguous free bytes
749 * (former) or skip (latter). If AQ_ORDERED is set, only 1 thread at a
750 * time will be in this loop. Otherwise, multiple threads may be
751 * sleeping here competing for ALQ resources.
752 */
753 while (contigbytes < len && !(alq->aq_flags & AQ_SHUTDOWN)) {
754 KASSERT(!(flags & ALQ_NOWAIT),
755 ("%s: ALQ_NOWAIT set but incorrectly ignored!", __func__));
756 alq->aq_flags |= AQ_WANTED;
757 alq->aq_waiters++;
758 if (waitchan)
759 wakeup(waitchan);
760 msleep_spin(alq, &alq->aq_mtx, "alqgnres", 0);
761 alq->aq_waiters--;
762
763 if (alq->aq_writehead <= alq->aq_writetail)
764 contigbytes = alq->aq_freebytes;
765 else
766 contigbytes = alq->aq_buflen - alq->aq_writehead;
767
768 /*
769 * If we're the first thread to wake after an AQ_WANTED wakeup
770 * but there isn't enough free space for us, we're going to loop
771 * and sleep again. If there are other threads waiting in this
772 * loop, schedule a wakeup so that they can see if the space
773 * they require is available.
774 */
775 if (alq->aq_waiters > 0 && !(alq->aq_flags & AQ_ORDERED) &&
776 contigbytes < len && !(alq->aq_flags & AQ_WANTED))
777 waitchan = alq;
778 else
779 waitchan = NULL;
780 }
781
782 /*
783 * If there are waiters, we need to signal the waiting threads after we
784 * complete our work. The alq ptr is used as a wait channel for threads
785 * requiring resources to be freed up. In the AQ_ORDERED case, threads
786 * are not allowed to concurrently compete for resources in the above
787 * while loop, so we use a different wait channel in this case.
788 */
789 if (alq->aq_waiters > 0) {
790 if (alq->aq_flags & AQ_ORDERED)
791 waitchan = &alq->aq_waiters;
792 else
793 waitchan = alq;
794 } else
795 waitchan = NULL;
796
797 /* Bail if we're shutting down. */
798 if (alq->aq_flags & AQ_SHUTDOWN) {
799 ALQ_UNLOCK(alq);
800 if (waitchan != NULL)
801 wakeup_one(waitchan);
802 return (NULL);
803 }
804
805 /*
806 * If we are here, we have a contiguous number of bytes >= len
807 * available in our buffer starting at aq_writehead.
808 */
809 alq->aq_getpost.ae_data = alq->aq_entbuf + alq->aq_writehead;
810 alq->aq_getpost.ae_bytesused = len;
811
812 return (&alq->aq_getpost);
813}
814
815struct ale *
816alq_get(struct alq *alq, int flags)
817{
818 /* Should only be called in fixed length message (legacy) mode. */
819 KASSERT((alq->aq_flags & AQ_LEGACY),
820 ("%s: fixed length get on variable length queue", __func__));
821 return (alq_getn(alq, alq->aq_entlen, flags));
822}
823
824void
825alq_post_flags(struct alq *alq, struct ale *ale, int flags)
826{
827 int activate;
828 void *waitchan;
829
830 activate = 0;
831
832 if (ale->ae_bytesused > 0) {
833 if (!(alq->aq_flags & AQ_ACTIVE) &&
834 !(flags & ALQ_NOACTIVATE)) {
835 alq->aq_flags |= AQ_ACTIVE;
836 activate = 1;
837 }
838
839 alq->aq_writehead += ale->ae_bytesused;
840 alq->aq_freebytes -= ale->ae_bytesused;
841
842 /* Wrap aq_writehead if we filled to the end of the buffer. */
843 if (alq->aq_writehead == alq->aq_buflen)
844 alq->aq_writehead = 0;
845
846 KASSERT((alq->aq_writehead >= 0 &&
847 alq->aq_writehead < alq->aq_buflen),
848 ("%s: aq_writehead < 0 || aq_writehead >= aq_buflen",
849 __func__));
850
851 KASSERT((HAS_PENDING_DATA(alq)), ("%s: queue empty!", __func__));
852 }
853
854 /*
855 * If there are waiters, we need to signal the waiting threads after we
856 * complete our work. The alq ptr is used as a wait channel for threads
857 * requiring resources to be freed up. In the AQ_ORDERED case, threads
858 * are not allowed to concurrently compete for resources in the
859 * alq_getn() while loop, so we use a different wait channel in this case.
860 */
861 if (alq->aq_waiters > 0) {
862 if (alq->aq_flags & AQ_ORDERED)
863 waitchan = &alq->aq_waiters;
864 else
865 waitchan = alq;
866 } else
867 waitchan = NULL;
868
869 ALQ_UNLOCK(alq);
870
871 if (activate) {
872 ALD_LOCK();
873 ald_activate(alq);
874 ALD_UNLOCK();
875 }
876
877 /* NB: We rely on wakeup_one waking threads in a FIFO manner. */
878 if (waitchan != NULL)
879 wakeup_one(waitchan);
880}
881
882void
883alq_flush(struct alq *alq)
884{
885 int needwakeup = 0;
886
887 ALD_LOCK();
888 ALQ_LOCK(alq);
889
890 /*
891 * Pull the lever iff there is data to flush and we're
892 * not already in the middle of a flush operation.
893 */
894 if (HAS_PENDING_DATA(alq) && !(alq->aq_flags & AQ_FLUSHING)) {
895 if (alq->aq_flags & AQ_ACTIVE)
896 ald_deactivate(alq);
897
898 ALD_UNLOCK();
899 needwakeup = alq_doio(alq);
900 } else
901 ALD_UNLOCK();
902
903 ALQ_UNLOCK(alq);
904
905 if (needwakeup)
906 wakeup_one(alq);
907}
908
909/*
910 * Flush remaining data, close the file and free all resources.
911 */
912void
913alq_close(struct alq *alq)
914{
915 /* Only flush and destroy alq if not already shutting down. */
916 if (ald_rem(alq) == 0)
917 alq_destroy(alq);
918}
919
920static int
921alq_load_handler(module_t mod, int what, void *arg)
922{
923 int ret;
924
925 ret = 0;
926
927 switch (what) {
928 case MOD_LOAD:
929 case MOD_SHUTDOWN:
930 break;
931
932 case MOD_QUIESCE:
933 ALD_LOCK();
934 /* Only allow unload if there are no open queues. */
935 if (LIST_FIRST(&ald_queues) == NULL) {
936 ald_shutingdown = 1;
937 ALD_UNLOCK();
938 EVENTHANDLER_DEREGISTER(shutdown_pre_sync,
939 alq_eventhandler_tag);
940 ald_shutdown(NULL, 0);
941 mtx_destroy(&ald_mtx);
942 } else {
943 ALD_UNLOCK();
944 ret = EBUSY;
945 }
946 break;
947
948 case MOD_UNLOAD:
949 /* If MOD_QUIESCE failed we must fail here too. */
950 if (ald_shutingdown == 0)
951 ret = EBUSY;
952 break;
953
954 default:
955 ret = EINVAL;
956 break;
957 }
958
959 return (ret);
960}
961
962static moduledata_t alq_mod =
963{
964 "alq",
965 alq_load_handler,
966 NULL
967};
968
969DECLARE_MODULE(alq, alq_mod, SI_SUB_LAST, SI_ORDER_ANY);
970MODULE_VERSION(alq, 1);
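
The listing above is the ALQ implementation itself; the sketch below is not part of kern_alq.c and only illustrates how a consumer typically drives this API. It is a minimal example under stated assumptions: the example_* names, the log path and the 64 KiB buffer size are invented for illustration, and a real consumer would normally pick either the copying path (alq_writen()) or the zero-copy path (alq_getn()/alq_post_flags()) rather than offering both.

/*
 * Example ALQ consumer (illustrative sketch, not part of kern_alq.c).
 * The log path, buffer size and example_* names are assumptions made
 * for this example only.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/proc.h>
#include <sys/alq.h>

static struct alq *example_alq;

/* Open a variable-length ALQ backed by a 64 KiB ring buffer. */
static int
example_alq_start(void)
{
	return (alq_open_flags(&example_alq, "/var/log/example_alq.log",
	    curthread->td_ucred, 0600, 64 * 1024, 0));
}

/* Copying path: alq_writen() copies the record into the ring buffer. */
static int
example_alq_log_copy(void *rec, int len)
{
	/* ALQ_NOWAIT: return EWOULDBLOCK instead of sleeping if full. */
	return (alq_writen(example_alq, rec, len, ALQ_NOWAIT));
}

/* Zero-copy path: reserve space with alq_getn(), fill it, then post it. */
static int
example_alq_log_nocopy(const void *rec, int len)
{
	struct ale *ale;

	if ((ale = alq_getn(example_alq, len, ALQ_NOWAIT)) == NULL)
		return (EWOULDBLOCK);
	bcopy(rec, ale->ae_data, len);	/* Or format the record in place. */
	alq_post_flags(example_alq, ale, 0);
	return (0);
}

static void
example_alq_stop(void)
{
	alq_flush(example_alq);		/* Push any buffered records to disk. */
	alq_close(example_alq);		/* Flush the rest and close the vnode. */
}

Note that ALQ_NOWAIT makes both paths drop the record when the ring buffer is full; passing ALQ_WAITOK instead would sleep until the ald daemon frees space by flushing to the backing vnode.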