35#include <sys/cpuset.h>
36#include <sys/kernel.h>
37#include <sys/kthread.h>
38#include <sys/libkern.h>
39#include <sys/limits.h>
41#include <sys/malloc.h>
47#include <sys/gtaskqueue.h>
48#include <sys/unistd.h>
49#include <machine/stdarg.h>
72 struct mtx_padalign tq_mutex;
76 struct thread **tq_threads;
80 taskqueue_callback_fn tq_callbacks[TASKQUEUE_NUM_CALLBACKS];
81 void *tq_cb_contexts[TASKQUEUE_NUM_CALLBACKS];
84#define TQ_FLAGS_ACTIVE (1 << 0)
85#define TQ_FLAGS_BLOCKED (1 << 1)
86#define TQ_FLAGS_UNLOCKED_ENQUEUE (1 << 2)
88#define DT_CALLOUT_ARMED (1 << 0)
93 mtx_lock_spin(&(tq)->tq_mutex); \
95 mtx_lock(&(tq)->tq_mutex); \
97#define TQ_ASSERT_LOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_OWNED)
99#define TQ_UNLOCK(tq) \
102 mtx_unlock_spin(&(tq)->tq_mutex); \
104 mtx_unlock(&(tq)->tq_mutex); \
106#define TQ_ASSERT_UNLOCKED(tq) mtx_assert(&(tq)->tq_mutex, MA_NOTOWNED)
110gtask_dump(
struct gtask *gtask)
112 printf(
"gtask: %p ta_flags=%x ta_priority=%d ta_func=%p ta_context=%p\n",
113 gtask, gtask->ta_flags, gtask->ta_priority, gtask->ta_func, gtask->ta_context);
121 return (msleep_spin(p, (
struct mtx *)&tq->tq_mutex, wm, 0));
122 return (msleep(p, &tq->tq_mutex, 0, wm, 0));
127 taskqueue_enqueue_fn enqueue,
void *context,
128 int mtxflags,
const char *mtxname __unused)
133 tq_name =
malloc(TASKQUEUE_NAMELEN, M_GTASKQUEUE, mflags | M_ZERO);
141 free(tq_name, M_GTASKQUEUE);
145 STAILQ_INIT(&queue->tq_queue);
146 LIST_INIT(&queue->tq_active);
147 queue->tq_enqueue = enqueue;
148 queue->tq_context = context;
149 queue->tq_name = tq_name;
150 queue->tq_spin = (mtxflags & MTX_SPIN) != 0;
154 mtx_init(&queue->tq_mutex, tq_name, NULL, mtxflags);
166 while (tq->tq_tcount > 0 || tq->tq_callouts > 0) {
177 queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
179 KASSERT(LIST_EMPTY(&queue->tq_active), (
"Tasks still running?"));
180 KASSERT(queue->tq_callouts == 0, (
"Armed timeout tasks"));
181 mtx_destroy(&queue->tq_mutex);
182 free(queue->tq_threads, M_GTASKQUEUE);
183 free(queue->tq_name, M_GTASKQUEUE);
184 free(queue, M_GTASKQUEUE);
193 struct gtaskqueue *queue = grouptask->gt_taskqueue;
194 struct gtask *gtask = &grouptask->gt_task;
199 panic(
"queue == NULL");
203 gtask->ta_flags |= TASK_NOENQUEUE;
211 struct gtaskqueue *queue = grouptask->gt_taskqueue;
212 struct gtask *gtask = &grouptask->gt_task;
217 panic(
"queue == NULL");
221 gtask->ta_flags &= ~TASK_NOENQUEUE;
231 panic(
"queue == NULL");
235 if (gtask->ta_flags & TASK_ENQUEUED) {
239 if (gtask->ta_flags & TASK_NOENQUEUE) {
243 STAILQ_INSERT_TAIL(&queue->tq_queue, gtask, ta_link);
244 gtask->ta_flags |= TASK_ENQUEUED;
247 queue->tq_enqueue(queue->tq_context);
264 struct gtask t_barrier;
266 if (STAILQ_EMPTY(&queue->tq_queue))
278 STAILQ_INSERT_TAIL(&queue->tq_queue, &t_barrier, ta_link);
279 t_barrier.ta_flags |= TASK_ENQUEUED;
285 while (t_barrier.ta_flags & TASK_ENQUEUED)
286 TQ_SLEEP(queue, &t_barrier,
"gtq_qdrain");
300 if (LIST_EMPTY(&queue->tq_active))
304 queue->tq_callouts++;
309 LIST_FOREACH(tb, &queue->tq_active, tb_link) {
310 if ((
int)(tb->
tb_seq - seq) <= 0) {
317 queue->tq_callouts--;
336 queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
337 if (!STAILQ_EMPTY(&queue->tq_queue))
338 queue->tq_enqueue(queue->tq_context);
345 struct epoch_tracker et;
350 KASSERT(queue != NULL, (
"tq is NULL"));
353 LIST_INSERT_HEAD(&queue->tq_active, &tb, tb_link);
354 in_net_epoch =
false;
356 while ((gtask = STAILQ_FIRST(&queue->tq_queue)) != NULL) {
357 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
358 gtask->ta_flags &= ~TASK_ENQUEUED;
360 tb.
tb_seq = ++queue->tq_seq;
363 KASSERT(gtask->ta_func != NULL, (
"task->ta_func is NULL"));
364 if (!in_net_epoch && TASK_IS_NET(gtask)) {
367 }
else if (in_net_epoch && !TASK_IS_NET(gtask)) {
369 in_net_epoch =
false;
371 gtask->ta_func(gtask->ta_context);
378 LIST_REMOVE(&tb, tb_link);
387 LIST_FOREACH(tb, &queue->tq_active, tb_link) {
398 if (gtask->ta_flags & TASK_ENQUEUED)
399 STAILQ_REMOVE(&queue->tq_queue, gtask, gtask, ta_link);
400 gtask->ta_flags &= ~TASK_ENQUEUED;
419 while ((gtask->ta_flags & TASK_ENQUEUED) ||
task_is_running(queue, gtask))
420 TQ_SLEEP(queue, gtask,
"gtq_drain");
428 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
440 WITNESS_WARN(WARN_GIANTOK | WARN_SLEEPOK, NULL, __func__);
450 cpuset_t *
mask,
const char *
name, va_list ap)
452 char ktname[MAXCOMLEN + 1];
463 tq->tq_threads =
malloc(
sizeof(
struct thread *) *
count, M_GTASKQUEUE,
465 if (tq->tq_threads == NULL) {
466 printf(
"%s: no memory for %s threads\n", __func__, ktname);
470 for (i = 0; i <
count; i++) {
473 &tq->tq_threads[i], RFSTOPPED, 0,
"%s", ktname);
476 &tq->tq_threads[i], RFSTOPPED, 0,
480 printf(
"%s: kthread_add(%s): error %d", __func__,
482 tq->tq_threads[i] = NULL;
486 for (i = 0; i <
count; i++) {
487 if (tq->tq_threads[i] == NULL)
489 td = tq->tq_threads[i];
497 printf(
"%s: curthread=%llu: can't pin; "
500 (
unsigned long long) td->td_tid,
513 const char *
name, ...)
526 enum taskqueue_callback_type cb_type)
528 taskqueue_callback_fn tq_callback;
531 tq_callback = tq->tq_callbacks[cb_type];
532 if (tq_callback != NULL)
533 tq_callback(tq->tq_cb_contexts[cb_type]);
586 taskqueue_enqueue_fn enqueue,
void *context)
589 MTX_SPIN,
"fast_taskqueue");
617 LIST_INIT(&qcpu->tgc_tasks);
636 mtx_assert(&qgroup->
tqg_lock, MA_OWNED);
638 (
"qgroup %s has no queues", qgroup->
tqg_name));
645 for (idx = -1, mincnt = INT_MAX, strict = 1; mincnt == INT_MAX;
647 for (i = 0; i < qgroup->
tqg_cnt; i++) {
648 if (qgroup->
tqg_queue[i].tgc_cnt > mincnt)
651 LIST_FOREACH(n, &qgroup->
tqg_queue[i].tgc_tasks,
653 if (n->gt_uniq == uniq)
663 panic(
"%s: failed to pick a qid.", __func__);
670 void *uniq, device_t dev,
struct resource *irq,
const char *
name)
675 (
"qgroup %s has no queues", qgroup->
tqg_name));
677 gtask->gt_uniq = uniq;
678 snprintf(gtask->gt_name, GROUPTASK_NAMELEN,
"%s",
name ?
name :
"grouptask");
685 LIST_INSERT_HEAD(&qgroup->
tqg_queue[qid].tgc_tasks, gtask, gt_list);
686 gtask->gt_taskqueue = qgroup->
tqg_queue[qid].tgc_taskq;
687 if (dev != NULL && irq != NULL) {
693 printf(
"%s: binding interrupt failed for %s: %d\n",
694 __func__, gtask->gt_name, error);
701 void *uniq,
int cpu, device_t dev,
struct resource *irq,
const char *
name)
705 gtask->gt_uniq = uniq;
706 snprintf(gtask->gt_name, GROUPTASK_NAMELEN,
"%s",
name ?
name :
"grouptask");
711 for (i = 0, qid = -1; i < qgroup->
tqg_cnt; i++)
712 if (qgroup->
tqg_queue[i].tgc_cpu == cpu) {
718 printf(
"%s: qid not found for %s cpu=%d\n", __func__, gtask->gt_name, cpu);
722 LIST_INSERT_HEAD(&qgroup->
tqg_queue[qid].tgc_tasks, gtask, gt_list);
723 gtask->gt_taskqueue = qgroup->
tqg_queue[qid].tgc_taskq;
727 if (dev != NULL && irq != NULL) {
730 printf(
"%s: binding interrupt failed for %s: %d\n",
731 __func__, gtask->gt_name, error);
743 for (i = 0; i < qgroup->
tqg_cnt; i++)
744 if (qgroup->
tqg_queue[i].tgc_taskq == gtask->gt_taskqueue)
747 panic(
"%s: task %s not in group", __func__, gtask->gt_name);
749 LIST_REMOVE(gtask, gt_list);
751 gtask->gt_taskqueue = NULL;
752 gtask->gt_task.ta_flags &= ~TASK_NOENQUEUE;
766 thread_lock(curthread);
768 thread_unlock(curthread);
771 printf(
"%s: binding curthread failed: %d\n", __func__, error);
772 free(gtask, M_DEVBUF);
788 for (i = 0; i < qgroup->
tqg_cnt; i++) {
789 gtask =
malloc(
sizeof(*gtask), M_DEVBUF, M_WAITOK);
803 qgroup =
malloc(
sizeof(*qgroup), M_GTASKQUEUE, M_WAITOK | M_ZERO);
804 mtx_init(&qgroup->
tqg_lock,
"taskqgroup", NULL, MTX_DEF);
808 for (cpu = i = 0; i < cnt; i++) {
810 for (j = 0; j < stride; j++)
826 for (
int i = 0; i <
mp_ncpus; i++) {
static STAILQ_HEAD(cn_device)
int cpuset_setthread(lwpid_t id, cpuset_t *mask)
int kthread_add(void(*func)(void *), void *arg, struct proc *p, struct thread **newtdp, int flags, int pages, const char *fmt,...)
void *() malloc(size_t size, struct malloc_type *mtp, int flags)
void free(void *addr, struct malloc_type *mtp)
void panic(const char *fmt,...)
void wakeup_any(const void *ident)
void wakeup(const void *ident)
void wakeup_one(const void *ident)
void sched_bind(struct thread *td, int cpu)
void sched_prio(struct thread *td, u_char prio)
void sched_add(struct thread *td, int flags)
struct gtask * tb_running
struct taskqgroup_cpu tqg_queue[MAXCPU]
int bus_bind_intr(device_t dev, struct resource *r, int cpu)
Wrapper function for BUS_BIND_INTR().
static void gtaskqueue_terminate(struct thread **pp, struct gtaskqueue *tq)
int taskqgroup_attach_cpu(struct taskqgroup *qgroup, struct grouptask *gtask, void *uniq, int cpu, device_t dev, struct resource *irq, const char *name)
static struct gtaskqueue * gtaskqueue_create_fast(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context)
void grouptask_block(struct grouptask *grouptask)
#define TQ_ASSERT_LOCKED(tq)
TASKQGROUP_DEFINE(softirq, mp_ncpus, 1)
static void gtaskqueue_thread_enqueue(void *)
static int gtaskqueue_cancel_locked(struct gtaskqueue *queue, struct gtask *gtask)
#define TQ_ASSERT_UNLOCKED(tq)
void taskqgroup_detach(struct taskqgroup *qgroup, struct grouptask *gtask)
void taskqgroup_attach(struct taskqgroup *qgroup, struct grouptask *gtask, void *uniq, device_t dev, struct resource *irq, const char *name)
static MALLOC_DEFINE(M_GTASKQUEUE, "gtaskqueue", "Group Task Queues")
static void gtaskqueue_thread_loop(void *arg)
void gtaskqueue_drain(struct gtaskqueue *queue, struct gtask *gtask)
static void __unused gtaskqueue_free(struct gtaskqueue *queue)
void taskqgroup_bind(struct taskqgroup *qgroup)
void gtaskqueue_unblock(struct gtaskqueue *queue)
static void gtaskqueue_drain_tq_queue(struct gtaskqueue *queue)
static void gtaskqueue_drain_tq_active(struct gtaskqueue *queue)
#define TQ_FLAGS_UNLOCKED_ENQUEUE
void gtaskqueue_block(struct gtaskqueue *queue)
static __inline int TQ_SLEEP(struct gtaskqueue *tq, void *p, const char *wm)
static struct gtaskqueue * _gtaskqueue_create(const char *name, int mflags, taskqueue_enqueue_fn enqueue, void *context, int mtxflags, const char *mtxname __unused)
static void gtaskqueue_task_nop_fn(void *context)
static int taskqgroup_find(struct taskqgroup *qgroup, void *uniq)
static void gtaskqueue_run_locked(struct gtaskqueue *queue)
void gtaskqueue_drain_all(struct gtaskqueue *queue)
static int gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, const char *name,...)
void taskqgroup_drain_all(struct taskqgroup *tqg)
static int _gtaskqueue_start_threads(struct gtaskqueue **tqp, int count, int pri, cpuset_t *mask, const char *name, va_list ap)
void(* gtaskqueue_enqueue_fn)(void *context)
static void taskqgroup_binder(void *ctx)
int grouptaskqueue_enqueue(struct gtaskqueue *queue, struct gtask *gtask)
void grouptask_unblock(struct grouptask *grouptask)
int gtaskqueue_cancel(struct gtaskqueue *queue, struct gtask *gtask)
void taskqgroup_destroy(struct taskqgroup *qgroup)
static void gtaskqueue_run_callback(struct gtaskqueue *tq, enum taskqueue_callback_type cb_type)
static void taskqgroup_cpu_create(struct taskqgroup *qgroup, int idx, int cpu)
struct taskqgroup * taskqgroup_create(const char *name, int cnt, int stride)
static int task_is_running(struct gtaskqueue *queue, struct gtask *gtask)
static void gtaskqueue_drain_locked(struct gtaskqueue *queue, struct gtask *gtask)
int vsnprintf(char *str, size_t size, const char *format, va_list ap)
int printf(const char *fmt,...)
int snprintf(char *str, size_t size, const char *format,...)
void taskqueue_thread_enqueue(void *context)