diff test/testatomic.c @ 5100:470ede30189c

Added a FIFO test to the atomic test suite. This is really useful because we might be able to use something like this for the SDL event queue.
author Sam Lantinga <slouken@libsdl.org>
date Tue, 25 Jan 2011 23:23:52 -0800
parents 342b158efbbe
children 1b3678ac9804
line wrap: on
line diff
--- a/test/testatomic.c	Tue Jan 25 18:02:41 2011 -0800
+++ b/test/testatomic.c	Tue Jan 25 23:23:52 2011 -0800
@@ -231,10 +231,403 @@
 /* End atomic operation test */
 /**************************************************************************/
 
+/**************************************************************************/
+/* Lock-free FIFO test */
+
+#define NUM_READERS 4
+#define NUM_WRITERS 4
+#define EVENTS_PER_WRITER   1000000
+
+/* A decent guess for the size of a cache line on this architecture */
+#define CACHELINE   64
+
+/* The number of entries must be a power of 2 */
+#define MAX_ENTRIES 256
+#define WRAP_MASK   (MAX_ENTRIES-1)
+
+typedef struct
+{
+    SDL_atomic_t sequence;
+    SDL_Event event;
+} SDL_EventQueueEntry;
+
+typedef struct
+{
+    SDL_EventQueueEntry entries[MAX_ENTRIES];
+
+    char cache_pad1[CACHELINE-((sizeof(SDL_EventQueueEntry)*MAX_ENTRIES)%CACHELINE)];
+
+    SDL_atomic_t enqueue_pos;
+
+    char cache_pad2[CACHELINE-sizeof(SDL_atomic_t)];
+
+    SDL_atomic_t dequeue_pos;
+
+    char cache_pad3[CACHELINE-sizeof(SDL_atomic_t)];
+
+    SDL_bool active;
+
+    /* Only needed for the mutex test */
+    SDL_mutex *mutex;
+
+} SDL_EventQueue;
+
+static void InitEventQueue(SDL_EventQueue *queue)
+{
+    int i;
+
+    for (i = 0; i < MAX_ENTRIES; ++i) {
+        SDL_AtomicSet(&queue->entries[i].sequence, i);
+    }
+    SDL_AtomicSet(&queue->enqueue_pos, 0);
+    SDL_AtomicSet(&queue->dequeue_pos, 0);
+    queue->active = SDL_TRUE;
+}
+
+static SDL_bool EnqueueEvent_LockFree(SDL_EventQueue *queue, const SDL_Event *event)
+{
+    SDL_EventQueueEntry *entry;
+    unsigned queue_pos;
+    unsigned entry_seq;
+    int delta;
+
+    queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
+    for ( ; ; ) {
+        entry = &queue->entries[queue_pos & WRAP_MASK];
+        entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence);
+
+        delta = (int)(entry_seq - queue_pos);
+        if (delta == 0) {
+            /* The entry and the queue position match, try to increment the queue position */
+            if (SDL_AtomicCAS(&queue->enqueue_pos, (int)queue_pos, (int)(queue_pos+1))) {
+                break;
+            }
+        } else if (delta < 0) {
+            /* We ran into an old queue entry, which means it still needs to be dequeued */
+            return SDL_FALSE;
+        } else {
+            /* We ran into a new queue entry, get the new queue position */
+            queue_pos = (unsigned)SDL_AtomicGet(&queue->enqueue_pos);
+        }
+    }
+
+    /* We own the object, fill it! */
+    entry->event = *event;
+    SDL_AtomicSet(&entry->sequence, (int)(queue_pos + 1));
+
+    return SDL_TRUE;
+}
+
+static SDL_bool DequeueEvent_LockFree(SDL_EventQueue *queue, SDL_Event *event)
+{
+    SDL_EventQueueEntry *entry;
+    unsigned queue_pos;
+    unsigned entry_seq;
+    int delta;
+
+    queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
+    for ( ; ; ) {
+        entry = &queue->entries[queue_pos & WRAP_MASK];
+        entry_seq = (unsigned)SDL_AtomicGet(&entry->sequence);
+
+        delta = (int)(entry_seq - (queue_pos + 1));
+        if (delta == 0) {
+            /* The entry and the queue position match, try to increment the queue position */
+            if (SDL_AtomicCAS(&queue->dequeue_pos, (int)queue_pos, (int)(queue_pos+1))) {
+                break;
+            }
+        } else if (delta < 0) {
+            /* We ran into an old queue entry, which means we've hit empty */
+            return SDL_FALSE;
+        } else {
+            /* We ran into a new queue entry, get the new queue position */
+            queue_pos = (unsigned)SDL_AtomicGet(&queue->dequeue_pos);
+        }
+    }
+
+    /* We own the object, fill it! */
+    *event = entry->event;
+    SDL_AtomicSet(&entry->sequence, (int)(queue_pos+MAX_ENTRIES));
+
+    return SDL_TRUE;
+}
+
+static SDL_bool EnqueueEvent_Mutex(SDL_EventQueue *queue, const SDL_Event *event)
+{
+    SDL_EventQueueEntry *entry;
+    unsigned queue_pos;
+    unsigned entry_seq;
+    int delta;
+
+    SDL_mutexP(queue->mutex);
+
+    queue_pos = (unsigned)queue->enqueue_pos.value;
+    entry = &queue->entries[queue_pos & WRAP_MASK];
+    entry_seq = (unsigned)entry->sequence.value;
+
+    delta = (int)(entry_seq - queue_pos);
+    if (delta == 0) {
+        ++queue->enqueue_pos.value;
+    } else if (delta < 0) {
+        /* We ran into an old queue entry, which means it still needs to be dequeued */
+        SDL_mutexV(queue->mutex);
+        return SDL_FALSE;
+    } else {
+        printf("ERROR: mutex failed!\n");
+    }
+
+    /* We own the object, fill it! */
+    entry->event = *event;
+    entry->sequence.value = (int)(queue_pos + 1);
+
+    SDL_mutexV(queue->mutex);
+
+    return SDL_TRUE;
+}
+
+static SDL_bool DequeueEvent_Mutex(SDL_EventQueue *queue, SDL_Event *event)
+{
+    SDL_EventQueueEntry *entry;
+    unsigned queue_pos;
+    unsigned entry_seq;
+    int delta;
+
+    SDL_mutexP(queue->mutex);
+
+    queue_pos = (unsigned)queue->dequeue_pos.value;
+    entry = &queue->entries[queue_pos & WRAP_MASK];
+    entry_seq = (unsigned)entry->sequence.value;
+
+    delta = (int)(entry_seq - (queue_pos + 1));
+    if (delta == 0) {
+        ++queue->dequeue_pos.value;
+    } else if (delta < 0) {
+        /* We ran into an old queue entry, which means we've hit empty */
+        SDL_mutexV(queue->mutex);
+        return SDL_FALSE;
+    } else {
+        printf("ERROR: mutex failed!\n");
+    }
+
+    /* We own the object, fill it! */
+    *event = entry->event;
+    entry->sequence.value = (int)(queue_pos + MAX_ENTRIES);
+
+    SDL_mutexV(queue->mutex);
+
+    return SDL_TRUE;
+}
+
+static SDL_sem *writersDone;
+static SDL_sem *readersDone;
+static SDL_atomic_t writersRunning;
+static SDL_atomic_t readersRunning;
+
+typedef struct
+{
+    SDL_EventQueue *queue;
+    int index;
+    char padding1[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int))%CACHELINE];
+    int waits;
+    SDL_bool lock_free;
+    char padding2[CACHELINE-sizeof(int)-sizeof(SDL_bool)];
+} WriterData;
+
+typedef struct
+{
+    SDL_EventQueue *queue;
+    int counters[NUM_WRITERS];
+    int waits;
+    SDL_bool lock_free;
+    char padding[CACHELINE-(sizeof(SDL_EventQueue*)+sizeof(int)*NUM_WRITERS+sizeof(int)+sizeof(SDL_bool))%CACHELINE];
+} ReaderData;
+
+static int FIFO_Writer(void* _data)
+{
+    WriterData *data = (WriterData *)_data;
+    SDL_EventQueue *queue = data->queue;
+    int index = data->index;
+    int i;
+    SDL_Event event;
+
+    event.type = SDL_USEREVENT;
+    event.user.windowID = 0;
+    event.user.code = 0;
+    event.user.data1 = data;
+    event.user.data2 = NULL;
+
+    if (data->lock_free) {
+        for (i = 0; i < EVENTS_PER_WRITER; ++i) {
+            event.user.code = i;
+            while (!EnqueueEvent_LockFree(queue, &event)) {
+                ++data->waits;
+                SDL_Delay(0);
+            }
+        }
+    } else {
+        for (i = 0; i < EVENTS_PER_WRITER; ++i) {
+            event.user.code = i;
+            while (!EnqueueEvent_Mutex(queue, &event)) {
+                ++data->waits;
+                SDL_Delay(0);
+            }
+        }
+    }
+    SDL_AtomicAdd(&writersRunning, -1);
+    SDL_SemPost(writersDone);
+    return 0;
+}
+
+static int FIFO_Reader(void* _data)
+{
+    ReaderData *data = (ReaderData *)_data;
+    SDL_EventQueue *queue = data->queue;
+    SDL_Event event;
+    int index;
+
+    if (data->lock_free) {
+        for ( ; ; ) {
+            if (DequeueEvent_LockFree(queue, &event)) {
+                WriterData *writer = (WriterData*)event.user.data1;
+                ++data->counters[writer->index];
+            } else if (queue->active) {
+                ++data->waits;
+                SDL_Delay(0);
+            } else {
+                /* We drained the queue, we're done! */
+                break;
+            }
+        }
+    } else {
+        for ( ; ; ) {
+            if (DequeueEvent_Mutex(queue, &event)) {
+                WriterData *writer = (WriterData*)event.user.data1;
+                ++data->counters[writer->index];
+            } else if (queue->active) {
+                ++data->waits;
+                SDL_Delay(0);
+            } else {
+                /* We drained the queue, we're done! */
+                break;
+            }
+        }
+    }
+    SDL_AtomicAdd(&readersRunning, -1);
+    SDL_SemPost(readersDone);
+    return 0;
+}
+
+static void RunFIFOTest(SDL_bool lock_free)
+{
+    SDL_EventQueue queue;
+    WriterData writerData[NUM_WRITERS];
+    ReaderData readerData[NUM_READERS];
+    Uint32 start, end;
+    int i, j;
+    int grand_total;
+ 
+    printf("\nFIFO test---------------------------------------\n\n");
+    printf("Mode: %s\n", lock_free ? "LockFree" : "Mutex");
+
+    readersDone = SDL_CreateSemaphore(0);
+    writersDone = SDL_CreateSemaphore(0);
+
+    SDL_memset(&queue, 0xff, sizeof(queue));
+
+    InitEventQueue(&queue);
+    if (!lock_free) {
+        queue.mutex = SDL_CreateMutex();
+    }
+
+    start = SDL_GetTicks();
+ 
+    /* Start the readers first */
+    printf("Starting %d readers\n", NUM_READERS);
+    SDL_zero(readerData);
+    SDL_AtomicSet(&readersRunning, NUM_READERS);
+    for (i = 0; i < NUM_READERS; ++i) {
+        readerData[i].queue = &queue;
+        readerData[i].lock_free = lock_free;
+        SDL_CreateThread(FIFO_Reader, &readerData[i]);
+    }
+
+    /* Start up the writers */
+    printf("Starting %d writers\n", NUM_WRITERS);
+    SDL_zero(writerData);
+    SDL_AtomicSet(&writersRunning, NUM_WRITERS);
+    for (i = 0; i < NUM_WRITERS; ++i) {
+        writerData[i].queue = &queue;
+        writerData[i].index = i;
+        writerData[i].lock_free = lock_free;
+        SDL_CreateThread(FIFO_Writer, &writerData[i]);
+    }
+ 
+    /* Wait for the writers */
+    while (SDL_AtomicGet(&writersRunning) > 0) {
+        SDL_SemWait(writersDone);
+    }
+ 
+    /* Shut down the queue so readers exit */
+    queue.active = SDL_FALSE;
+
+    /* Wait for the readers */
+    while (SDL_AtomicGet(&readersRunning) > 0) {
+        SDL_SemWait(readersDone);
+    }
+
+    end = SDL_GetTicks();
+ 
+    SDL_DestroySemaphore(readersDone);
+    SDL_DestroySemaphore(writersDone);
+
+    if (!lock_free) {
+        SDL_DestroyMutex(queue.mutex);
+    }
+ 
+    printf("Finished in %f sec\n", (end - start) / 1000.f);
+
+    printf("\n");
+    for (i = 0; i < NUM_WRITERS; ++i) {
+        printf("Writer %d wrote %d events, had %d waits\n", i, EVENTS_PER_WRITER, writerData[i].waits);
+    }
+    printf("Writers wrote %d total events\n", NUM_WRITERS*EVENTS_PER_WRITER);
+
+    /* Print a breakdown of which readers read messages from which writer */
+    printf("\n");
+    grand_total = 0;
+    for (i = 0; i < NUM_READERS; ++i) {
+        int total = 0;
+        for (j = 0; j < NUM_WRITERS; ++j) {
+            total += readerData[i].counters[j];
+        }
+        grand_total += total;
+        printf("Reader %d read %d events, had %d waits\n", i, total, readerData[i].waits);
+        printf("  { ");
+        for (j = 0; j < NUM_WRITERS; ++j) {
+            if (j > 0) {
+                printf(", ");
+            }
+            printf("%d", readerData[i].counters[j]);
+        }
+        printf(" }\n");
+    }
+    printf("Readers read %d total events\n", grand_total);
+}
+
+/* End FIFO test */
+/**************************************************************************/
+
 int
 main(int argc, char *argv[])
 {
     RunBasicTest();
     RunEpicTest();
+/* This test is really slow, so don't run it by default */
+#if 0
+    RunFIFOTest(SDL_FALSE);
+#endif
+    RunFIFOTest(SDL_TRUE);
     return 0;
 }
+
+/* vi: set ts=4 sw=4 expandtab: */