Re: New objects for GLib



Sebastian Wilhelmi <wilhelmi@ira.uka.de> writes:

> In my experience you often need to hold the lock even longer than the queue
> itself (that's why I added the explicit lock/unlock functions and the
> _unlocked function variants): Look at a part of GThreadPool:
> 
> void 
> g_thread_pool_push (GThreadPool     *pool,
>                     gpointer         data)
> {
>   GRealThreadPool *real = (GRealThreadPool*) pool;
>   g_async_queue_lock (real->queue);
>   if (g_async_queue_length_unlocked (real->queue) >= 0)
>     {
>       /* No thread is waiting in the queue */
>       g_thread_pool_start_thread (real);
>     }
>   g_async_queue_push_unlocked (real->queue, data);
>   g_async_queue_unlock (real->queue);
> }
> 
> Here I test the length and add a thread when there is no thread waiting, and I
> release the lock only after that. That makes things slower, OK, but I can now
> guarantee all sorts of properties that would otherwise not be possible. I
> know it's not a very convincing example, but that's what I found to be
> practical.

You can guarantee that the queue *is* in fact empty when you start a
new thread. Without the lock you could only 'guarantee' that the queue
would be 'almost' empty. I am not convinced that this guarantee is
worth it. The number of extra threads started would be low. I did
something similar which, when stressed, started only very few extra
threads.

> Now for your code. It looks like it only works for exactly one reader and
> exactly one writer. But my queue also works for multiple readers/writers, and
> that's also what is needed for a thread pool. A performance test comparing
> the two solutions might be interesting for the one-reader/one-writer case,
> though.

That is true. I forgot that I wrote the queue to solve a
producer/consumer problem. It is not very difficult to modify it to
support more than one reader/writer, just add a reader-mutex and a
writer-mutex. I have attached such a changed version. This version
also cleans up a few leftovers from an old version.

#include "lac.h"

/* Tracing variants of g_mutex_lock()/g_mutex_unlock(): each prints the
 * source line number and the stringified argument through g_print()
 * before performing the real operation.
 *
 * The `if (1) { ... } else` shape turns the caller's trailing `;` into the
 * (empty) else branch, so an invocation must always be followed by `;`.
 * No code visible in this file invokes either macro. */
#define g_mutex_my_lock(a) if (1) { g_print("%d lock %s, \n", __LINE__, #a); g_mutex_lock (a); } else 
#define g_mutex_my_unlock(a) if (1) { g_print("%d unlock %s, \n", __LINE__, #a); g_mutex_unlock (a); } else

/* Forward declaration so that struct _LacQueue can hold node pointers
 * before struct _LacQueueNode is defined. */
typedef struct _LacQueueNode LacQueueNode;
/* Tells whether a ring node currently carries an unread payload. */
typedef enum _LacQueueNodeState {
  LAC_QUEUE_NODE_FREE,  /* no payload; set by lac_queue_node_new() and lac_queue_read() */
  LAC_QUEUE_NODE_DATA   /* payload stored by lac_queue_write(), not yet read */
} LacQueueNodeState;

/* A queue built on a circular, singly linked ring of nodes.
 * NOTE(review): the LacQueue typedef is not declared in this file;
 * presumably it comes from "lac.h" -- confirm. */
struct _LacQueue {
  LacQueueNode *data_node;   /* node inspected by the next lac_queue_read() */
  LacQueueNode *free_node;   /* node filled by the next lac_queue_write() */
  GMutex *read_mutex;        /* held by lac_queue_read() for the whole call */
  GMutex *write_mutex;       /* held by lac_queue_write() for the whole call */
};

/* One slot of the ring.  Every node has its own mutex/condition pair so a
 * reader can wait on exactly the node it wants to consume. */
struct _LacQueueNode {
  LacQueueNodeState state;   /* FREE or DATA; accessed under state_mutex */
  GMutex *state_mutex;       /* protects state (and data while it is handed over) */
  GCond *state_cond;         /* signalled by lac_queue_write() after setting DATA */

  gpointer data;             /* payload; only meaningful while state is DATA */
  LacQueueNode *next;        /* next node in the ring, or in the global free list */
};

/* Lock taken by lac_queue_node_new() and lac_queue_node_free_circle()
 * around every access to node_mem_chunk and node_free_list. */
G_LOCK_DEFINE (lac_queue_global);
/* Allocator for nodes; created lazily on the first allocation. */
static GMemChunk *node_mem_chunk= NULL;
/* Singly linked list (through ->next) of nodes released by freed queues. */
static LacQueueNode *node_free_list = NULL;

/* Helpers private to this file; defined below. */
static LacQueueNode	*lac_queue_node_new		();
static void		 lac_queue_node_free_circle	(LacQueueNode *node);

static 
LacQueueNode *
lac_queue_node_new ()
{
  LacQueueNode *node;

  G_LOCK (lac_queue_global);
  if (node_free_list)
    {
      node = node_free_list;
      node_free_list = node->next;
    }
  else
    {
      if (!node_mem_chunk)
	node_mem_chunk = g_mem_chunk_create (LacQueueNode, 256, G_ALLOC_ONLY);
      node = g_chunk_new (LacQueueNode, node_mem_chunk);
    }
  G_UNLOCK (lac_queue_global);
  node->data = NULL;
  node->state = LAC_QUEUE_NODE_FREE;
  node->state_mutex = g_mutex_new ();
  node->state_cond = g_cond_new ();
  return node;
}

/* Return every node of a ring to the global free list in constant time.
 *
 * `node` may be any member of the ring.  The ring is cut open right after
 * `node`: the chain that starts at node->next and ends at `node` is put in
 * front of the current free list.  The nodes themselves (including their
 * mutex and condition) are left untouched.
 */
static void
lac_queue_node_free_circle (LacQueueNode *node)
{
  LacQueueNode *chain_head;

  G_LOCK (lac_queue_global);

  /* First node of the opened ring; `node` becomes its last element. */
  chain_head = node->next;

  /* Append the old free list behind the chain ... */
  node->next = node_free_list;

  /* ... and make the chain the new free list. */
  node_free_list = chain_head;

  G_UNLOCK (lac_queue_global);
}

LacQueue *
lac_queue_new ()
{
  LacQueue *queue;
  
  queue = g_new (LacQueue, 1);
  queue->free_node = lac_queue_node_new();
  queue->free_node->next = lac_queue_node_new();
  queue->free_node->next->next = lac_queue_node_new();
  queue->free_node->next->next->next = queue->free_node;
  queue->data_node = queue->free_node;
  queue->read_mutex = g_mutex_new ();
  queue->write_mutex = g_mutex_new ();

  return queue;
}

/* Destroy a queue created by lac_queue_new().
 *
 * All ring nodes are handed back to the global free list, then the two
 * queue mutexes and the queue structure itself are released.  Payload
 * pointers still stored in unread nodes are not touched.
 */
void
lac_queue_free (LacQueue *queue)
{
  g_return_if_fail (queue != NULL);

  /* Any ring member identifies the whole ring. */
  lac_queue_node_free_circle (queue->free_node);

  g_mutex_free (queue->write_mutex);
  g_mutex_free (queue->read_mutex);

  g_free (queue);
}

/* Tell whether the queue currently has nothing to read.
 *
 * Returns TRUE when the node at the read position holds no data, FALSE
 * when it does, and FALSE (after a warning) when `queue` is NULL.  With
 * concurrent readers or writers the answer can be outdated as soon as it
 * is returned.
 *
 * This function does not take read_mutex, because lac_queue_read() keeps
 * that mutex while it blocks.  A reader can therefore advance
 * queue->data_node at any time, so the node is pinned in a local variable:
 * the mutex that gets locked, the state that gets read and the mutex that
 * gets unlocked then always belong to the same node.  Re-evaluating
 * queue->data_node for each step could unlock a mutex this thread never
 * locked and leave the locked one locked forever.
 */
gboolean
lac_queue_empty (LacQueue *queue)
{
  LacQueueNode *node;
  gboolean empty;

  g_return_val_if_fail (queue != NULL, FALSE);

  for (;;)
    {
      node = queue->data_node;
      g_mutex_lock (node->state_mutex);

      /* lac_queue_read() advances data_node only while holding the state
       * mutex of the node it leaves.  If `node` is still the read position
       * now, it stays so for as long as we hold its mutex. */
      if (node == queue->data_node)
	break;

      /* A reader consumed `node` while we waited for its mutex; retry. */
      g_mutex_unlock (node->state_mutex);
    }

  empty = (node->state != LAC_QUEUE_NODE_DATA);
  g_mutex_unlock (node->state_mutex);

  return empty;
}

/* Remove and return the oldest payload of the queue.
 *
 * queue:     the queue to read from.
 * may_block: when TRUE, wait until a writer has filled the node at the
 *            read position; when FALSE, return NULL right away if that
 *            node holds no data.
 *
 * Returns the stored payload.  NULL is returned for a NULL `queue` and for
 * an empty queue in non-blocking mode, so a stored NULL payload cannot be
 * told apart from "nothing available".
 *
 * read_mutex is held for the whole call, including the wait on the node's
 * condition.  A second caller, even one passing may_block == FALSE,
 * therefore waits on read_mutex while another reader is blocked here.
 */
gpointer
lac_queue_read (LacQueue *queue, gboolean may_block)
{
  LacQueueNode *head;
  gpointer payload;

  g_return_val_if_fail (queue != NULL, NULL);

  g_mutex_lock (queue->read_mutex);

  /* Only readers move data_node and we hold read_mutex, so the read
   * position cannot change until we move it ourselves below. */
  head = queue->data_node;
  g_mutex_lock (head->state_mutex);

  if (head->state != LAC_QUEUE_NODE_DATA)
    {
      if (!may_block)
	{
	  g_mutex_unlock (head->state_mutex);
	  g_mutex_unlock (queue->read_mutex);
	  return NULL;
	}

      /* Re-check after every wake-up. */
      do
	g_cond_wait (head->state_cond, head->state_mutex);
      while (head->state != LAC_QUEUE_NODE_DATA);
    }

  payload = head->data;
  head->state = LAC_QUEUE_NODE_FREE;

  /* Advance the read position before releasing the consumed node. */
  queue->data_node = head->next;

  g_mutex_unlock (head->state_mutex);
  g_mutex_unlock (queue->read_mutex);

  return payload;
}

/* Append a payload to the queue; never blocks waiting for a reader.
 *
 * queue: the queue to write to.
 * data:  the payload pointer to store.
 *
 * The payload goes into the node at the write position.  Before that, the
 * node's successor is checked: if it still holds unread data the ring is
 * full, and a new node is linked in between so the write position always
 * moves on to a FREE node.  After storing, the node's condition is
 * signalled to wake a reader waiting on it.
 *
 * write_mutex is held for the whole call, serialising writers.
 */
void
lac_queue_write (LacQueue *queue, gpointer data)
{
  LacQueueNode *slot;
  LacQueueNode *successor;
  gboolean successor_busy;

  g_return_if_fail (queue != NULL);

  g_mutex_lock (queue->write_mutex);

  /* Only writers move free_node and we hold write_mutex. */
  slot = queue->free_node;
  successor = slot->next;

  g_mutex_lock (successor->state_mutex);
  successor_busy = (successor->state != LAC_QUEUE_NODE_FREE);
  g_mutex_unlock (successor->state_mutex);

  if (successor_busy)
    {
      /* Ring is full: grow it by one node right after the slot. */
      LacQueueNode *extra = lac_queue_node_new ();

      extra->next = successor;
      slot->next = extra;
    }

  /* Publish the payload under the node's own mutex. */
  g_mutex_lock (slot->state_mutex);
  slot->data = data;
  slot->state = LAC_QUEUE_NODE_DATA;
  g_mutex_unlock (slot->state_mutex);

  g_cond_signal (slot->state_cond);

  queue->free_node = slot->next;
  g_mutex_unlock (queue->write_mutex);
}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]