[libgda] GdaDataPivot improvements



commit 50e5dd9607975c695fe80d5afb9657a0155695e1
Author: Vivien Malerba <malerba gnome-db org>
Date:   Wed Sep 28 21:38:58 2011 +0200

    GdaDataPivot improvements
    
    more scalable way of summarizing data

 libgda/gda-data-pivot.c |  862 ++++++++++++++++++++++++++++++++++++++---------
 libgda/gda-data-pivot.h |    3 +-
 2 files changed, 713 insertions(+), 152 deletions(-)
---
diff --git a/libgda/gda-data-pivot.c b/libgda/gda-data-pivot.c
index 8c87c93..72ef680 100644
--- a/libgda/gda-data-pivot.c
+++ b/libgda/gda-data-pivot.c
@@ -48,9 +48,13 @@ static void column_data_free (ColumnData *cdata);
 typedef struct {
 	gint    row;
 	gint    col;
+	GType   gtype; /* type of the first value stored in this cell, every later value must have the same type */
 	GArray *values; /* array of #GValue, none will be GDA_TYPE_NULL, and there is at least always 1 GValue */
 	GdaDataPivotAggregate aggregate;
-	GValue *computed_value;
+
+	gint    nvalues; /* number of values folded into @data_value, used as divisor when computing an average */
+	GValue *data_value; /* running aggregate, then final result; %NULL before the first value and after an error */
+	GError *error; /* set on overflow or type mismatch; once set, new values are ignored */
 } CellData;
 static guint cell_data_hash (gconstpointer key);
 static gboolean cell_data_equal (gconstpointer a, gconstpointer b);
@@ -58,6 +62,7 @@ static void cell_data_free (CellData *cdata);
 static void cell_data_compute_aggregate (CellData *cdata);
 
 static GValue *aggregate_get_empty_value (GdaDataPivotAggregate aggregate);
+static gboolean aggregate_handle_new_value (CellData *cdata, const GValue *new_value);
 
 
 struct _GdaDataPivotPrivate {
@@ -498,11 +503,11 @@ aggregate_get_empty_value (GdaDataPivotAggregate aggregate)
 	case GDA_DATA_PIVOT_MIN:
 	case GDA_DATA_PIVOT_MAX:
 	case GDA_DATA_PIVOT_AVG:
-		return gda_value_new_null ();
 	case GDA_DATA_PIVOT_SUM:
+		return gda_value_new_null ();
 	case GDA_DATA_PIVOT_COUNT:
-		value = gda_value_new (G_TYPE_INT);
-		g_value_set_int (value, 0);
+		value = gda_value_new (G_TYPE_UINT);
+		g_value_set_uint (value, 0);
 		return value;
 		break;
 	default:
@@ -510,73 +515,672 @@ aggregate_get_empty_value (GdaDataPivotAggregate aggregate)
 	}
 }
 
-static gint64
-compute_for_gint (GdaDataPivotAggregate type, GArray *values)
+static gboolean
+aggregate_handle_gint (CellData *cdata, gint val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
 {
-	guint i;
-	gint64 res = 0;
-	for (i = 0; i < values->len; i++) {
-		GValue *v;
-		v = g_array_index (values, GValue*, i);
-		if (type == GDA_DATA_PIVOT_SUM)
-			res += g_value_get_int (v);
-		else {
-			if (i == 0)
-				res = g_value_get_int (v);
-			else if (type == GDA_DATA_PIVOT_MIN)
-				res = MIN (res, g_value_get_int (v));
+	if (cdata->data_value) {
+		gint eval = 0; /* stays 0 for AVG, where @data_value holds a #gint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_INT)
+			eval = g_value_get_int (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_int (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_int (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			gint64 tmp;
+			tmp = (gint64) eval + val; /* widen before adding, otherwise the sum already overflows as a #gint */
+			if ((tmp > G_MAXINT) || (tmp < G_MININT))
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
 			else
-				res = MAX (res, g_value_get_int (v));
+				g_value_set_int (cdata->data_value, (gint) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			gint64 tmp;
+			tmp = g_value_get_int64 (cdata->data_value);
+			if (((val > 0) && (tmp > G_MAXINT64 - val)) || ((val < 0) && (tmp < G_MININT64 - val)))
+				/* tested before adding: a #gint64 sum cannot be range checked once computed */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_int64 (cdata->data_value, tmp + val);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
 		}
-		gda_value_free (v);
 	}
-	return res;
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #gint64, the others keep the #gint type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (G_TYPE_INT);
+			g_value_set_int (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_INT64);
+			g_value_set_int64 (cdata->data_value, (gint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
 }
 
-static gfloat
-compute_for_gfloat (GdaDataPivotAggregate type, GArray *values)
+static gboolean
+aggregate_handle_guint (CellData *cdata, guint val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
 {
-	guint i;
-	gfloat res = 0;
-	for (i = 0; i < values->len; i++) {
-		GValue *v;
-		v = g_array_index (values, GValue*, i);
-		if (type == GDA_DATA_PIVOT_SUM)
-			res += g_value_get_float (v);
-		else {
-			if (i == 0)
-				res = g_value_get_float (v);
-			else if (type == GDA_DATA_PIVOT_MIN)
-				res = MIN (res, g_value_get_float (v));
+	if (cdata->data_value) {
+		guint eval = 0; /* stays 0 for AVG, where @data_value holds a #guint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_UINT)
+			eval = g_value_get_uint (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_uint (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_uint (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			guint64 tmp;
+			tmp = (guint64) eval + val; /* widen before adding, otherwise the sum silently wraps as a #guint */
+			if (tmp > G_MAXUINT)
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
 			else
-				res = MAX (res, g_value_get_float (v));
+				g_value_set_uint (cdata->data_value, (guint) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			guint64 tmp;
+			tmp = g_value_get_uint64 (cdata->data_value);
+			if (tmp > G_MAXUINT64 - val)
+				/* tested before adding: an unsigned sum wraps around and can't be checked afterwards */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uint64 (cdata->data_value, tmp + val);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
 		}
-		gda_value_free (v);
 	}
-	return res;
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #guint64, the others keep the #guint type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (G_TYPE_UINT);
+			g_value_set_uint (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_UINT64);
+			g_value_set_uint64 (cdata->data_value, (guint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
 }
 
-static gdouble
-compute_for_gdouble (GdaDataPivotAggregate type, GArray *values)
+static gboolean
+aggregate_handle_gint64 (CellData *cdata, gint64 val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
 {
-	guint i;
-	gdouble res = 0;
-	for (i = 0; i < values->len; i++) {
-		GValue *v;
-		v = g_array_index (values, GValue*, i);
-		if (type == GDA_DATA_PIVOT_SUM)
-			res += g_value_get_double (v);
-		else {
-			if (i == 0)
-				res = g_value_get_double (v);
-			else if (type == GDA_DATA_PIVOT_MIN)
-				res = MIN (res, g_value_get_double (v));
+	if (cdata->data_value) {
+		gint64 eval = 0;
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_INT64)
+			eval = g_value_get_int64 (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_int64 (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_int64 (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG: {
+			/* overflow is tested before adding: signed overflow is undefined, and a #gint64
+			 * sum can never compare outside of the #gint64 limits once it has been computed */
+			if (((val > 0) && (eval > G_MAXINT64 - val)) || ((val < 0) && (eval < G_MININT64 - val)))
+				/* the sum would not fit in a #gint64 */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
 			else
-				res = MAX (res, g_value_get_double (v));
+				g_value_set_int64 (cdata->data_value, eval + val);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
 		}
-		gda_value_free (v);
 	}
-	return res;
+	else {
+		/* first value for this cell: every handled aggregate works directly on a #gint64 */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_INT64);
+			g_value_set_int64 (cdata->data_value, val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_guint64 (CellData *cdata, guint64 val) /* @val must be a #guint64, a #guint would truncate it */
+{
+	if (cdata->data_value) {
+		guint64 eval = 0;
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_UINT64)
+			eval = g_value_get_uint64 (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_uint64 (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_uint64 (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG: {
+			/* overflow is tested before adding: an unsigned sum silently wraps around, so
+			 * it can never compare greater than G_MAXUINT64 once it has been computed */
+			if (eval > G_MAXUINT64 - val)
+				/* the sum would not fit in a #guint64 */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uint64 (cdata->data_value, eval + val);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: every handled aggregate works directly on a #guint64 */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_UINT64);
+			g_value_set_uint64 (cdata->data_value, val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_float (CellData *cdata, gfloat val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		gfloat eval = 0; /* stays 0 for AVG, where @data_value holds a #gdouble sum */
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_FLOAT)
+			eval = g_value_get_float (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_float (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_float (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			gdouble tmp;
+			tmp = (gdouble) eval + val; /* summed as #gdouble so the #gfloat range can be checked */
+			if ((tmp > G_MAXFLOAT) || (tmp < -G_MAXFLOAT)) /* G_MINFLOAT is the smallest positive value, not the lower bound */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Float value overflow"));
+			else
+				g_value_set_float (cdata->data_value, (gfloat) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			gdouble tmp;
+			tmp = g_value_get_double (cdata->data_value) + val;
+			if ((tmp > G_MAXDOUBLE) || (tmp < -G_MAXDOUBLE)) /* G_MINDOUBLE is the smallest positive value, not the lower bound */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Float value overflow"));
+			else
+				g_value_set_double (cdata->data_value, tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #gdouble, the others keep the #gfloat type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (G_TYPE_FLOAT);
+			g_value_set_float (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_DOUBLE);
+			g_value_set_double (cdata->data_value, (gdouble) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_double (CellData *cdata, gdouble val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		gdouble eval = 0;
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_DOUBLE)
+			eval = g_value_get_double (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_double (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_double (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG: {
+			gdouble tmp;
+			tmp = eval + val;
+			if ((tmp > G_MAXDOUBLE) || (tmp < -G_MAXDOUBLE))
+				/* the lower bound is -G_MAXDOUBLE: G_MINDOUBLE is the smallest positive value */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Double value overflow"));
+			else
+				g_value_set_double (cdata->data_value, (gdouble) tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: every handled aggregate works directly on a #gdouble */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_DOUBLE);
+			g_value_set_double (cdata->data_value, val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_char (CellData *cdata, gchar val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		gchar eval = 0; /* stays 0 for AVG, where @data_value holds a #gint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_CHAR)
+			eval = g_value_get_char (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_char (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_char (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			gint tmp; /* wider than #gchar so the sum can be range checked */
+			tmp = eval + val;
+			if ((tmp > G_MAXINT8) || (tmp < G_MININT8))
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_char (cdata->data_value, (gchar) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			gint64 tmp;
+			tmp = g_value_get_int64 (cdata->data_value) + val;
+			if ((tmp > G_MAXINT64) || (tmp < G_MININT64))
+				/* FIXME: never true, a #gint64 is always within its own limits; overflow has to be tested before adding */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_int64 (cdata->data_value, tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #gint64, the others keep the #gchar type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (G_TYPE_CHAR);
+			g_value_set_char (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_INT64);
+			g_value_set_int64 (cdata->data_value, (gint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_uchar (CellData *cdata, guchar val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		guchar eval = 0; /* stays 0 for AVG, where @data_value holds a #guint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == G_TYPE_UCHAR)
+			eval = g_value_get_uchar (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				g_value_set_uchar (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				g_value_set_uchar (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			guint tmp; /* wider than #guchar so the sum can be range checked */
+			tmp = eval + val;
+			if (tmp > G_MAXUINT8)
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uchar (cdata->data_value, (guchar) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			guint64 tmp;
+			tmp = g_value_get_uint64 (cdata->data_value) + val;
+			if (tmp > G_MAXUINT64)
+				/* FIXME: never true, a #guint64 cannot exceed G_MAXUINT64; overflow has to be tested before adding */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uint64 (cdata->data_value, tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #guint64, the others keep the #guchar type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (G_TYPE_UCHAR);
+			g_value_set_uchar (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_UINT64);
+			g_value_set_uint64 (cdata->data_value, (guint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_short (CellData *cdata, gshort val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		gshort eval = 0; /* stays 0 for AVG, where @data_value holds a #gint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == GDA_TYPE_SHORT)
+			eval = gda_value_get_short (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				gda_value_set_short (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				gda_value_set_short (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			gint tmp; /* wider than #gshort so the sum can be range checked */
+			tmp = eval + val;
+			if ((tmp > G_MAXSHORT) || (tmp < G_MINSHORT))
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				gda_value_set_short (cdata->data_value, (gshort) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			gint64 tmp;
+			tmp = g_value_get_int64 (cdata->data_value) + val;
+			if ((tmp > G_MAXINT64) || (tmp < G_MININT64))
+				/* FIXME: never true, a #gint64 is always within its own limits; overflow has to be tested before adding */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_int64 (cdata->data_value, tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #gint64, the others keep the #gshort type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (GDA_TYPE_SHORT);
+			gda_value_set_short (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_INT64);
+			g_value_set_int64 (cdata->data_value, (gint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+static gboolean
+aggregate_handle_ushort (CellData *cdata, gushort val) /* folds @val into @cdata->data_value according to @cdata->aggregate */
+{
+	if (cdata->data_value) {
+		gushort eval = 0; /* stays 0 for AVG, where @data_value holds a #guint64 sum */
+		if (G_VALUE_TYPE (cdata->data_value) == GDA_TYPE_USHORT)
+			eval = gda_value_get_ushort (cdata->data_value);
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+			if (eval > val)
+				gda_value_set_ushort (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_MAX:
+			if (eval < val)
+				gda_value_set_ushort (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_SUM: {
+			guint tmp; /* wider than #gushort so the sum can be range checked */
+			tmp = eval + val;
+			if (tmp > G_MAXUSHORT)
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				gda_value_set_ushort (cdata->data_value, (gushort) tmp);
+			break;
+		}
+		case GDA_DATA_PIVOT_AVG: {
+			guint64 tmp;
+			tmp = g_value_get_uint64 (cdata->data_value) + val;
+			if (tmp > G_MAXUINT64)
+				/* FIXME: never true, a #guint64 cannot exceed G_MAXUINT64; overflow has to be tested before adding */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uint64 (cdata->data_value, tmp);
+			break;
+		}
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	else {
+		/* first value for this cell: AVG keeps its running sum in a #guint64, the others keep the #gushort type */
+		switch (cdata->aggregate) {
+		case GDA_DATA_PIVOT_MIN:
+		case GDA_DATA_PIVOT_MAX:
+		case GDA_DATA_PIVOT_SUM:
+			cdata->data_value = gda_value_new (GDA_TYPE_USHORT);
+			gda_value_set_ushort (cdata->data_value, val);
+			break;
+		case GDA_DATA_PIVOT_AVG:
+			cdata->data_value = gda_value_new (G_TYPE_UINT64);
+			g_value_set_uint64 (cdata->data_value, (guint64) val);
+			break;
+		default:
+			return FALSE; /* aggregate not handled here */
+		}
+	}
+	cdata->nvalues ++; /* one more value taken into account */
+	return TRUE;
+}
+
+/*
+ * Folds @new_value into @cdata's running aggregate, dispatching on cdata->gtype. Returns: %TRUE if @new_value
+ * has been handled (even if it created cdata->error), or %FALSE if cdata's aggregate needs to store all the data
+ */
+static gboolean
+aggregate_handle_new_value (CellData *cdata, const GValue *new_value)
+{
+	if (cdata->error)
+		return TRUE; /* the cell is already in error: silently ignore any further value */
+	else if (cdata->values)
+		return FALSE; /* values are already being stored for this cell: keep doing so */
+
+	/* check data type */
+	if (G_VALUE_TYPE (new_value) != cdata->gtype) {
+		if (!cdata->error)
+			g_set_error (&(cdata->error),
+				     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_INTERNAL_ERROR,
+				     "%s", _("Inconsistent data type"));
+		return TRUE;
+	}
+
+	if (cdata->aggregate == GDA_DATA_PIVOT_COUNT) {
+		if (cdata->data_value) {
+			guint64 tmp;
+			tmp = (guint64) g_value_get_uint (cdata->data_value) + 1; /* widened so the sum cannot wrap */
+			if (tmp > G_MAXUINT) /* G_MAXUINT itself is still a valid count */
+				g_set_error (&(cdata->error),
+					     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_OVERFLOW_ERROR,
+					     "%s", _("Integer overflow"));
+			else
+				g_value_set_uint (cdata->data_value, (guint) tmp);
+		}
+		else {
+			cdata->data_value = gda_value_new (G_TYPE_UINT);
+			g_value_set_uint (cdata->data_value, 1);
+		}
+		return TRUE;
+	}
+	else if (cdata->gtype == G_TYPE_INT)
+		return aggregate_handle_gint (cdata, g_value_get_int (new_value));
+	else if (cdata->gtype == G_TYPE_UINT)
+		return aggregate_handle_guint (cdata, g_value_get_uint (new_value));
+	else if (cdata->gtype == G_TYPE_INT64)
+		return aggregate_handle_gint64 (cdata, g_value_get_int64 (new_value));
+	else if (cdata->gtype == G_TYPE_UINT64)
+		return aggregate_handle_guint64 (cdata, g_value_get_uint64 (new_value));
+	else if (cdata->gtype == G_TYPE_FLOAT)
+		return aggregate_handle_float (cdata, g_value_get_float (new_value));
+	else if (cdata->gtype == G_TYPE_DOUBLE)
+		return aggregate_handle_double (cdata, g_value_get_double (new_value));
+	else if (cdata->gtype == G_TYPE_CHAR)
+		return aggregate_handle_char (cdata, g_value_get_char (new_value));
+	else if (cdata->gtype == G_TYPE_UCHAR)
+		return aggregate_handle_uchar (cdata, g_value_get_uchar (new_value));
+	else if (cdata->gtype == GDA_TYPE_SHORT)
+		return aggregate_handle_short (cdata, gda_value_get_short (new_value));
+	else if (cdata->gtype == GDA_TYPE_USHORT)
+		return aggregate_handle_ushort (cdata, gda_value_get_ushort (new_value));
+	else {
+		/* incompatible data type for this kind of operation */
+		if (!cdata->error)
+			g_set_error (&(cdata->error),
+				     GDA_DATA_PIVOT_ERROR, GDA_DATA_PIVOT_INTERNAL_ERROR,
+				     "%s", _("Data type does not support requested computation"));
+		return TRUE;
+	}
 }
 
 /*
@@ -588,101 +1192,51 @@ compute_for_gdouble (GdaDataPivotAggregate type, GArray *values)
 static void
 cell_data_compute_aggregate (CellData *cdata)
 {
-	guint i;
-	switch (cdata->aggregate) {
-	case GDA_DATA_PIVOT_AVG:
-	case GDA_DATA_PIVOT_SUM: {
-		GValue *v;
-		g_assert (cdata->values && (cdata->values->len > 0));
-		v = g_array_index (cdata->values, GValue*, 0);
-		if ((G_VALUE_TYPE (v) == G_TYPE_INT) || (G_VALUE_TYPE (v) == G_TYPE_INT64) ||
-		    (G_VALUE_TYPE (v) == G_TYPE_CHAR) || (G_VALUE_TYPE (v) == GDA_TYPE_SHORT)) {
-			gint64 sum;
-			sum = compute_for_gint (GDA_DATA_PIVOT_SUM, cdata->values);
-			if (cdata->aggregate == GDA_DATA_PIVOT_AVG) {
-				cdata->computed_value = gda_value_new (G_TYPE_DOUBLE);
-				g_value_set_double (cdata->computed_value,
-						    sum / cdata->values->len);
+	if (cdata->data_value) {
+		/* finish computation: drop the partial result on error, turn an AVG running sum into a #gdouble mean */
+		if (cdata->error) {
+			gda_value_free (cdata->data_value);
+			cdata->data_value = NULL;
+		}
+		else if (cdata->aggregate == GDA_DATA_PIVOT_AVG) {
+			gdouble dval;
+			if ((cdata->gtype == G_TYPE_INT) || (cdata->gtype == G_TYPE_INT64) ||
+			    (cdata->gtype == G_TYPE_CHAR) || (cdata->gtype == GDA_TYPE_SHORT)) {
+				gint64 val; /* signed types accumulate their sum as a #gint64 */
+				g_assert (G_VALUE_TYPE (cdata->data_value) == G_TYPE_INT64);
+				val = g_value_get_int64 (cdata->data_value);
+				dval = val / (gdouble) cdata->nvalues;
 			}
-			else {
-				cdata->computed_value = gda_value_new (G_TYPE_INT64);
-				g_value_set_int64 (cdata->computed_value, sum);
+			else if ((cdata->gtype == G_TYPE_UINT) || (cdata->gtype == G_TYPE_UINT64) ||
+				 (cdata->gtype == G_TYPE_UCHAR) || (cdata->gtype == GDA_TYPE_USHORT)) {
+				guint64 val; /* unsigned types accumulate their sum as a #guint64 */
+				g_assert (G_VALUE_TYPE (cdata->data_value) == G_TYPE_UINT64);
+				val = g_value_get_uint64 (cdata->data_value);
+				dval = val / (gdouble) cdata->nvalues;
 			}
-		}
-		else if (G_VALUE_TYPE (v) == G_TYPE_FLOAT) {
-			gfloat sum;
-			sum = compute_for_gfloat (GDA_DATA_PIVOT_SUM, cdata->values);
-			cdata->computed_value = gda_value_new (G_TYPE_FLOAT);
-			if (cdata->aggregate == GDA_DATA_PIVOT_AVG)
-				g_value_set_float (cdata->computed_value,
-						   sum / cdata->values->len);
-			else
-				g_value_set_float (cdata->computed_value, sum);
-		}
-		else if (G_VALUE_TYPE (v) == G_TYPE_DOUBLE) {
-			gdouble sum;
-			sum = compute_for_gdouble (GDA_DATA_PIVOT_SUM, cdata->values);
-			cdata->computed_value = gda_value_new (G_TYPE_DOUBLE);
-			if (cdata->aggregate == GDA_DATA_PIVOT_AVG)
-				g_value_set_double (cdata->computed_value,
-						   sum / cdata->values->len);
+			else if (cdata->gtype == G_TYPE_FLOAT) /* aggregate_handle_float() sums AVG as a #gdouble, not a #gfloat */
+				dval = g_value_get_double (cdata->data_value) / (gdouble) cdata->nvalues;
+			else if (cdata->gtype == G_TYPE_DOUBLE)
+				dval = g_value_get_double (cdata->data_value) / (gdouble) cdata->nvalues;
 			else
-				g_value_set_double (cdata->computed_value, sum);
-		}
-		else {
-			/* error: can't compute aggregate */
-			cdata->computed_value = NULL;
+				g_assert_not_reached ();
+				
+			gda_value_free (cdata->data_value);
+			cdata->data_value = gda_value_new (G_TYPE_DOUBLE);
+			g_value_set_double (cdata->data_value, dval);
 		}
-		break;
 	}
-	case GDA_DATA_PIVOT_COUNT: {
-		guint64 count = 0;
+	else if (cdata->values) {
+		TO_IMPLEMENT; /* aggregates needing all the stored values are not computed yet, values are only freed */
+		guint i;
 		for (i = 0; i < cdata->values->len; i++) {
 			GValue *value;
 			value = g_array_index (cdata->values, GValue*, i);
-			count += 1;
 			gda_value_free (value);
 		}
-		cdata->computed_value = gda_value_new (G_TYPE_UINT64);
-		g_value_set_uint64 (cdata->computed_value, count);
-		break;
-	}
-	case GDA_DATA_PIVOT_MAX:
-	case GDA_DATA_PIVOT_MIN: {
-		GValue *v;
-		g_assert (cdata->values && (cdata->values->len > 0));
-		v = g_array_index (cdata->values, GValue*, 0);
-		if (G_VALUE_TYPE (v) == G_TYPE_INT) {
-			gint64 res = 0;
-			res = compute_for_gint (cdata->aggregate, cdata->values);
-			cdata->computed_value = gda_value_new (G_TYPE_INT);
-			g_value_set_int (cdata->computed_value, (gint) res);
-		}
-		else if (G_VALUE_TYPE (v) == G_TYPE_FLOAT) {
-			gfloat res;
-			res = compute_for_gfloat (cdata->aggregate, cdata->values);
-			cdata->computed_value = gda_value_new (G_TYPE_FLOAT);
-			g_value_set_float (cdata->computed_value, res);
-		}
-		else if (G_VALUE_TYPE (v) == G_TYPE_DOUBLE) {
-			gdouble res;
-			res = compute_for_gdouble (cdata->aggregate, cdata->values);
-			cdata->computed_value = gda_value_new (G_TYPE_DOUBLE);
-			g_value_set_double (cdata->computed_value, res);
-		}
-		else {
-			/* error: can't compute aggregate */
-			cdata->computed_value = NULL;
-		}
-		break;
-	}
-	default:
-		TO_IMPLEMENT;
+		g_array_free (cdata->values, TRUE);
+		cdata->values = NULL;
 	}
-
-	/* free tmp data (don't free each value, it has already been done) */
-	g_array_free (cdata->values, TRUE);
-	cdata->values = NULL;
 }
 
 static GdaSqlStatement *
@@ -1281,7 +1835,6 @@ gda_data_pivot_populate (GdaDataPivot *pivot, GError **error)
 						vcol += pivot->priv->column_fields->len;
 					cvalue = gda_data_model_iter_get_value_at_e (iter, vcol, &lerror);
 					if (!cvalue || lerror) {
-						g_object_unref (nrow);
 						g_propagate_error (error, lerror);
 						goto out;
 					}
@@ -1299,20 +1852,26 @@ gda_data_pivot_populate (GdaDataPivot *pivot, GError **error)
 					ccdata.row = *rowindex;
 					ccdata.col = *colindex;
 					ccdata.values = NULL;
-					ccdata.computed_value = NULL;
+					ccdata.data_value = NULL;
 					pcdata = g_hash_table_lookup (data_hash, &ccdata);
 					if (!pcdata) {
 						pcdata = g_new (CellData, 1);
 						pcdata->row = *rowindex;
 						pcdata->col = *colindex;
+						pcdata->error = NULL;
 						pcdata->values = NULL;
-						pcdata->computed_value = NULL;
+						pcdata->gtype = G_VALUE_TYPE (value);
+						pcdata->nvalues = 0;
+						pcdata->data_value = NULL;
 						pcdata->aggregate = aggregate;
 						g_hash_table_insert (data_hash, pcdata, pcdata);
 					}
-					if (!pcdata->values)
-						pcdata->values = g_array_new (FALSE, FALSE, sizeof (GValue*));
-					g_array_append_val (pcdata->values, value);
+					if (!aggregate_handle_new_value (pcdata, value)) {
+						if (!pcdata->values)
+							pcdata->values = g_array_new (FALSE, FALSE,
+										      sizeof (GValue*));
+						g_array_append_val (pcdata->values, value);
+					}
 					/*g_print ("row %d col %d => [%s]\n", pcdata->row, pcdata->col,
 					  gda_value_stringify (value));*/
 				}
@@ -1372,14 +1931,14 @@ gda_data_pivot_populate (GdaDataPivot *pivot, GError **error)
 			ccdata.row = j;
 			ccdata.col = i;
 			ccdata.values = NULL;
-			ccdata.computed_value = NULL;
+			ccdata.data_value = NULL;
 			pcdata = g_hash_table_lookup (data_hash, &ccdata);
 			if (pcdata) {
 				cell_data_compute_aggregate (pcdata);
-				if (pcdata->computed_value) {
-					coltype = G_VALUE_TYPE (pcdata->computed_value);
+				if (pcdata->data_value) {
+					coltype = G_VALUE_TYPE (pcdata->data_value);
 					gda_value_reset_with_type (av, coltype);
-					g_value_copy (pcdata->computed_value, av);
+					g_value_copy (pcdata->data_value, av);
 				}
 				else
 					gda_row_invalidate_value (arow, av);
@@ -1413,7 +1972,8 @@ gda_data_pivot_populate (GdaDataPivot *pivot, GError **error)
 		for (i = 0; i < first_rows->len; i++) {
 			GObject *obj;
 			obj = g_array_index (first_rows, GObject*, i);
-			g_object_unref (obj);
+			if (obj)
+				g_object_unref (obj);
 		}
 		g_array_free (first_rows, TRUE);
 	}
@@ -1649,6 +2209,6 @@ cell_data_free (CellData *cdata)
 		}
 		g_array_free (cdata->values, TRUE);
 	}
-	gda_value_free (cdata->computed_value);
+	gda_value_free (cdata->data_value);
 	g_free (cdata);
 }
diff --git a/libgda/gda-data-pivot.h b/libgda/gda-data-pivot.h
index f4650ad..c05c6ab 100644
--- a/libgda/gda-data-pivot.h
+++ b/libgda/gda-data-pivot.h
@@ -64,7 +64,8 @@ typedef enum {
 	GDA_DATA_PIVOT_INTERNAL_ERROR,
 	GDA_DATA_PIVOT_SOURCE_MODEL_ERROR,
         GDA_DATA_PIVOT_FIELD_FORMAT_ERROR,
-	GDA_DATA_PIVOT_USAGE_ERROR
+	GDA_DATA_PIVOT_USAGE_ERROR,
+	GDA_DATA_PIVOT_OVERFLOW_ERROR
 } GdaDataPivotError;
 
 /**



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]