-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
Copy pathStripedMpscBuffer.cs
428 lines (360 loc) · 12.3 KB
/
StripedMpscBuffer.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
using System;
using System.Linq;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Numerics;
using System.Runtime.CompilerServices;
namespace Orleans.Runtime.Utilities;
/// <summary>
/// Provides a striped bounded buffer made of several <see cref="MpscBoundedBuffer{T}"/> stripes.
/// Add operations hash the current managed thread ID to pick a stripe, and if the add does not
/// succeed the hash is advanced to select a different stripe, for up to 3 attempts in total.
/// Using this approach writes scale linearly with number of concurrent threads.
/// </summary>
/// <remarks>
/// Note: this implementation was originally authored by Alex Peck and was copied from BitFaster.Caching: https://github.com/bitfaster/BitFaster.Caching/blob/275b9b072c0218e20f549b769cd183df1374e2ee/BitFaster.Caching/Buffers/StripedMpscBuffer.cs
/// </remarks>
[DebuggerDisplay("Count = {Count}/{Capacity}")]
internal sealed class StripedMpscBuffer<T> where T : class
{
    // Maximum number of stripes a single TryAdd call will try before giving up.
    private const int MaxAttempts = 3;

    // Length is always a power of two (enforced by the constructor) so that
    // TryAdd can select a stripe with a bit mask.
    private readonly MpscBoundedBuffer<T>[] _buffers;

    /// <summary>
    /// Initializes a new instance of the StripedMpscBuffer class with the specified stripe count and buffer size.
    /// </summary>
    /// <param name="stripeCount">
    /// The requested stripe count. It is rounded up to the next power of two (a value of 0 becomes 1).
    /// </param>
    /// <param name="bufferSize">The size of each stripe, passed to the <see cref="MpscBoundedBuffer{T}"/> constructor.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="stripeCount"/> is negative.</exception>
    public StripedMpscBuffer(int stripeCount, int bufferSize)
    {
        ArgumentOutOfRangeException.ThrowIfLessThan(stripeCount, 0);

        // TryAdd indexes with `hash & (length - 1)`. That mask only reaches every stripe when
        // the length is a power of two; with e.g. 6 stripes the mask 0b101 would never select
        // stripes 2 and 3, silently wasting their capacity.
        stripeCount = BitOps.CeilingPowerOfTwo(stripeCount);

        _buffers = new MpscBoundedBuffer<T>[stripeCount];
        for (var i = 0; i < stripeCount; i++)
        {
            _buffers[i] = new MpscBoundedBuffer<T>(bufferSize);
        }
    }

    /// <summary>
    /// Gets the number of items contained in the buffer, computed as the sum of the stripe counts.
    /// </summary>
    public int Count => _buffers.Sum(b => b.Count);

    /// <summary>
    /// The bounded capacity: the number of stripes multiplied by the capacity of one stripe.
    /// </summary>
    public int Capacity => _buffers.Length * _buffers[0].Capacity;

    /// <summary>
    /// Drains the buffer into the specified array.
    /// </summary>
    /// <param name="outputBuffer">The output buffer</param>
    /// <returns>The number of items written to the output buffer.</returns>
    /// <remarks>
    /// Thread safe for single try take/drain + multiple try add.
    /// </remarks>
    public int DrainTo(T[] outputBuffer) => DrainTo(outputBuffer.AsSpan());

    /// <summary>
    /// Drains the buffer into the specified span, visiting the stripes in index order
    /// until either every stripe has been visited or the span is full.
    /// </summary>
    /// <param name="outputBuffer">The output buffer</param>
    /// <returns>The number of items written to the output buffer.</returns>
    /// <remarks>
    /// Thread safe for single try take/drain + multiple try add.
    /// </remarks>
    public int DrainTo(Span<T> outputBuffer)
    {
        var count = 0;
        for (var i = 0; i < _buffers.Length; i++)
        {
            // Stop as soon as the output is full (this also covers an empty output span).
            if (count == outputBuffer.Length)
            {
                break;
            }

            // Hand each stripe only the part of the output that is still unused.
            var segment = outputBuffer[count..];
            count += _buffers[i].DrainTo(segment);
        }

        return count;
    }

    /// <summary>
    /// Tries to add the specified item.
    /// </summary>
    /// <param name="item">The item to be added.</param>
    /// <returns>
    /// <see cref="BufferStatus.Success"/> if one of the attempts succeeded; otherwise the status
    /// returned by the last stripe that was tried.
    /// </returns>
    /// <remarks>
    /// Thread safe.
    /// </remarks>
    public BufferStatus TryAdd(T item)
    {
        // Mix the thread ID into 64 bits: the low half is the starting hash, the high half
        // (forced odd) is the step used to move to another stripe on each retry.
        var z = BitOps.Mix64((ulong)Environment.CurrentManagedThreadId);
        var inc = (int)(z >> 32) | 1;
        var h = (int)z;

        var mask = _buffers.Length - 1;
        var result = BufferStatus.Empty;

        for (var i = 0; i < MaxAttempts; i++)
        {
            result = _buffers[h & mask].TryAdd(item);
            if (result == BufferStatus.Success)
            {
                break;
            }

            h += inc;
        }

        return result;
    }

    /// <summary>
    /// Removes all values from the buffer by clearing every stripe.
    /// </summary>
    /// <remarks>
    /// Not thread safe.
    /// </remarks>
    public void Clear()
    {
        for (var i = 0; i < _buffers.Length; i++)
        {
            _buffers[i].Clear();
        }
    }
}
/// <summary>
/// Provides a multi-producer, single-consumer thread-safe ring buffer. When the buffer is full,
/// TryAdd fails and returns <see cref="BufferStatus.Full"/>. When the buffer is empty, TryTake fails
/// and returns <see cref="BufferStatus.Empty"/>.
/// </summary>
/// <remarks>
/// Based on the BoundedBuffer class in the Caffeine library by Ben Manes.
/// </remarks>
[DebuggerDisplay("Count = {Count}/{Capacity}")]
internal sealed class MpscBoundedBuffer<T> where T : class
{
    // Slot storage. A null slot means "no item published here yet".
    private T[] _buffer;

    // _buffer.Length - 1; the length is a power of two so `& _mask` replaces `% Length`.
    private readonly int _mask;

    private PaddedHeadAndTail _headAndTail; // mutable struct, don't mark readonly

    /// <summary>
    /// Initializes a new instance of the MpscBoundedBuffer class with the specified bounded capacity.
    /// </summary>
    /// <param name="boundedLength">The bounded length. It is rounded up to a power of two.</param>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="boundedLength"/> is negative.</exception>
    public MpscBoundedBuffer(int boundedLength)
    {
        ArgumentOutOfRangeException.ThrowIfLessThan(boundedLength, 0);

        // must be power of 2 to use & slotsMask instead of %
        boundedLength = BitOps.CeilingPowerOfTwo(boundedLength);

        _buffer = new T[boundedLength];
        _mask = boundedLength - 1;
    }

    /// <summary>
    /// The bounded capacity.
    /// </summary>
    public int Capacity => _buffer.Length;

    /// <summary>
    /// Gets the number of items contained in the buffer.
    /// </summary>
    public int Count
    {
        get
        {
            var spinner = new SpinWait();
            while (true)
            {
                var headNow = Volatile.Read(ref _headAndTail.Head);
                var tailNow = Volatile.Read(ref _headAndTail.Tail);

                // Only trust the pair if neither counter moved while both were being read;
                // otherwise spin and sample again.
                if (headNow == Volatile.Read(ref _headAndTail.Head) &&
                    tailNow == Volatile.Read(ref _headAndTail.Tail))
                {
                    return GetCount(headNow, tailNow);
                }

                spinner.SpinOnce();
            }
        }
    }

    // Converts a head/tail counter pair into an item count using the masked slot positions.
    private int GetCount(int head, int tail)
    {
        if (head != tail)
        {
            head &= _mask;
            tail &= _mask;

            // When the masked tail is not ahead of the masked head the occupied region wraps
            // around the end of the array (equal masked values here mean the buffer is full).
            return head < tail ? tail - head : _buffer.Length - head + tail;
        }

        return 0;
    }

    /// <summary>
    /// Tries to add the specified item.
    /// </summary>
    /// <param name="item">The item to be added.</param>
    /// <returns>
    /// <see cref="BufferStatus.Success"/> if the item was stored, <see cref="BufferStatus.Full"/> if no
    /// slot was free, or <see cref="BufferStatus.Contended"/> if another thread claimed the slot first.
    /// </returns>
    /// <remarks>
    /// Thread safe.
    /// </remarks>
    public BufferStatus TryAdd(T item)
    {
        int head = Volatile.Read(ref _headAndTail.Head);
        int tail = _headAndTail.Tail;
        int size = tail - head;

        if (size >= _buffer.Length)
        {
            return BufferStatus.Full;
        }

        // Claim the slot by advancing the tail; only the thread that wins the exchange may write it.
        if (Interlocked.CompareExchange(ref _headAndTail.Tail, tail + 1, tail) == tail)
        {
            int index = tail & _mask;
            Volatile.Write(ref _buffer[index], item);
            return BufferStatus.Success;
        }

        return BufferStatus.Contended;
    }

    /// <summary>
    /// Tries to remove an item.
    /// </summary>
    /// <param name="item">The item that was removed, or the default value when nothing was taken.</param>
    /// <returns>
    /// <see cref="BufferStatus.Success"/> if an item was taken, <see cref="BufferStatus.Empty"/> if the
    /// buffer held no items, or <see cref="BufferStatus.Contended"/> if the next slot has been claimed
    /// by a producer but its item is not yet visible.
    /// </returns>
    /// <remarks>
    /// Thread safe for single try take/drain + multiple try add.
    /// </remarks>
    public BufferStatus TryTake(out T item)
    {
        int head = Volatile.Read(ref _headAndTail.Head);
        int tail = _headAndTail.Tail;
        int size = tail - head;

        if (size == 0)
        {
            item = default;
            return BufferStatus.Empty;
        }

        int index = head & _mask;
        item = Volatile.Read(ref _buffer[index]);

        if (item == null)
        {
            // not published yet
            return BufferStatus.Contended;
        }

        // Clear the slot before publishing the new head so a producer that reuses the slot
        // cannot have its item overwritten by this null.
        _buffer[index] = null;
        Volatile.Write(ref _headAndTail.Head, ++head);
        return BufferStatus.Success;
    }

    /// <summary>
    /// Drains the buffer into the specified array segment.
    /// </summary>
    /// <param name="output">The output buffer</param>
    /// <returns>The number of items written to the output buffer.</returns>
    /// <remarks>
    /// Thread safe for single try take/drain + multiple try add.
    /// </remarks>
    public int DrainTo(ArraySegment<T> output) => DrainTo(output.AsSpan());

    /// <summary>
    /// Drains the buffer into the specified span.
    /// </summary>
    /// <param name="output">The output buffer</param>
    /// <returns>
    /// The number of items written to the output buffer. Returns 0 without touching the buffer
    /// when <paramref name="output"/> is empty.
    /// </returns>
    /// <remarks>
    /// Thread safe for single try take/drain + multiple try add.
    /// </remarks>
    public int DrainTo(Span<T> output) => DrainToImpl(output);

    // use an outer wrapper method to force the JIT to inline the inner adaptor methods
    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private int DrainToImpl(Span<T> output)
    {
        int head = Volatile.Read(ref _headAndTail.Head);
        int tail = _headAndTail.Tail;
        int size = tail - head;

        // Nothing to take, or nowhere to put it. The empty-output check has to happen before
        // the loop: the loop body clears a slot before writing to the output, so an empty output
        // would throw after the slot was already nulled, leaving head pointing at a null slot
        // that every later take/drain would treat as "not published yet".
        if (size == 0 || Length(output) == 0)
        {
            return 0;
        }

        var localBuffer = _buffer.AsSpan();
        int outCount = 0;

        do
        {
            int index = head & _mask;
            T item = Volatile.Read(ref localBuffer[index]);

            if (item == null)
            {
                // not published yet
                break;
            }

            localBuffer[index] = null;
            Write(output, outCount++, item);
            head++;
        }
        while (head != tail && outCount < Length(output));

        // Publish the new head with release semantics, matching TryTake, so the slot-clearing
        // stores above cannot become visible after the head advance and overwrite an item a
        // producer writes into a freed slot.
        Volatile.Write(ref _headAndTail.Head, head);

        return outCount;
    }

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static void Write(Span<T> output, int index, T item) => output[index] = item;

    [MethodImpl(MethodImplOptions.AggressiveInlining)]
    private static int Length(Span<T> output) => output.Length;

    /// <summary>
    /// Removes all values from the buffer by replacing the slot array and resetting head and tail.
    /// </summary>
    /// <remarks>
    /// Not thread safe.
    /// </remarks>
    public void Clear()
    {
        _buffer = new T[_buffer.Length];
        _headAndTail = new PaddedHeadAndTail();
    }
}
/// <summary>
/// Describes the outcome of an operation on a buffer.
/// </summary>
internal enum BufferStatus
{
    /// <summary>
    /// The buffer had no free space.
    /// </summary>
    Full = 0,

    /// <summary>
    /// The buffer contained no items.
    /// </summary>
    Empty = 1,

    /// <summary>
    /// The operation completed successfully.
    /// </summary>
    Success = 2,

    /// <summary>
    /// The operation could not complete because of contention.
    /// </summary>
    Contended = 3,
}
/// <summary>
/// Bit-manipulation helpers: rounding up to a power of two and 64-bit mixing.
/// </summary>
internal static class BitOps
{
    /// <summary>
    /// Rounds the input up to a power of two.
    /// </summary>
    /// <param name="x">The value to round up.</param>
    /// <returns>
    /// The smallest power of two greater than or equal to <paramref name="x"/>, computed by the
    /// <see cref="uint"/> overload and cast back to <see cref="int"/>.
    /// </returns>
    public static int CeilingPowerOfTwo(int x)
    {
        uint rounded = CeilingPowerOfTwo((uint)x);
        return (int)rounded;
    }

    /// <summary>
    /// Rounds the input up to a power of two.
    /// </summary>
    /// <param name="x">The value to round up.</param>
    /// <returns>
    /// The smallest power of two greater than or equal to <paramref name="x"/>. An input of 0,
    /// or one above 2^31, yields 1 because the shift count wraps to zero.
    /// </returns>
    public static uint CeilingPowerOfTwo(uint x)
    {
        int leadingZeros = BitOperations.LeadingZeroCount(x - 1);

        // Only the low five bits of the shift count are used for a 32-bit operand,
        // so shifting by -leadingZeros is the same as shifting by (32 - leadingZeros) mod 32.
        return 1u << -leadingZeros;
    }

    /// <summary>
    /// Computes Stafford variant 13 of 64-bit mix function.
    /// </summary>
    /// <param name="z">The value to mix.</param>
    /// <returns>A bit mix of the input parameter.</returns>
    /// <remarks>
    /// See http://zimbry.blogspot.com/2011/09/better-bit-mixing-improving-on.html
    /// </remarks>
    public static ulong Mix64(ulong z)
    {
        const ulong FirstMultiplier = 0xbf58476d1ce4e5b9UL;
        const ulong SecondMultiplier = 0x94d049bb133111ebUL;

        // Two xor-shift/multiply rounds followed by a final xor-shift.
        z ^= z >> 30;
        z *= FirstMultiplier;

        z ^= z >> 27;
        z *= SecondMultiplier;

        return z ^ (z >> 31);
    }
}
/// <summary>
/// Holds the <see cref="Head"/> and <see cref="Tail"/> counters at explicit byte offsets of one and
/// two times <see cref="Padding.CACHE_LINE_SIZE"/>, inside a struct that is three times that size.
/// </summary>
/// <remarks>
/// The layout leaves unused bytes before Head, between Head and Tail, and after Tail.
/// NOTE(review): presumably this keeps the two counters on separate cache lines to avoid false
/// sharing between producers and the consumer - confirm against the upstream implementation.
/// </remarks>
[DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
[StructLayout(LayoutKind.Explicit, Size = 3 * Padding.CACHE_LINE_SIZE)] // padding before/between/after fields
internal struct PaddedHeadAndTail
{
/// <summary>The head counter, placed one cache-line size into the struct.</summary>
[FieldOffset(1 * Padding.CACHE_LINE_SIZE)] public int Head;
/// <summary>The tail counter, placed two cache-line sizes into the struct.</summary>
[FieldOffset(2 * Padding.CACHE_LINE_SIZE)] public int Tail;
}
/// <summary>
/// Supplies the cache line size constant used for the explicit layout of <see cref="PaddedHeadAndTail"/>.
/// </summary>
internal class Padding
{
// 128 when TARGET_ARM64 or TARGET_LOONGARCH64 is defined at compile time, otherwise 64.
// NOTE(review): the 128-byte branch only applies if the build defines one of these symbols -
// confirm that this project's build actually sets them.
#if TARGET_ARM64 || TARGET_LOONGARCH64
internal const int CACHE_LINE_SIZE = 128;
#else
internal const int CACHE_LINE_SIZE = 64;
#endif
}