1/*-
2 * Copyright (c) 2014 Chelsio Communications, Inc.
3 * All rights reserved.
4 * Written by: Navdeep Parhar <np@FreeBSD.org>
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 *    notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 *    notice, this list of conditions and the following disclaimer in the
13 *    documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 */
27
28#include <sys/cdefs.h>
29__FBSDID("$FreeBSD$");
30
31#include <sys/types.h>
32#include <sys/param.h>
33#include <sys/systm.h>
34#include <sys/counter.h>
35#include <sys/lock.h>
36#include <sys/mutex.h>
37#include <sys/malloc.h>
38#include <machine/cpu.h>
39
40#if defined(__i386__)
41#define atomic_cmpset_acq_64 atomic_cmpset_64
42#define atomic_cmpset_rel_64 atomic_cmpset_64
43#endif
44
45#include <net/mp_ring.h>
46
/*
 * The entire ring state is packed into a single 64-bit word so that
 * producers and the consumer can snapshot and update all of it with one
 * 64-bit atomic operation (or one plain access under the lock-based build).
 */
union ring_state {
	struct {
		uint16_t pidx_head;	/* producer index: next slot to be reserved */
		uint16_t pidx_tail;	/* producer index: last slot published/filled */
		uint16_t cidx;		/* consumer index: next slot to be drained */
		uint16_t flags;		/* consumer status: IDLE/BUSY/STALLED/ABDICATED */
	};
	uint64_t state;			/* all four fields as one 64-bit word */
};
56
/* Values for ring_state.flags: the state of the ring's (single) consumer. */
enum {
	IDLE = 0,	/* consumer ran to completion, nothing more to do. */
	BUSY,		/* consumer is running already, or will be shortly. */
	STALLED,	/* consumer stopped due to lack of resources. */
	ABDICATED,	/* consumer stopped even though there was work to be
			   done because it wants another thread to take over. */
};
64
65static inline uint16_t
66space_available(struct ifmp_ring *r, union ring_state s)
67{
68	uint16_t x = r->size - 1;
69
70	if (s.cidx == s.pidx_head)
71		return (x);
72	else if (s.cidx > s.pidx_head)
73		return (s.cidx - s.pidx_head - 1);
74	else
75		return (x - s.pidx_head + s.cidx);
76}
77
78static inline uint16_t
79increment_idx(struct ifmp_ring *r, uint16_t idx, uint16_t n)
80{
81	int x = r->size - idx;
82
83	MPASS(x > 0);
84	return (x > n ? idx + n : n - x);
85}
86
87/* Consumer is about to update the ring's state to s */
88static inline uint16_t
89state_to_flags(union ring_state s, int abdicate)
90{
91
92	if (s.cidx == s.pidx_tail)
93		return (IDLE);
94	else if (abdicate && s.pidx_tail != s.pidx_head)
95		return (ABDICATED);
96
97	return (BUSY);
98}
99
100#ifdef NO_64BIT_ATOMICS
/*
 * Consume items from the ring (lock-based variant, for platforms without
 * 64-bit atomics).  The caller holds r->lock, has already set the state to
 * BUSY, and guarantees there is work to do: items from os.cidx up to
 * os.pidx_tail.  'prev' is the flags value the state held before it became
 * BUSY; 'budget' caps how many items are drained before abdicating.
 */
static void
drain_ring_locked(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;	/* items drained since the last visible cidx update */
	total = 0;	/* items drained during this entire call */

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * No progress was possible.  Publish how far we got
			 * and mark the ring STALLED; a later producer or
			 * ifmp_ring_check_drainage restarts consumption.
			 */
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = STALLED;
			r->state = ns.state;
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Made progress this call, but stalled again. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;

		os.state = ns.state = r->state;
		ns.cidx = cidx;
		ns.flags = state_to_flags(ns, total >= budget);
		r->state = ns.state;

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * In this lock-based variant the caller holds r->lock across
		 * the whole drain, which serializes producers and makes items
		 * behind any pidx change we notice here visible to us.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
171#else
/*
 * Consume items from the ring (lock-free variant).  Caller passes in a state,
 * with a guarantee that there is work to do and that all items up to the
 * pidx_tail in the state are visible.  'prev' is the flags value the state
 * held before it became BUSY; 'budget' caps how many items are drained
 * before abdicating to another thread.
 */
static void
drain_ring_lockless(struct ifmp_ring *r, union ring_state os, uint16_t prev, int budget)
{
	union ring_state ns;
	int n, pending, total;
	uint16_t cidx = os.cidx;
	uint16_t pidx = os.pidx_tail;

	MPASS(os.flags == BUSY);
	MPASS(cidx != pidx);

	if (prev == IDLE)
		counter_u64_add(r->starts, 1);
	pending = 0;	/* items drained since the last published cidx */
	total = 0;	/* items drained during this entire call */

	while (cidx != pidx) {

		/* Items from cidx to pidx are available for consumption. */
		n = r->drain(r, cidx, pidx);
		if (n == 0) {
			/*
			 * No progress was possible.  Publish how far we got
			 * and mark the ring STALLED (CAS loop in a critical
			 * section so we aren't preempted mid-update).
			 */
			critical_enter();
			do {
				os.state = ns.state = r->state;
				ns.cidx = cidx;
				ns.flags = STALLED;
			} while (atomic_cmpset_64(&r->state, os.state,
			    ns.state) == 0);
			critical_exit();
			if (prev != STALLED)
				counter_u64_add(r->stalls, 1);
			else if (total > 0) {
				/* Made progress this call, but stalled again. */
				counter_u64_add(r->restarts, 1);
				counter_u64_add(r->stalls, 1);
			}
			break;
		}
		cidx = increment_idx(r, cidx, n);
		pending += n;
		total += n;

		/*
		 * We update the cidx only if we've caught up with the pidx, the
		 * real cidx is getting too far ahead of the one visible to
		 * everyone else, or we have exceeded our budget.
		 */
		if (cidx != pidx && pending < 64 && total < budget)
			continue;
		critical_enter();
		do {
			os.state = ns.state = r->state;
			ns.cidx = cidx;
			ns.flags = state_to_flags(ns, total >= budget);
		} while (atomic_cmpset_acq_64(&r->state, os.state, ns.state) == 0);
		critical_exit();

		if (ns.flags == ABDICATED)
			counter_u64_add(r->abdications, 1);
		if (ns.flags != BUSY) {
			/* Wrong loop exit if we're going to stall. */
			MPASS(ns.flags != STALLED);
			if (prev == STALLED) {
				MPASS(total > 0);
				counter_u64_add(r->restarts, 1);
			}
			break;
		}

		/*
		 * The acquire style atomic above guarantees visibility of items
		 * associated with any pidx change that we notice here.
		 */
		pidx = ns.pidx_tail;
		pending = 0;
	}
}
252#endif
253
254int
255ifmp_ring_alloc(struct ifmp_ring **pr, int size, void *cookie, mp_ring_drain_t drain,
256    mp_ring_can_drain_t can_drain, struct malloc_type *mt, int flags)
257{
258	struct ifmp_ring *r;
259
260	/* All idx are 16b so size can be 65536 at most */
261	if (pr == NULL || size < 2 || size > 65536 || drain == NULL ||
262	    can_drain == NULL)
263		return (EINVAL);
264	*pr = NULL;
265	flags &= M_NOWAIT | M_WAITOK;
266	MPASS(flags != 0);
267
268	r = malloc(__offsetof(struct ifmp_ring, items[size]), mt, flags | M_ZERO);
269	if (r == NULL)
270		return (ENOMEM);
271	r->size = size;
272	r->cookie = cookie;
273	r->mt = mt;
274	r->drain = drain;
275	r->can_drain = can_drain;
276	r->enqueues = counter_u64_alloc(flags);
277	r->drops = counter_u64_alloc(flags);
278	r->starts = counter_u64_alloc(flags);
279	r->stalls = counter_u64_alloc(flags);
280	r->restarts = counter_u64_alloc(flags);
281	r->abdications = counter_u64_alloc(flags);
282	if (r->enqueues == NULL || r->drops == NULL || r->starts == NULL ||
283	    r->stalls == NULL || r->restarts == NULL ||
284	    r->abdications == NULL) {
285		ifmp_ring_free(r);
286		return (ENOMEM);
287	}
288
289	*pr = r;
290#ifdef NO_64BIT_ATOMICS
291	mtx_init(&r->lock, "mp_ring lock", NULL, MTX_DEF);
292#endif
293	return (0);
294}
295
296void
297ifmp_ring_free(struct ifmp_ring *r)
298{
299
300	if (r == NULL)
301		return;
302
303	if (r->enqueues != NULL)
304		counter_u64_free(r->enqueues);
305	if (r->drops != NULL)
306		counter_u64_free(r->drops);
307	if (r->starts != NULL)
308		counter_u64_free(r->starts);
309	if (r->stalls != NULL)
310		counter_u64_free(r->stalls);
311	if (r->restarts != NULL)
312		counter_u64_free(r->restarts);
313	if (r->abdications != NULL)
314		counter_u64_free(r->abdications);
315
316	free(r, r->mt);
317}
318
319/*
320 * Enqueue n items and maybe drain the ring for some time.
321 *
322 * Returns an errno.
323 */
324#ifdef NO_64BIT_ATOMICS
/*
 * Lock-based implementation for platforms without 64-bit atomics: the whole
 * reserve/fill/publish/drain sequence runs under r->lock.
 */
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	mtx_lock(&r->lock);
	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	os.state = r->state;
	if (n >= space_available(r, os)) {
		/* Not enough room: drop all n items, none are enqueued. */
		counter_u64_add(r->drops, n);
		MPASS(os.flags != IDLE);
		mtx_unlock(&r->lock);
		if (os.flags == STALLED)
			ifmp_ring_check_drainage(r, 0);
		return (ENOBUFS);
	}
	ns.state = os.state;
	ns.pidx_head = increment_idx(r, os.pidx_head, n);
	r->state = ns.state;
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		/*HAIKU*/KASSERT((r->items[i] == NULL) || ((uintptr_t)(r->items[i]) > 1024UL), ("is %p", r->items[i]));
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  In this variant r->lock (held
	 * throughout) is what makes the items visible to the consumer,
	 * which also runs under the lock.
	 */
	os.state = ns.state = r->state;
	ns.pidx_tail = pidx_stop;
	if (abdicate) {
		/* Flag the pending work but let another thread drain it. */
		if (os.flags == IDLE)
			ns.flags = ABDICATED;
	}
	else {
		ns.flags = BUSY;
	}
	r->state = ns.state;
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_locked(r, ns, os.flags, budget);
	}

	mtx_unlock(&r->lock);
	return (0);
}
402
403#else
/*
 * Lock-free implementation: producers reserve a range of slots with a CAS on
 * the packed ring state, fill the slots, then publish them by advancing
 * pidx_tail with a release-style CAS.
 */
int
ifmp_ring_enqueue(struct ifmp_ring *r, void **items, int n, int budget, int abdicate)
{
	union ring_state os, ns;
	uint16_t pidx_start, pidx_stop;
	int i;

	MPASS(items != NULL);
	MPASS(n > 0);

	/*
	 * Reserve room for the new items.  Our reservation, if successful, is
	 * from 'pidx_start' to 'pidx_stop'.
	 */
	for (;;) {
		os.state = r->state;
		if (n >= space_available(r, os)) {
			/* Not enough room: drop all n items. */
			counter_u64_add(r->drops, n);
			MPASS(os.flags != IDLE);
			if (os.flags == STALLED)
				ifmp_ring_check_drainage(r, 0);
			return (ENOBUFS);
		}
		ns.state = os.state;
		ns.pidx_head = increment_idx(r, os.pidx_head, n);
		/*
		 * Stay in a critical section from a successful reservation
		 * until publication below: producers behind us spin-wait for
		 * our pidx_tail update, so we must not be preempted here.
		 */
		critical_enter();
		if (atomic_cmpset_64(&r->state, os.state, ns.state))
			break;
		critical_exit();
		cpu_spinwait();
	}
	pidx_start = os.pidx_head;
	pidx_stop = ns.pidx_head;

	/*
	 * Wait for other producers who got in ahead of us to enqueue their
	 * items, one producer at a time.  It is our turn when the ring's
	 * pidx_tail reaches the beginning of our reservation (pidx_start).
	 */
	while (ns.pidx_tail != pidx_start) {
		cpu_spinwait();
		ns.state = r->state;
	}

	/* Now it is our turn to fill up the area we reserved earlier. */
	i = pidx_start;
	do {
		r->items[i] = *items++;
		if (__predict_false(++i == r->size))
			i = 0;
	} while (i != pidx_stop);

	/*
	 * Update the ring's pidx_tail.  The release style atomic guarantees
	 * that the items are visible to any thread that sees the updated pidx.
	 */
	do {
		os.state = ns.state = r->state;
		ns.pidx_tail = pidx_stop;
		if (abdicate) {
			/* Flag the pending work but let another thread drain it. */
			if (os.flags == IDLE)
				ns.flags = ABDICATED;
		}
		else {
			ns.flags = BUSY;
		}
	} while (atomic_cmpset_rel_64(&r->state, os.state, ns.state) == 0);
	critical_exit();
	counter_u64_add(r->enqueues, n);

	if (!abdicate) {
		/*
		 * Turn into a consumer if some other thread isn't active as a consumer
		 * already.
		 */
		if (os.flags != BUSY)
			drain_ring_lockless(r, ns, os.flags, budget);
	}

	return (0);
}
485#endif
486
/*
 * Take over as consumer if the ring is STALLED (and draining can now make
 * progress) or ABDICATED, and drain up to 'budget' items.  Returns quietly
 * if the ring is in any other state or another thread wins the race.
 */
void
ifmp_ring_check_drainage(struct ifmp_ring *r, int budget)
{
	union ring_state os, ns;

	os.state = r->state;
	if ((os.flags != STALLED && os.flags != ABDICATED) ||	// Only continue in STALLED and ABDICATED
	    os.pidx_head != os.pidx_tail ||			// No producer may be mid-enqueue (all reservations published)
	    (os.flags != ABDICATED && r->can_drain(r) == 0))	// Can either drain, or everyone left
		return;

	MPASS(os.cidx != os.pidx_tail);	/* implied by STALLED */
	ns.state = os.state;
	ns.flags = BUSY;


#ifdef NO_64BIT_ATOMICS
	mtx_lock(&r->lock);
	if (r->state != os.state) {
		/* State changed since our snapshot; someone else owns it. */
		mtx_unlock(&r->lock);
		return;
	}
	r->state = ns.state;
	drain_ring_locked(r, ns, os.flags, budget);
	mtx_unlock(&r->lock);
#else
	/*
	 * The acquire style atomic guarantees visibility of items associated
	 * with the pidx that we read here.
	 */
	if (!atomic_cmpset_acq_64(&r->state, os.state, ns.state))
		return;


	drain_ring_lockless(r, ns, os.flags, budget);
#endif
}
524
525void
526ifmp_ring_reset_stats(struct ifmp_ring *r)
527{
528
529	counter_u64_zero(r->enqueues);
530	counter_u64_zero(r->drops);
531	counter_u64_zero(r->starts);
532	counter_u64_zero(r->stalls);
533	counter_u64_zero(r->restarts);
534	counter_u64_zero(r->abdications);
535}
536
537int
538ifmp_ring_is_idle(struct ifmp_ring *r)
539{
540	union ring_state s;
541
542	s.state = r->state;
543	if (s.pidx_head == s.pidx_tail && s.pidx_tail == s.cidx &&
544	    s.flags == IDLE)
545		return (1);
546
547	return (0);
548}
549
550int
551ifmp_ring_is_stalled(struct ifmp_ring *r)
552{
553	union ring_state s;
554
555	s.state = r->state;
556	if (s.pidx_head == s.pidx_tail && s.flags == STALLED)
557		return (1);
558
559	return (0);
560}
561