Commit bbc1ba22 authored by Sam Lantinga's avatar Sam Lantinga

Added a FIFO test to the atomic test suite.

This is really useful because we might be able to use something like this
for the SDL event queue.
parent fc5dc0dd
......@@ -44,6 +44,9 @@
* subtle issues that can arise here:
* http://msdn.microsoft.com/en-us/library/ee418650%28v=vs.85%29.aspx
*
* There's also lots of good information here:
* http://www.1024cores.net/home/lock-free-algorithms
*
* These operations may or may not actually be implemented using
* processor specific atomic operations. When possible they are
* implemented as true processor specific atomic operations. When that
......
......@@ -231,10 +231,403 @@ void RunEpicTest()
/* End atomic operation test */
/**************************************************************************/
/**************************************************************************/
/* Lock-free FIFO test */
#define NUM_READERS 4
#define NUM_WRITERS 4
#define EVENTS_PER_WRITER 1000000
/* A decent guess for the size of a cache line on this architecture */
#define CACHELINE 64
/* The number of entries must be a power of 2 */
#define MAX_ENTRIES 256
#define WRAP_MASK (MAX_ENTRIES-1)
typedef struct
{
SDL_atomic_t sequence;
SDL_Event event;
} SDL_EventQueueEntry;
typedef struct
{
SDL_EventQueueEntry entries[MAX_ENTRIES];
char cache_pad1[CACHELINE-((sizeof(SDL_EventQueueEntry)*MAX_ENTRIES)%CACHELINE)];
SDL_atomic_t enqueue_pos;
char cache_pad2[CACHELINE-sizeof(SDL_atomic_t)];
SDL_atomic_t dequeue_pos;
char cache_pad3[CACHELINE-sizeof(SDL_atomic_t)];
SDL_bool active;
/* Only needed for the mutex test */
SDL_mutex *mutex;
} SDL_EventQueue;
static void InitEventQueue(SDL_EventQueue *queue)
{
int i;
for (i = 0; i < MAX_ENTRIES; ++i) {
SDL_AtomicSet(&queue->entries[i].sequence, i);
}
SDL_AtomicSet(&queue->enqueue_pos, 0);
SDL_AtomicSet(&queue->dequeue_pos, 0);
queue->active = SDL_TRUE;
}
static SDL_bool EnqueueEvent_LockFree(SDL_EventQueue *queue, const SDL_Event *event)
{
SDL_EventQueueEntry *entry;
unsigned queue_pos;
unsigned entry_seq;
int delta;
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
for ( ; ; ) {
entry = &queue->entries[queue_pos & WRAP_MASK];
entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence);
delta = (int)(entry_seq - queue_pos);
if (delta == 0) {
/* The entry and the queue position match, try to increment the queue position */
if (SDL_AtomicCAS(&queue->enqueue_pos, (int)queue_pos, (int)(queue_pos+1))) {
break;
}
} else if (delta < 0) {
/* We ran into an old queue entry, which means it still needs to be dequeued */
return SDL_FALSE;
} else {
/* We ran into a new queue entry, get the new queue position */
queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
}
}
/* We own the object, fill it! */
entry->event = *event;
SDL_AtomicSet(&entry->sequence, (int)(queue_pos + 1));
return SDL_TRUE;
}
static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
{
SDL_EventQueueEntry *entry;
unsigned queue_pos;
unsigned entry_seq;
int delta;
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
for ( ; ; ) {
entry = &queue->entries[queue_pos & WRAP_MASK];
entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence);
delta = (int)(entry_seq - (queue_pos + 1));
if (delta == 0) {
/* The entry and the queue position match, try to increment the queue position */
if (SDL_AtomicCAS(&queue->dequeue_pos, (int)queue_pos, (int)(queue_pos+1))) {
break;
}
} else if (delta < 0) {
/* We ran into an old queue entry, which means we've hit empty */
return SDL_FALSE;
} else {
/* We ran into a new queue entry, get the new queue position */
queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
}
}
/* We own the object, fill it! */
*event = entry->event;
SDL_AtomicSet(&entry->sequence, (int)(queue_pos+MAX_ENTRIES));
return SDL_TRUE;
}
static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event)
{
SDL_EventQueueEntry *entry;
unsigned queue_pos;
unsigned entry_seq;
int delta;
SDL_mutexP(queue->mutex);
queue_pos = (unsigned)queue->enqueue_pos.value;
entry = &queue->entries[queue_pos & WRAP_MASK];
entry_seq = (unsigned)entry->sequence.value;
delta = (int)(entry_seq - queue_pos);
if (delta == 0) {
++queue->enqueue_pos.value;
} else if (delta < 0) {
/* We ran into an old queue entry, which means it still needs to be dequeued */
SDL_mutexV(queue->mutex);
return SDL_FALSE;
} else {
printf("ERROR: mutex failed!\n");
}
/* We own the object, fill it! */
entry->event = *event;
entry->sequence.value = (int)(queue_pos + 1);
SDL_mutexV(queue->mutex);
return SDL_TRUE;
}
static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
{
SDL_EventQueueEntry *entry;
unsigned queue_pos;
unsigned entry_seq;
int delta;
SDL_mutexP(queue->mutex);
queue_pos = (unsigned)queue->dequeue_pos.value;
entry = &queue->entries[queue_pos & WRAP_MASK];
entry_seq = (unsigned)entry->sequence.value;
delta = (int)(entry_seq - (queue_pos + 1));
if (delta == 0) {
++queue->dequeue_pos.value;
} else if (delta < 0) {
/* We ran into an old queue entry, which means we've hit empty */
SDL_mutexV(queue->mutex);
return SDL_FALSE;
} else {
printf("ERROR: mutex failed!\n");
}
/* We own the object, fill it! */
*event = entry->event;
entry->sequence.value = (int)(queue_pos + MAX_ENTRIES);
SDL_mutexV(queue->mutex);
return SDL_TRUE;
}
static SDL_sem *writersDone;
static SDL_sem *readersDone;
static SDL_atomic_t writersRunning;
static SDL_atomic_t readersRunning;
typedef struct
{
SDL_EventQueue *queue;
int index;
char padding1[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int))%CACHELINE];
int waits;
SDL_bool lock_free;
char padding2[CACHELINE-sizeof(int)-sizeof(SDL_bool)];
} WriterData;
typedef struct
{
SDL_EventQueue *queue;
int counters[NUM_WRITERS];
int waits;
SDL_bool lock_free;
char padding[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int)*NUM_WRITERS+sizeof(int)+sizeof(SDL_bool))%CACHELINE];
} ReaderData;
static int FIFO_Writer(void* _data)
{
WriterData *data = (WriterData *)_data;
SDL_EventQueue *queue = data->queue;
int index = data->index;
int i;
SDL_Event event;
event.type = SDL_USEREVENT;
event.user.windowID = 0;
event.user.code = 0;
event.user.data1 = data;
event.user.data2 = NULL;
if (data->lock_free) {
for (i = 0; i < EVENTS_PER_WRITER; ++i) {
event.user.code = i;
while (!EnqueueEvent_LockFree(queue, &event)) {
++data->waits;
SDL_Delay(0);
}
}
} else {
for (i = 0; i < EVENTS_PER_WRITER; ++i) {
event.user.code = i;
while (!EnqueueEvent_Mutex(queue, &event)) {
++data->waits;
SDL_Delay(0);
}
}
}
SDL_AtomicAdd(&writersRunning, -1);
SDL_SemPost(writersDone);
return 0;
}
static int FIFO_Reader(void* _data)
{
ReaderData *data = (ReaderData *)_data;
SDL_EventQueue *queue = data->queue;
SDL_Event event;
int index;
if (data->lock_free) {
for ( ; ; ) {
if (DequeueEvent_LockFree(queue, &event)) {
WriterData *writer = (WriterData*)event.user.data1;
++data->counters[writer->index];
} else if (queue->active) {
++data->waits;
SDL_Delay(0);
} else {
/* We drained the queue, we're done! */
break;
}
}
} else {
for ( ; ; ) {
if (DequeueEvent_Mutex(queue, &event)) {
WriterData *writer = (WriterData*)event.user.data1;
++data->counters[writer->index];
} else if (queue->active) {
++data->waits;
SDL_Delay(0);
} else {
/* We drained the queue, we're done! */
break;
}
}
}
SDL_AtomicAdd(&readersRunning, -1);
SDL_SemPost(readersDone);
return 0;
}
static void RunFIFOTest(SDL_bool lock_free)
{
SDL_EventQueue queue;
WriterData writerData[NUM_WRITERS];
ReaderData readerData[NUM_READERS];
Uint32 start, end;
int i, j;
int grand_total;
printf("\nFIFO test---------------------------------------\n\n");
printf("Mode: %s\n", lock_free ? "LockFree" : "Mutex");
readersDone = SDL_CreateSemaphore(0);
writersDone = SDL_CreateSemaphore(0);
SDL_memset(&queue, 0xff, sizeof(queue));
InitEventQueue(&queue);
if (!lock_free) {
queue.mutex = SDL_CreateMutex();
}
start = SDL_GetTicks();
/* Start the readers first */
printf("Starting %d readers\n", NUM_READERS);
SDL_zero(readerData);
SDL_AtomicSet(&readersRunning, NUM_READERS);
for (i = 0; i < NUM_READERS; ++i) {
readerData[i].queue = &queue;
readerData[i].lock_free = lock_free;
SDL_CreateThread(FIFO_Reader, &readerData[i]);
}
/* Start up the writers */
printf("Starting %d writers\n", NUM_WRITERS);
SDL_zero(writerData);
SDL_AtomicSet(&writersRunning, NUM_WRITERS);
for (i = 0; i < NUM_WRITERS; ++i) {
writerData[i].queue = &queue;
writerData[i].index = i;
writerData[i].lock_free = lock_free;
SDL_CreateThread(FIFO_Writer, &writerData[i]);
}
/* Wait for the writers */
while (SDL_AtomicGet(&writersRunning) > 0) {
SDL_SemWait(writersDone);
}
/* Shut down the queue so readers exit */
queue.active = SDL_FALSE;
/* Wait for the readers */
while (SDL_AtomicGet(&readersRunning) > 0) {
SDL_SemWait(readersDone);
}
end = SDL_GetTicks();
SDL_DestroySemaphore(readersDone);
SDL_DestroySemaphore(writersDone);
if (!lock_free) {
SDL_DestroyMutex(queue.mutex);
}
printf("Finished in %f sec\n", (end - start) / 1000.f);
printf("\n");
for (i = 0; i < NUM_WRITERS; ++i) {
printf("Writer %d wrote %d events, had %d waits\n", i, EVENTS_PER_WRITER, writerData[i].waits);
}
printf("Writers wrote %d total events\n", NUM_WRITERS*EVENTS_PER_WRITER);
/* Print a breakdown of which readers read messages from which writer */
printf("\n");
grand_total = 0;
for (i = 0; i < NUM_READERS; ++i) {
int total = 0;
for (j = 0; j < NUM_WRITERS; ++j) {
total += readerData[i].counters[j];
}
grand_total += total;
printf("Reader %d read %d events, had %d waits\n", i, total, readerData[i].waits);
printf(" { ");
for (j = 0; j < NUM_WRITERS; ++j) {
if (j > 0) {
printf(", ");
}
printf("%d", readerData[i].counters[j]);
}
printf(" }\n");
}
printf("Readers read %d total events\n", grand_total);
}
/* End FIFO test */
/**************************************************************************/
int
main(int argc, char *argv[])
{
RunBasicTest();
RunEpicTest();
/* This test is really slow, so don't run it by default */
#if 0
RunFIFOTest(SDL_FALSE);
#endif
RunFIFOTest(SDL_TRUE);
return 0;
}
/* vi: set ts=4 sw=4 expandtab: */
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment