34#include <sys/counter.h>
36#include <sys/malloc.h>
38#include <sys/sysctl.h>
39#include <machine/cpu.h>
44#define atomic_cmpset_acq_64 atomic_cmpset_64
45#define atomic_cmpset_rel_64 atomic_cmpset_64
94 uint16_t x =
r->size - 1;
104static inline uint16_t
107 int x =
r->size - idx;
110 return (x > n ? idx + n : n - x);
121 int n, pending, total;
128 os.
state = atomic_load_acq_64(&
r->state);
144 os.
state = atomic_load_64(&
r->state);
157 }
while (atomic_fcmpset_64(&
r->state, &os.
state,
171 os.
state = atomic_load_64(&
r->state);
179 MPASS(total >= budget);
187 if (total >= budget) {
194 }
while (atomic_fcmpset_acq_64(&
r->state, &os.
state, ns.
state) == 0);
242 MPASS(coalescing ==
false);
244 os.
state = atomic_load_64(&
r->state);
253 }
while (atomic_fcmpset_acq_64(&
r->state, &os.
state, ns.
state) == 0);
269 if (pr == NULL || size < 2 || size > 65536 ||
drain == NULL ||
273 flags &= M_NOWAIT | M_WAITOK;
276 r = malloc(__offsetof(
struct mp_ring, items[size]),
mt, flags | M_ZERO);
285 if ((
r->
dropped = counter_u64_alloc(flags)) == NULL)
287 for (i = 0; i < nitems(
r->
consumer); i++) {
288 if ((
r->
consumer[i] = counter_u64_alloc(flags)) == NULL)
293 if ((
r->
abdications = counter_u64_alloc(flags)) == NULL)
295 if ((
r->
stalls = counter_u64_alloc(flags)) == NULL)
297 if ((
r->
consumed = counter_u64_alloc(flags)) == NULL)
299 if ((
r->
cons_idle = counter_u64_alloc(flags)) == NULL)
301 if ((
r->
cons_idle2 = counter_u64_alloc(flags)) == NULL)
321 for (i = 0; i < nitems(
r->
consumer); i++) {
350 uint16_t pidx_start, pidx_stop;
354 MPASS(items != NULL);
362 os.
state = atomic_load_64(&
r->state);
372 if (__predict_false(++nospc > 100)) {
380 os.
state = atomic_load_64(&
r->state);
401 if (atomic_fcmpset_64(&
r->state, &os.
state, ns.
state))
413 r->items[i] = *items++;
414 if (__predict_false(++i ==
r->size))
416 }
while (i != pidx_stop);
432 ns.
state = atomic_load_64(&
r->state);
438 r->items[i] = *items++;
439 if (__predict_false(++i ==
r->size))
441 }
while (i != pidx_stop);
447 os.
state = atomic_load_64(&
r->state);
458 }
while (atomic_fcmpset_rel_64(&
r->state, &os.
state, ns.
state) == 0);
483 os.
state = atomic_load_64(&
r->state);
488 if (atomic_cmpset_acq_64(&
r->state, os.
state, ns.
state)) {
497 if (atomic_cmpset_acq_64(&
r->state, os.
state, ns.
state)) {
511 for (i = 0; i < nitems(
r->
consumer); i++)
526 s.
state = atomic_load_64(&
r->state);
536 struct sysctl_oid_list *children)
538 struct sysctl_oid *oid;
540 oid = SYSCTL_ADD_NODE(ctx, children, OID_AUTO,
"mp_ring", CTLFLAG_RD |
541 CTLFLAG_MPSAFE, NULL,
"mp_ring statistics");
542 children = SYSCTL_CHILDREN(oid);
544 SYSCTL_ADD_U64(ctx, children, OID_AUTO,
"state", CTLFLAG_RD,
545 __DEVOLATILE(uint64_t *, &
r->state), 0,
"ring state");
546 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"dropped", CTLFLAG_RD,
548 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"consumed",
549 CTLFLAG_RD, &
r->
consumed,
"# of items consumed");
550 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"fast_consumer",
552 "# of times producer became consumer (fast)");
553 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"consumer2",
555 "# of times producer became consumer (2)");
556 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"consumer3",
558 "# of times producer became consumer (3)");
559 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"takeovers",
561 "# of times producer took over from another consumer.");
562 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"not_consumer",
564 "# of times producer did not become consumer");
565 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"abdications",
566 CTLFLAG_RD, &
r->
abdications,
"# of consumer abdications");
567 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"stalls",
568 CTLFLAG_RD, &
r->
stalls,
"# of consumer stalls");
569 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"cons_idle",
571 "# of times consumer ran fully to completion");
572 SYSCTL_ADD_COUNTER_U64(ctx, children, OID_AUTO,
"cons_idle2",
574 "# of times consumer idled when another enqueue was in progress");
counter_u64_t not_consumer
counter_u64_t consumer[4]
ring_can_drain_t can_drain
counter_u64_t abdications
static uint16_t space_available(struct mp_ring *r, union ring_state s)
int mp_ring_alloc(struct mp_ring **pr, int size, void *cookie, ring_drain_t drain, ring_can_drain_t can_drain, struct malloc_type *mt, struct mtx *lck, int flags)
void mp_ring_check_drainage(struct mp_ring *r, int budget)
void mp_ring_sysctls(struct mp_ring *r, struct sysctl_ctx_list *ctx, struct sysctl_oid_list *children)
bool mp_ring_is_idle(struct mp_ring *r)
int mp_ring_enqueue(struct mp_ring *r, void **items, int n, int budget)
static void drain_ring(struct mp_ring *r, int budget)
static void drain_txpkts(struct mp_ring *r, union ring_state os, int budget)
void mp_ring_free(struct mp_ring *r)
void mp_ring_reset_stats(struct mp_ring *r)
static uint16_t increment_idx(struct mp_ring *r, uint16_t idx, uint16_t n)
u_int(* ring_can_drain_t)(struct mp_ring *)
u_int(* ring_drain_t)(struct mp_ring *, u_int, u_int, bool *)