DPDK 25.11.0
rte_ring_rts_elem_pvt.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_RTS_ELEM_PVT_H_
#define _RTE_RING_RTS_ELEM_PVT_H_
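The functions below manipulate head and tail through the __rte_ring_rts_poscnt union declared in rte_ring_core.h. As a reading aid, a minimal sketch of that layout (field names per the DPDK source; alignment and atomic qualifiers omitted here):

union __rte_ring_rts_poscnt {
	uint64_t raw;         /* read/write cnt and pos as one 64-bit atomic op */
	struct {
		uint32_t cnt; /* head/tail reference counter */
		uint32_t pos; /* head/tail position */
	} val;
};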
static __rte_always_inline void
__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
{
	union __rte_ring_rts_poscnt h, ot, nt;

	/*
	 * If there are other enqueues/dequeues in progress that
	 * might precede us, then don't update tail with a new value.
	 */

	/*
	 * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
	 * The CAS at R0 in a same-typed thread establishes a happens-before
	 * relationship with this load-acquire. It ensures that this thread
	 * observes the same or later values for h.raw/h.val.cnt as those
	 * observed by the other thread when it updated ht->tail.raw.
	 * Otherwise, ht->tail.raw may get updated out of sync (e.g. getting
	 * updated to the same value twice). A0.a makes sure this condition
	 * holds when the CAS succeeds and A0.b when it fails.
	 */
	/* A0.a */
	ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);

	do {
		/* on 32-bit systems we have to do atomic read here */
		h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);

		nt.raw = ot.raw;
		if (++nt.val.cnt == h.val.cnt)
			nt.val.pos = h.val.pos;
		/*
		 * R0: Synchronizes with A2 of a different thread of the
		 * opposite type and A0.b of a different thread of the same type.
		 */
		/* A0.b */
	} while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
			rte_memory_order_release, rte_memory_order_acquire) == 0);
}

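To illustrate the counter rule enforced above: the tail may only adopt the head position once its counter has been bumped as many times as the head's, i.e. once every thread that moved the head has finished its copy. A hypothetical single-threaded trace (names and values invented for illustration; atomics and DPDK dependencies omitted):

#include <stdint.h>
#include <stdio.h>

struct poscnt { uint32_t cnt, pos; };

static void
update_tail(struct poscnt *tail, const struct poscnt *head)
{
	tail->cnt++;                   /* one in-flight update retired */
	if (tail->cnt == head->cnt)    /* all in-flight updates retired? */
		tail->pos = head->pos; /* safe to expose the new entries */
}

int
main(void)
{
	/* two producers moved head.pos to 8, bumping head.cnt twice */
	struct poscnt head = { .cnt = 2, .pos = 8 };
	struct poscnt tail = { .cnt = 0, .pos = 0 };

	update_tail(&tail, &head);  /* first producer done: pos stays 0 */
	printf("cnt=%u pos=%u\n", tail.cnt, tail.pos);
	update_tail(&tail, &head);  /* second producer done: pos -> 8 */
	printf("cnt=%u pos=%u\n", tail.cnt, tail.pos);
	return 0;
}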
static __rte_always_inline union __rte_ring_rts_poscnt
__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
	rte_memory_order memorder)
{
	union __rte_ring_rts_poscnt h;
	uint32_t max = ht->htd_max;

	h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);

	while (h.val.pos - ht->tail.val.pos > max) {
		rte_pause();
		h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);
	}

	return h;
}

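The loop condition above relies on unsigned modular arithmetic: h.val.pos - ht->tail.val.pos yields the true head/tail distance even after the 32-bit position counters wrap. A standalone sketch with invented values:

#include <assert.h>
#include <stdint.h>

int
main(void)
{
	uint32_t tail = UINT32_MAX - 1; /* tail about to wrap */
	uint32_t head = 2;              /* head already wrapped */

	/* (head - tail) mod 2^32 == 4: the real distance, not garbage */
	assert(head - tail == 4);
	return 0;
}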
static __rte_always_inline uint32_t
__rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
	const struct rte_ring_headtail *s, uint32_t capacity, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *entries)
{
	uint32_t n, stail;
	union __rte_ring_rts_poscnt nh, oh;

	do {
		/* Reset n to the initial burst count */
		n = num;

		/*
		 * wait for prod head/tail distance,
		 * make sure that we read prod head *before*
		 * reading cons tail.
		 */
		/*
		 * A1: Synchronizes with the CAS at R1.
		 * Establishes a happens-before relationship with a thread of
		 * the same type that released ht.raw, ensuring this thread
		 * observes all of its memory effects needed to maintain a
		 * safe partial order.
		 */
		oh = __rte_ring_rts_head_wait(d, rte_memory_order_acquire);

		/*
		 * A2: Establishes a synchronizes-with edge with the
		 * store-release at R0. This ensures that all memory effects
		 * from the preceding opposing thread are observed.
		 */
		stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);

		/*
		 * The subtraction is done between two unsigned 32-bit values
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'entries' is always between 0
		 * and capacity (which is < size).
		 */
		*entries = capacity + stail - oh.val.pos;

		/* check that we have enough room in ring */
		if (unlikely(n > *entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *entries;

		if (n == 0)
			break;

		nh.val.pos = oh.val.pos + n;
		nh.val.cnt = oh.val.cnt + 1;

		/*
		 * R1: Establishes a synchronizes-with edge with the
		 * load-acquire of ht.raw at A1. Ensures that the store-release
		 * to the tail by this thread, if it was of the opposite type,
		 * becomes visible to another thread of the current type. That
		 * thread will then observe the updates in the same order,
		 * keeping a safe partial order.
		 */
	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
			rte_memory_order_release,
			rte_memory_order_relaxed) == 0);

	*old_head = oh.val.pos;
	return n;
}

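The 'entries' computation above in concrete numbers (values invented for illustration): for a producer with capacity = 1024, cons tail = 100 and prod head = 1100, 1000 slots are in flight, so 1024 + 100 - 1100 = 24 remain free; the same modular arithmetic holds once positions wrap past 2^32:

#include <assert.h>
#include <stdint.h>

int
main(void)
{
	uint32_t capacity = 1024, stail = 100, head_pos = 1100;

	assert(capacity + stail - head_pos == 24);

	stail = UINT32_MAX - 10; /* cons tail just below the wrap point */
	head_pos = 20;           /* prod head already wrapped */
	/* 31 entries in flight -> 1024 - 31 = 993 free */
	assert(capacity + stail - head_pos == 993);
	return 0;
}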
static __rte_always_inline uint32_t
__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *free_entries)
{
	return __rte_ring_rts_move_head(&r->rts_prod, &r->cons,
			r->capacity, num, behavior, old_head, free_entries);
}

static __rte_always_inline unsigned int
__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *entries)
{
	return __rte_ring_rts_move_head(&r->rts_cons, &r->prod,
			0, num, behavior, old_head, entries);
}

static __rte_always_inline unsigned int
__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	uint32_t free, head;

	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);

	if (n != 0) {
		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_prod);
	}

	if (free_space != NULL)
		*free_space = free - n;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	uint32_t entries, head;

	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);

	if (n != 0) {
		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_cons);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

#endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
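These private helpers back the public burst API whenever a ring is created with the RTS synchronization flags. A minimal usage sketch (not part of this header; error handling trimmed and EAL arguments assumed to be passed through):

#include <stdio.h>

#include <rte_eal.h>
#include <rte_ring_elem.h>

int
main(int argc, char **argv)
{
	uint64_t objs[3] = { 1, 2, 3 }, out[3];
	unsigned int n, free_space, avail;
	struct rte_ring *r;

	if (rte_eal_init(argc, argv) < 0)
		return -1;

	/* multi-producer/multi-consumer ring in RTS mode */
	r = rte_ring_create_elem("rts_ring", sizeof(objs[0]), 1024,
			SOCKET_ID_ANY, RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);
	if (r == NULL)
		return -1;

	/* dispatches to __rte_ring_do_rts_enqueue_elem() */
	n = rte_ring_enqueue_burst_elem(r, objs, sizeof(objs[0]), 3,
			&free_space);
	/* dispatches to __rte_ring_do_rts_dequeue_elem() */
	n = rte_ring_dequeue_burst_elem(r, out, sizeof(out[0]), n, &avail);
	printf("dequeued %u objects\n", n);

	rte_ring_free(r);
	return 0;
}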