Re: Glib sort routines



Stim sort is a sorting method targeting for linked lists in glib. It is closely related to Tim sort. It is a stable sort, has NlogN worst case and a N best case.

---Algorithm---
The algorithm breaks into three parts: extraction, stacking, and merging.

Extraction:
From the original list, we look for sorted sequences. We look for either increasing or decreasing sequences. In the case of an increasing (already sorted) sequence, we tie off the last element next pointer (which pointed to a non-increasing element) to NULL. If the list is decreasing, we remove each element and prepend it to a new list (thus reversing the sequence). Both increasing and decreasing sequences can have sequences of equal elements, but in the decreasing list, there are added in order rather than reversed. This method will always find sequences of two or more (unless it is a list of length 1 or 0).

Stacking:
The real sorting function takes a "want" parameter which specifies the minimum number of nodes items we wish to remove from the list and have sorted. The top function calls with the list length. If the number is large (over 16) the function then recurses asking for half that number as a start. Otherwise it asks from an extracted strand. These functions reply two sorted sequences. The first is an attempt to forfill the requested number, and the second is any sequences beyond this that may be very large. If the current strand is greater than 1/3 of what we desire, we request for the remainder, otherwise we request an equal size block to double up. If we encounter any large blocks that are greater than the request, we return up the stack. Further up the stack, the system decides which sequences to merge first (the smaller ones).

Merge:
Merge combines two sorted sequences into one large one. Each list has a type, ascending(1), descending(2) or neither(0). The first elements of a descending list directly followed by an ascending list were already compared during the extraction step. So this can be used to skip the first compare. The product of an ascending list which had the lower is another list, is an ascending list. A merge of any list with a descending list which has the lowest element at the head, is a descending list. All other combinations become neither.

System walks along the list with the lower element, looking for the point the elements become higher than one at the head of the other list. If 8 steps are made looking for this, the searcher starts galloping. Taking increasingly larger step sizes, it looks for either the end of the list, or a component higher than the other list head. From here it performs a binary chop down to the element. Initial step size of 3 seemed to be marginally faster in tests.

In doubly linked lists, the prev pointers are ignored during the sort, and are zipped to nodes once the sort is finished.

---Benchmarks---

List size: 1000
Test                           ,MergeSort,StimSort ,Percent of comparisons
Sorted ascending               ,     4932,      999,    20.26%
Sorted descending              ,     5044,      999,    19.81%
All equal                      ,     4932,      999,    20.26%
Sorted but 10 unsorted at start,     5798,     1201,    20.71%
Sorted but 10 unsorted at end  ,     4990,     1199,    24.04%
Sorted but with three swaps    ,     5714,     1233,    21.59%
Sorted but with ten random     ,     5097,     1040,    20.40%
Sorted sequences of 16         ,     5452,     2762,    50.66%
Random but many same (4 values),     7991,     5468,    68.43%
Random                         ,     8707,     9035,   103.77%

List size: 10,000,000
Test                           ,MergeSort,StimSort ,Percent of comparisons
Sorted ascending               ,114434624,  9999999,     8.74%
Sorted descending              ,118788160,  9999999,     8.42%
All equal                      ,114434624,  9999999,     8.74%
Sorted but 10 unsorted at start,122942957, 10000472,     8.13%
Sorted but 10 unsorted at end  ,114434811, 10000475,     8.74%
Sorted but with three swaps    ,122892793, 10000633,     8.14%
Sorted but with ten random     ,116036786, 10000100,     8.62%
Sorted sequences of 16         ,120125153, 28238529,    23.51%
Random but many same (4 values),195962270, 55544515,    28.34%
Random                         ,220099126,223401423,   101.50%

Callgrind running on random 10,000 element list.
Merge sort: 61,298,879 Instructions
Stim sort:  58,411,259 Instructions

Ultimately, there is a couple percent overhead when working with truly random data. In terms of raw performance assuming a simple comparison function, even with random data, there is a small performance improvement. There is a noticeable increase in code size. It is over 330 lines of code. Same algorithm can be applied to slist and queue. The galloping can be applied to the insert sorted function.

Could someone take a look at the patch and see if it looks sensible?
diff --git a/glib/glist.c b/glib/glist.c
index 252330a..885d7e9 100644
--- a/glib/glist.c
+++ b/glib/glist.c
@@ -1018,70 +1018,343 @@ g_list_insert_sorted_with_data (GList            *list,
   return g_list_insert_sorted_real (list, data, (GFunc) func, user_data);
 }
 
-static GList *
-g_list_sort_merge (GList     *l1, 
-		   GList     *l2,
-		   GFunc     compare_func,
-		   gpointer  user_data)
+static void
+g_list_sort_merge (GList        *node_a,
+                        int           type_a,
+                        GList        *node_b,
+                        int           type_b,
+                        GList       **reply_list,
+                        int          *reply_type,
+                        GFunc         compare_func,
+		                gpointer      user_data)
 {
-  GList list, *l, *lprev;
-  gint cmp;
+  GList *merged_list;
+  GList *swap_temp_list;
+  GList *old_node_a;
+  
+  if (!node_a)
+    {
+      *reply_list = node_b;
+      *reply_type = type_b;
+      return;
+    }
+  if (!node_b)
+    {
+      *reply_list = node_a;
+      *reply_type = type_a;
+      return;
+    }
+  *reply_type = 0;
+  if (!(type_a == 2 && type_b == 1) && 
+      ((GCompareDataFunc) compare_func) (node_a->data, node_b->data, user_data) > 0)
+    {
+      swap_temp_list = node_b;
+      node_b = node_a;
+      node_a = swap_temp_list;
+      if (type_b == 2)
+        *reply_type = 2;
+    }
+  else
+    {
+      if (type_a == 1)
+        *reply_type = 1;
+    }
+  *reply_list = node_a;
+  
+  while (1)
+    {
+      int step_count = 0;
+      do
+        {
+          if (step_count++ >= 8)
+            {
+              int step_size = 3;
+              int step_mul = 1;
+              int steps_done;
+              GList *gallop_node;
+              while (1)
+                {
+                  gallop_node = node_a;
+                  if (!node_a->next)
+                    {
+                      node_a->next = node_b;
+                      return;
+                    }
+                  for(steps_done = 0; steps_done < step_size; steps_done++)
+                    {
+                      if (!gallop_node->next)
+                        break;
+                      gallop_node = gallop_node->next;
+                    }
+                  if (((GCompareDataFunc) compare_func) (gallop_node->data, node_b->data, user_data) <= 0)
+                    {
+                      node_a = gallop_node;
+                      if (step_mul)
+                        step_size *= 2;
+                      else 
+                        step_size -= steps_done / 2 ;
+                    }
+                  else
+                    {
+                      step_size = steps_done / 2;
+                      step_mul = 0;
+                    }
+                  if (!step_size)
+                    {
+                      old_node_a = node_a;
+                      node_a = node_a->next;
+                      break;
+                    }
+                }
+              step_count = 0;
+              break;
+            }
+          old_node_a = node_a;
+          node_a = node_a->next;
+          if (!node_a)
+            {
+              old_node_a->next = node_b;
+              return;
+            }
+        }
+      while (((GCompareDataFunc) compare_func) (node_a->data, node_b->data, user_data) <= 0);
 
-  l = &list; 
-  lprev = NULL;
+      old_node_a->next = node_b;
+      swap_temp_list = node_b;
+      node_b = node_a;
+      node_a = swap_temp_list;
+    }
+  
+}
 
-  while (l1 && l2)
+static void
+g_list_sort_extract (GList         *node,
+                     GList        **reply_list,
+                     int           *reply_count,
+                     int           *reply_type,
+                     GList        **reply_rest,
+                     GFunc          compare_func,
+		             gpointer       user_data)
+{
+  int cmp;
+  int count = 0;
+  GList *new_list;
+  if (!node)
     {
-      cmp = ((GCompareDataFunc) compare_func) (l1->data, l2->data, user_data);
-
-      if (cmp <= 0)
+      *reply_list = NULL;
+      *reply_rest = NULL;
+      *reply_count = 0;
+      *reply_type = 0;
+      return;
+    }
+  if (!node->next)
+    {
+      *reply_list = node;
+      *reply_rest = NULL;
+      *reply_count = 1;
+      *reply_type = 1;
+      return;
+    }
+  new_list = node;
+  while (1)
+    {
+      cmp = ((GCompareDataFunc) compare_func) (node->data, node->next->data, user_data);
+      if (cmp)
+        break;
+      node = node->next;
+      count++;
+      if (!node->next)
         {
-	  l->next = l1;
-	  l1 = l1->next;
-        } 
-      else 
-	{
-	  l->next = l2;
-	  l2 = l2->next;
+          *reply_list = new_list;
+          *reply_rest = NULL;
+          *reply_count = count+1;
+          return;
         }
-      l = l->next;
-      l->prev = lprev; 
-      lprev = l;
     }
-  l->next = l1 ? l1 : l2;
-  l->next->prev = l;
+  if (cmp < 0)
+    {
+      /* a < b so do ascending list */
+      *reply_list = new_list;
+      *reply_type = 1;
+      node = node->next;
+      count += 2;
+      while(1)
+        {
+          if (!node->next)
+            {
+              *reply_rest = NULL;
+              *reply_count = count;
+              return;
+            }
+          if (((GCompareDataFunc) compare_func) (node->data, node->next->data, user_data) > 0)
+            {
+              *reply_rest = node->next;
+              node->next = NULL;
+              *reply_count = count;
+              return;
+            }
+          count++;
+          node = node->next;
+        }
+    }
+  else
+    {
+      /* a > b so do a descending list */
+      GList *cur_node;
+      GList *eq_node;
+      
+      cur_node = node;
+      node = node->next;
+      cur_node->next = NULL;
+      cur_node = node;
+      node = node->next;
+      cur_node->next = new_list;
+      new_list = cur_node;
+      eq_node = new_list;
+      *reply_type = 2;
+      count += 2;
+      while(1)
+        {
+          if (!node)
+            {
+              *reply_list = new_list;
+              *reply_rest = NULL;
+              *reply_count = count;
+              return;
+            }
+          cmp = ((GCompareDataFunc) compare_func) (new_list->data, node->data, user_data);
+          if (cmp < 0)
+            {
+              *reply_list = new_list;
+              *reply_rest = node;
+              *reply_count = count;
+              return;
+            }
+          count++;
+          cur_node = node;
+          node = node->next;
+          if (cmp == 0)
+            {
+              cur_node->next = eq_node->next;
+              eq_node->next = cur_node;
+            }
+          else
+            {
+              cur_node->next = new_list;
+              new_list = cur_node;
+            }
+          eq_node = cur_node;
+       
+        }
+    }
+}
 
-  return list.next;
+static void
+g_list_sort_body (GList         *node,
+                  GList         *left_list,
+                  int            left_count,
+                  int            left_type,
+                  GList        **reply1_list,
+                  int           *reply1_count,
+                  int           *reply1_type,
+                  GList        **reply2_list,
+                  int           *reply2_count,
+                  int           *reply2_type,
+                  GList        **reply_rest,
+                  int            want,
+                  GFunc          compare_func,
+		          gpointer       user_data)
+{
+  GList *right1_list, *right2_list;
+  int right1_count, right2_count;
+  int right1_type, right2_type;
+  GList *rest;
+  if (want < 16)
+    {
+      g_list_sort_extract (node,
+                           &right1_list, &right1_count, &right1_type,
+                           &rest,
+                           compare_func,
+                           user_data);
+    }
+  else
+    {
+      g_list_sort_body (node,
+                        left_list, left_count, left_type,
+                        &left_list, &left_count, &left_type, 
+                        &right1_list, &right1_count, &right1_type,
+                        &rest,
+                        want/2,
+                        compare_func,
+                        user_data);
+    }
+  
+  while (left_count < want && right1_count && right1_count < want)
+    {
+      int mywant = left_count > want / 3 ? want - left_count : left_count;
+      g_list_sort_body (rest,
+                        right1_list, right1_count, right1_type,
+                        &right1_list, &right1_count, &right1_type, 
+                        &right2_list, &right2_count, &right2_type,
+                        &rest,
+                        mywant,
+                        compare_func,
+                        user_data);
+      g_list_sort_merge (left_list, left_type,
+                         right1_list, right1_type,
+                         &left_list, &left_type,
+                         compare_func,
+                         user_data);
+      left_count = left_count + right1_count;
+      right1_list = right2_list;
+      right1_count = right2_count;
+      right1_type = right2_type;
+    }
+  *reply1_list = left_list;
+  *reply1_count = left_count;;
+  *reply1_type = left_type;;
+  *reply2_list = right1_list;
+  *reply2_count = right1_count;
+  *reply2_type = right1_type;
+  *reply_rest = rest;
+  return;
 }
 
-static GList* 
-g_list_sort_real (GList    *list,
-		  GFunc     compare_func,
-		  gpointer  user_data)
+static GList*
+g_list_sort_real (GList*       list,
+                  GFunc        compare_func,
+		          gpointer     user_data)
 {
-  GList *l1, *l2;
+  GList *node, *rest;
+  GList *reply1, *reply2; 
+  int count;
+  int type;
+  int reply1_count, reply2_count;
+  int reply1_type, reply2_type;
+  int want = g_list_length (list);
   
-  if (!list) 
-    return NULL;
-  if (!list->next) 
-    return list;
+  g_list_sort_extract (list, &node, &count, &type, &rest, compare_func, user_data);
   
-  l1 = list; 
-  l2 = list->next;
-
-  while ((l2 = l2->next) != NULL)
+  g_list_sort_body (rest,
+                    node, count, type,
+                    &reply1, &reply1_count, &reply1_type,
+                    &reply2, &reply2_count, &reply2_type,
+                    &rest,
+                    want,
+                    compare_func,
+                    user_data);
+  node = reply1;
+  g_assert(reply2_count == 0);
+  g_assert(reply1_count == want);
+  
+  GList *prev_node = NULL;
+  while (node)
     {
-      if ((l2 = l2->next) == NULL) 
-	break;
-      l1 = l1->next;
+      node->prev = prev_node;
+      prev_node = node;
+      node = node->next;
     }
-  l2 = l1->next; 
-  l1->next = NULL; 
-
-  return g_list_sort_merge (g_list_sort_real (list, compare_func, user_data),
-			    g_list_sort_real (l2, compare_func, user_data),
-			    compare_func,
-			    user_data);
+  return reply1;
 }
 
 /**
@@ -1111,7 +1384,7 @@ g_list_sort_real (GList    *list,
  **/
 GList *
 g_list_sort (GList        *list,
-	     GCompareFunc  compare_func)
+             GCompareFunc  compare_func)
 {
   return g_list_sort_real (list, (GFunc) compare_func, NULL);
 			    
@@ -1143,8 +1416,8 @@ g_list_sort (GList        *list,
  **/
 GList *
 g_list_sort_with_data (GList            *list,
-		       GCompareDataFunc  compare_func,
-		       gpointer          user_data)
+                       GCompareDataFunc  compare_func,
+                       gpointer          user_data)
 {
   return g_list_sort_real (list, (GFunc) compare_func, user_data);
 }
diff --git a/tests/Makefile.am b/tests/Makefile.am
index f78e9d5..13d9509 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -96,6 +96,7 @@ test_programs =					\
 	gio-test				\
 	iochannel-test				\
 	list-test				\
+	list-performance-test				\
 	mainloop-test				\
 	mapping-test				\
 	markup-collect				\
@@ -151,6 +152,7 @@ file_test_LDADD = $(progs_ldadd)
 env_test_LDADD = $(progs_ldadd)
 gio_test_LDADD = $(progs_ldadd)
 iochannel_test_LDADD = $(progs_ldadd)
+list_performance_test_LDADD = $(progs_ldadd)
 list_test_LDADD = $(progs_ldadd)
 mainloop_test_LDADD = $(thread_ldadd)
 markup_test_LDADD = $(progs_ldadd)


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]