Parallel grouping sets


Richard Guo-2
Hi all,

Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.

Implementation 1
================

Attached is the patch and also there is a github branch [1] for this
work.

Parallel aggregation has already been supported in PostgreSQL and it is
implemented by aggregating in two stages. First, each worker performs an
aggregation step, producing a partial result for each group of which
that process is aware. Second, the partial results are transferred to
the leader via the Gather node. Finally, the leader merges the partial
results and produces the final result for each group.

We are implementing parallel grouping sets in the same way. The only
difference is that in the final stage, the leader performs a grouping
sets aggregation, rather than a normal aggregation.
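
For comparison, ordinary two-stage parallel aggregation (without grouping
sets) produces a plan of roughly the following shape; this is only a
sketch, not actual EXPLAIN output:

# explain (costs off) select c1, count(*) from t2 group by c1;
--  Finalize HashAggregate
--    Group Key: c1
--    ->  Gather
--          Workers Planned: 2
--          ->  Partial HashAggregate
--                Group Key: c1
--                ->  Parallel Seq Scan on t2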

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3));
                       QUERY PLAN
---------------------------------------------------------
 Finalize MixedAggregate
   Output: c1, c2, avg(c3), c3
   Hash Key: t2.c2, t2.c3
   Group Key: t2.c1, t2.c2
   Group Key: t2.c1
   ->  Gather Merge
         Output: c1, c2, c3, (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, c3, (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2
               ->  Partial HashAggregate
                     Output: c1, c2, c3, PARTIAL avg(c3)
                     Group Key: t2.c1, t2.c2, t2.c3
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(16 rows)


As the partial aggregation can be performed in parallel, we can expect a
speedup if the number of groups seen by the Finalize Aggregate node is
substantially less than the number of input rows.

For example, for the table provided in the test case within the patch,
running the above query on my Linux box:

# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- without patch
 Planning Time: 0.123 ms
 Execution Time: 9459.362 ms


# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- with patch
 Planning Time: 0.204 ms
 Execution Time: 1077.654 ms


But sometimes we may not benefit from this patch. In the worst-case
scenario, the number of groups seen by the Finalize Aggregate node could
be as large as the number of input rows seen by all the worker processes
in the Partial Aggregate stage. This is prone to happen with this patch,
because the group key for the Partial Aggregate is the union of all the
columns involved in the grouping sets; in the above query, that is
(c1, c2, c3).
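
A quick way to gauge exposure to this worst case is to compare the distinct
count of the union of the grouping columns with the total row count; this is
just a diagnostic sketch, not part of the patch:

# select count(*) from (select distinct c1, c2, c3 from t2) s;
-- if this is close to the total count(*) of t2, the Partial Aggregate
-- barely reduces the number of rows, and the parallel plan won't help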

So, we have been working on another way to implement parallel grouping
sets.

Implementation 2
================

This work can be found in the github branch [2]. As it contains some hacky
code and a list of TODO items, it is far from a complete patch. So please
consider it a PoC.

The idea is that instead of performing the grouping sets aggregation in the
Finalize Aggregate, we perform it in the Partial Aggregate.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1));
                          QUERY PLAN
--------------------------------------------------------------
 Finalize GroupAggregate
   Output: c1, c2, avg(c3), (gset_id)
   Group Key: t2.c1, t2.c2, (gset_id)
   ->  Gather Merge
         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2, (gset_id)
               ->  Partial HashAggregate
                     Output: c1, c2, gset_id, PARTIAL avg(c3)
                     Hash Key: t2.c1, t2.c2
                     Hash Key: t2.c1
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(15 rows)


This method has a problem: in the final stage of aggregation, the leader
has no way to tell which tuple comes from which grouping set, and it
needs that information to merge the partial results correctly.

For instance, suppose we have a table t(c1, c2, c3) containing one row
(1, NULL, 3), and we are selecting agg(c3) group by grouping sets
((c1,c2), (c1)). Then the leader would get two tuples via the Gather node
for that row, both (1, NULL, agg(3)): one from group by (c1,c2) and one
from group by (c1). If the leader cannot tell that the two tuples come
from two different grouping sets, it will merge them incorrectly.
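
The same situation can be reproduced in miniature with serial grouping sets,
which keep the two rows apart correctly; a minimal sketch:

create table t (c1 int, c2 int, c3 int);
insert into t values (1, NULL, 3);
select c1, c2, sum(c3) from t group by grouping sets ((c1, c2), (c1));
-- two output rows, both (1, NULL, 3): one per grouping set. A parallel
-- finalize stage must likewise keep them separate.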

So we add a hidden column 'gset_id', representing the grouping set id, to
the targetlist of the Partial Aggregate node, as well as to the group key
of the Finalize Aggregate node. That way, only tuples coming from the same
grouping set can get merged in the final stage of aggregation.

With this method, for grouping sets containing multiple rollups, we
generate a separate aggregation path for each rollup and then append them
to form the final path, to simplify the implementation.

References:
[1] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets
[2] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets_2

Any comments and feedback are welcome.

Thanks
Richard

Attachment: v1-0001-Implementing-parallel-grouping-sets.patch (23K)

Re: Parallel grouping sets

David Rowley-3
On Wed, 12 Jun 2019 at 14:59, Richard Guo <[hidden email]> wrote:
> Implementation 1

> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.

Hi Richard,

I think it was you and I who discussed #1 at the unconference at PGCon
two weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, I'd suggest going with it first, as it's
the path of least resistance. #1 should be fine as long as you properly
cost the parallel agg and don't choose it when the number of groups
going into the final agg isn't reduced by the partial agg node.  Which
brings me to:

You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.

Here's a quick test I did that shows the problem:

create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000) a, generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;

-- Here the Partial HashAggregate really should estimate that there
-- will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16) (actual time=1482.746..1483.203 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual time=442.140..765.931 rows=1000000 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000 width=16) (actual time=402.917..526.045 rows=333333 loops=3)
               Group Key: a, b
               ->  Parallel Seq Scan on abc  (cost=0.00..9572.67 rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
 Planning Time: 0.140 ms
 Execution Time: 1489.734 ms
(11 rows)

but really, the parallel plan likely should not be chosen in this case,
since we're not really reducing the number of groups going into the
finalize aggregate node. That'll need to be factored into the costing
so that we don't choose the parallel plan when we're not going to
reduce the work in the finalize aggregate node. I'm unsure exactly how
that'll look. Logically, I think the choice of whether or not to
parallelize needs to be: if (cost_partial_agg + cost_gather +
cost_final_agg < cost_agg) { do it in parallel } else { do it in
serial }. If you build both a serial and a parallel set of paths then
you should see which one is cheaper without actually constructing an
"if" test like the one above.

Here's a simple group by with the same group by clause items as you
have in the plan above that does get the estimated number of groups
perfectly.  The plan above should have the same estimate.

explain analyze select a,b,sum(c) from abc group by a,b;
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16) (actual time=404.304..1383.343 rows=1000000 loops=1)
   Group Key: a, b
   ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual time=404.291..620.774 rows=1000000 loops=1)
         Sort Key: a, b
         Sort Method: external merge  Disk: 21584kB
         ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
 Planning Time: 0.115 ms
 Execution Time: 1412.034 ms
(8 rows)

Also, in the tests:

> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;

You'll likely want to reduce the number of rows being used, just to
stop the regression tests becoming slow on older machines. I think
some of the other parallel aggregate tests use much fewer rows than
what you're using there. You might be able to use the standard set of
regression test tables too, tenk, tenk1 etc. That'll save the test
having to build and populate one of its own.
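
For instance, a test along these lines could run against tenk1, which
already has low-cardinality columns; a sketch, the exact columns are up
to you:

explain (costs off)
select hundred, ten, count(*)
from tenk1
group by grouping sets ((hundred), (ten));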

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Parallel grouping sets

Richard Guo-2
On Thu, Jun 13, 2019 at 12:29 PM David Rowley <[hidden email]> wrote:
> On Wed, 12 Jun 2019 at 14:59, Richard Guo <[hidden email]> wrote:
>> Implementation 1
>>
>> Parallel aggregation has already been supported in PostgreSQL and it is
>> implemented by aggregating in two stages. First, each worker performs an
>> aggregation step, producing a partial result for each group of which
>> that process is aware. Second, the partial results are transferred to
>> the leader via the Gather node. Finally, the leader merges the partial
>> results and produces the final result for each group.
>>
>> We are implementing parallel grouping sets in the same way. The only
>> difference is that in the final stage, the leader performs a grouping
>> sets aggregation, rather than a normal aggregation.
>
> Hi Richard,
>
> I think it was you and I who discussed #1 at the unconference at PGCon
> two weeks ago. The good thing about #1 is that it can be implemented as
> planner-only changes just by adding some additional paths and some
> costing. #2 will be useful when we're unable to reduce the number of
> inputs to the final aggregate node by doing the initial grouping.
> However, since #1 is easier, I'd suggest going with it first, as it's
> the path of least resistance. #1 should be fine as long as you properly
> cost the parallel agg and don't choose it when the number of groups
> going into the final agg isn't reduced by the partial agg node.  Which
> brings me to:

Hi David,

Yes. Thank you for the discussion at PGCon. I learned a lot from that.
And glad to meet you here. :)

I agree with you on going with #1 first.
 

> You'll need to do further work with the dNumGroups value. Since you're
> grouping by all the columns/exprs in the grouping sets you'll need the
> number of groups to be an estimate of that.

Exactly. The v1 patch estimates the number of partial groups incorrectly:
it calculates the number of groups for each grouping set and then adds
them up for dNumPartialPartialGroups, while we should actually calculate
the number of groups over all the columns in the grouping sets. I have
fixed this issue in the v2 patch.
 

> Here's a quick test I did that shows the problem:
>
> create table abc(a int, b int, c int);
> insert into abc select a,b,1 from generate_Series(1,1000) a, generate_Series(1,1000) b;
> create statistics abc_a_b_stats (ndistinct) on a,b from abc;
> analyze abc;
>
> -- Here the Partial HashAggregate really should estimate that there
> -- will be 1 million rows.
> explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
>                                                               QUERY PLAN
> ---------------------------------------------------------------------------------------------------------------------------------------
>  Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16) (actual time=1482.746..1483.203 rows=2000 loops=1)
>    Hash Key: a
>    Hash Key: b
>    ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual time=442.140..765.931 rows=1000000 loops=1)
>          Workers Planned: 2
>          Workers Launched: 2
>          ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000 width=16) (actual time=402.917..526.045 rows=333333 loops=3)
>                Group Key: a, b
>                ->  Parallel Seq Scan on abc  (cost=0.00..9572.67 rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
>  Planning Time: 0.140 ms
>  Execution Time: 1489.734 ms
> (11 rows)
>
> but really, the parallel plan likely should not be chosen in this case,
> since we're not really reducing the number of groups going into the
> finalize aggregate node. That'll need to be factored into the costing
> so that we don't choose the parallel plan when we're not going to
> reduce the work in the finalize aggregate node. I'm unsure exactly how
> that'll look. Logically, I think the choice of whether or not to
> parallelize needs to be: if (cost_partial_agg + cost_gather +
> cost_final_agg < cost_agg) { do it in parallel } else { do it in
> serial }. If you build both a serial and a parallel set of paths then
> you should see which one is cheaper without actually constructing an
> "if" test like the one above.

Both the serial and parallel sets of paths will be built, and the cheaper
one will be selected. So we don't need the 'if' test.

With the v2 patch, the parallel plan will not be chosen for the above query:

# explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                      QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=20406.00..25426.00 rows=2000 width=16) (actual time=935.048..935.697 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.041..170.906 rows=1000000 loops=1)
 Planning Time: 0.240 ms
 Execution Time: 935.978 ms
(6 rows)

 

> Here's a simple group by with the same group by clause items as you
> have in the plan above that does get the estimated number of groups
> perfectly.  The plan above should have the same estimate.
>
> explain analyze select a,b,sum(c) from abc group by a,b;
>                                                          QUERY PLAN
> ----------------------------------------------------------------------------------------------------------------------------
>  GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16) (actual time=404.304..1383.343 rows=1000000 loops=1)
>    Group Key: a, b
>    ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual time=404.291..620.774 rows=1000000 loops=1)
>          Sort Key: a, b
>          Sort Method: external merge  Disk: 21584kB
>          ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
>  Planning Time: 0.115 ms
>  Execution Time: 1412.034 ms
> (8 rows)

> Also, in the tests:
>
>> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
>> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
>> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
>> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
>> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
>> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
>> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
>> analyze gstest;
>
> You'll likely want to reduce the number of rows being used, just to
> stop the regression tests becoming slow on older machines. I think
> some of the other parallel aggregate tests use much fewer rows than
> what you're using there. You might be able to use the standard set of
> regression test tables too, tenk, tenk1 etc. That'll save the test
> having to build and populate one of its own.

Yes, that makes sense. The table size has been reduced in the v2 patch.
Currently I do not use the standard regression test tables, as I'd like
to customize the table with some specific data for correctness
verification. But we may switch to the standard test tables later.

Also in the v2 patch, I've fixed two additional issues. One is about the
sort key for sort-based grouping sets in Partial Aggregate, which should
be all the columns in parse->groupClause. The other one is about
GroupingFunc. Since Partial Aggregate will not handle multiple grouping
sets at once, it does not need to evaluate GroupingFunc. So GroupingFunc
is removed from the targetlists of Partial Aggregate.
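
(For context, GroupingFunc is the expression node behind the SQL GROUPING()
construct, as in the sketch below. Since the Partial Aggregate here performs
a plain GROUP BY over all the grouping columns, GROUPING() only needs to be
evaluated in the Finalize stage.)

# select c1, c2, grouping(c1, c2), avg(c3)
  from t2 group by grouping sets ((c1, c2), (c1));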

Thanks
Richard 

Attachment: v2-0001-Implementing-parallel-grouping-sets.patch (29K)

Re: Parallel grouping sets

Tomas Vondra-4
In reply to this post by Richard Guo-2
On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:

>Hi all,
>
>Paul and I have been hacking recently to implement parallel grouping
>sets, and here we have two implementations.
>
>Implementation 1
>================
>
>Attached is the patch and also there is a github branch [1] for this
>work.
>
>Parallel aggregation has already been supported in PostgreSQL and it is
>implemented by aggregating in two stages. First, each worker performs an
>aggregation step, producing a partial result for each group of which
>that process is aware. Second, the partial results are transferred to
>the leader via the Gather node. Finally, the leader merges the partial
>results and produces the final result for each group.
>
>We are implementing parallel grouping sets in the same way. The only
>difference is that in the final stage, the leader performs a grouping
>sets aggregation, rather than a normal aggregation.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1), (c2,c3));
>                       QUERY PLAN
>---------------------------------------------------------
> Finalize MixedAggregate
>   Output: c1, c2, avg(c3), c3
>   Hash Key: t2.c2, t2.c3
>   Group Key: t2.c1, t2.c2
>   Group Key: t2.c1
>   ->  Gather Merge
>         Output: c1, c2, c3, (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, c3, (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2
>               ->  Partial HashAggregate
>                     Output: c1, c2, c3, PARTIAL avg(c3)
>                     Group Key: t2.c1, t2.c2, t2.c3
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(16 rows)
>
>As the partial aggregation can be performed in parallel, we can expect a
>speedup if the number of groups seen by the Finalize Aggregate node is
>substantially less than the number of input rows.
>
>For example, for the table provided in the test case within the patch,
>running the above query in my Linux box:
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- without patch
> Planning Time: 0.123 ms
> Execution Time: 9459.362 ms
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- with patch
> Planning Time: 0.204 ms
> Execution Time: 1077.654 ms
>

Very nice. That's pretty much exactly how I imagined it'd work.

>But sometimes we may not benefit from this patch. In the worst-case
>scenario, the number of groups seen by the Finalize Aggregate node could
>be as large as the number of input rows seen by all the worker processes
>in the Partial Aggregate stage. This is prone to happen with this patch,
>because the group key for the Partial Aggregate is the union of all the
>columns involved in the grouping sets; in the above query, that is
>(c1, c2, c3).
>
>So, we have been working on another way to implement parallel grouping
>sets.
>
>Implementation 2
>================
>
>This work can be found in the github branch [2]. As it contains some hacky
>code and a list of TODO items, it is far from a complete patch. So please
>consider it a PoC.
>
>The idea is that instead of performing the grouping sets aggregation in the
>Finalize Aggregate, we perform it in the Partial Aggregate.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1));
>                          QUERY PLAN
>--------------------------------------------------------------
> Finalize GroupAggregate
>   Output: c1, c2, avg(c3), (gset_id)
>   Group Key: t2.c1, t2.c2, (gset_id)
>   ->  Gather Merge
>         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2, (gset_id)
>               ->  Partial HashAggregate
>                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>                     Hash Key: t2.c1, t2.c2
>                     Hash Key: t2.c1
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(15 rows)
>

OK, I'm not sure I understand the point of this - can you give an
example which is supposed to benefit from this? Where does the speedup
come from?

regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Parallel grouping sets

David Rowley-3
On Fri, 14 Jun 2019 at 11:45, Tomas Vondra <[hidden email]> wrote:
>
> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:

> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
> >grouping sets((c1,c2), (c1));
> >                          QUERY PLAN
> >--------------------------------------------------------------
> > Finalize GroupAggregate
> >   Output: c1, c2, avg(c3), (gset_id)
> >   Group Key: t2.c1, t2.c2, (gset_id)
> >   ->  Gather Merge
> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >         Workers Planned: 2
> >         ->  Sort
> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >               Sort Key: t2.c1, t2.c2, (gset_id)
> >               ->  Partial HashAggregate
> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
> >                     Hash Key: t2.c1, t2.c2
> >                     Hash Key: t2.c1
> >                     ->  Parallel Seq Scan on public.t2
> >                           Output: c1, c2, c3
> >(15 rows)
> >
>
> OK, I'm not sure I understand the point of this - can you give an
> example which is supposed to benefit from this? Where does the speedup
> come from?

I think this is a bad example, since the first grouping set is a
superset of the second. If those were independent and each grouping set
produced a reasonable number of groups, then it may be better to do it
this way instead of grouping by all exprs in all grouping sets in the
first phase, as is done by #1.  To do #2 would require that we tag
the aggregate state with the grouping set it belongs to, which seems
to be what gset_id is in Richard's output.

In my example upthread the first phase of aggregation produced a group
per input row. Method #2 would work better for that case since it
would only produce 2000 groups instead of 1 million.

Likely both methods would be good to consider, but since #1 seems much
easier than #2, it makes sense to me to start there.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services



Re: Parallel grouping sets

Tomas Vondra-4
On Fri, Jun 14, 2019 at 12:02:52PM +1200, David Rowley wrote:

>On Fri, 14 Jun 2019 at 11:45, Tomas Vondra <[hidden email]> wrote:
>>
>> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
>
>> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>> >grouping sets((c1,c2), (c1));
>> >                          QUERY PLAN
>> >--------------------------------------------------------------
>> > Finalize GroupAggregate
>> >   Output: c1, c2, avg(c3), (gset_id)
>> >   Group Key: t2.c1, t2.c2, (gset_id)
>> >   ->  Gather Merge
>> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >         Workers Planned: 2
>> >         ->  Sort
>> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >               Sort Key: t2.c1, t2.c2, (gset_id)
>> >               ->  Partial HashAggregate
>> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>> >                     Hash Key: t2.c1, t2.c2
>> >                     Hash Key: t2.c1
>> >                     ->  Parallel Seq Scan on public.t2
>> >                           Output: c1, c2, c3
>> >(15 rows)
>> >
>>
>> OK, I'm not sure I understand the point of this - can you give an
>> example which is supposed to benefit from this? Where does the speedup
>> come from?
>
>I think this is a bad example, since the first grouping set is a
>superset of the second. If those were independent and each grouping set
>produced a reasonable number of groups, then it may be better to do it
>this way instead of grouping by all exprs in all grouping sets in the
>first phase, as is done by #1.  To do #2 would require that we tag
>the aggregate state with the grouping set it belongs to, which seems
>to be what gset_id is in Richard's output.
>

Aha! So if we have grouping sets (a,b) and (c,d), then with the first
approach we'd do partial aggregate on (a,b,c,d) - which may produce
quite a few distinct groups, making it inefficient. But with the second
approach, we'd do just (a,b) and (c,d) and mark the rows with gset_id.

Neat!
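
To spell that out, a minimal sketch, assuming a table t(a, b, c, d):

select a, b, c, d, count(*) from t group by grouping sets ((a, b), (c, d));
-- approach #1: the partial stage groups by the union of all keys
--   Partial HashAggregate
--     Group Key: a, b, c, d      -- potentially ~one group per input row
-- approach #2: the partial stage evaluates each set, tagging rows with gset_id
--   Partial HashAggregate
--     Hash Key: a, b
--     Hash Key: c, d             -- only as many groups as the sets produce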

>In my example upthread the first phase of aggregation produced a group
>per input row. Method #2 would work better for that case since it
>would only produce 2000 groups instead of 1 million.
>
>Likely both methods would be good to consider, but since #1 seems much
>easier than #2, it makes sense to me to start there.
>

Yep. Thanks for the explanation.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Re: Parallel grouping sets

Richard Guo-2
In reply to this post by Richard Guo-2
On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
> Hi all,
>
> Paul and I have been hacking recently to implement parallel grouping
> sets, and here we have two implementations.
>
> Implementation 1
> ================
>
> Attached is the patch and also there is a github branch [1] for this
> work.

Rebased with the latest master.

Thanks
Richard 

Attachment: v3-0001-Implementing-parallel-grouping-sets.patch (31K)

Re: Parallel grouping sets

Tomas Vondra-4
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:

>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>

Hi Richard,

thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple of minor comments:

1) I think get_number_of_groups() would deserve a short explanation of why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before, these cases were clearly separated; now it seems a bit
mixed up, and it may not be immediately obvious why it's OK.

2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.

3) The regression tests do check plan and results like this:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
    SELECT ... ORDER BY 1, 2, 3;

which however means that the query might easily use a different plan than
what's verified in the EXPLAIN (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.

(In this case the plans seem to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)
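
In other words, the tests should EXPLAIN exactly the query that is then
executed, ORDER BY included; a sketch of the pattern:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ... ORDER BY 1, 2, 3;
    SELECT ... ORDER BY 1, 2, 3;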

4) It might be a good idea to check the negative case too, i.e. a query on
a data set that we should not parallelize (because the number of partial
groups would be too high).


Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.
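
For reference, GROUPING() returns a bit mask with a bit set for each
argument that is not grouped in the current row's grouping set, so it does
identify the set in the common case; a sketch:

    SELECT c1, c2, GROUPING(c1, c2), avg(c3)
    FROM t2 GROUP BY GROUPING SETS ((c1, c2), (c1));
    -- rows from (c1, c2) have grouping(c1, c2) = 0; rows from (c1) have 1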


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Parallel grouping sets

Richard Guo-2
On Tue, Jul 30, 2019 at 11:05 PM Tomas Vondra <[hidden email]> wrote:
> On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>> On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>>
>>> Hi all,
>>>
>>> Paul and I have been hacking recently to implement parallel grouping
>>> sets, and here we have two implementations.
>>>
>>> Implementation 1
>>> ================
>>>
>>> Attached is the patch and also there is a github branch [1] for this
>>> work.
>>>
>> Rebased with the latest master.
>
> Hi Richard,
>
> thanks for the rebased patch. I think the patch is mostly fine (at least I
> don't see any serious issues). A couple of minor comments:

Hi Tomas,

Thank you for reviewing this patch.
 

> 1) I think get_number_of_groups() would deserve a short explanation of why
> it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
> same branch. Before, these cases were clearly separated; now it seems a bit
> mixed up, and it may not be immediately obvious why it's OK.

Added a short comment in get_number_of_groups() explaining the behavior
when doing partial aggregation for grouping sets.
 

> 2) There are new regression tests, but they are not added to any schedule
> (parallel or serial), and so are not executed as part of "make check". I
> suppose this is a mistake.

Yes, thanks. Added the new regression test in parallel_schedule and
serial_schedule.
 

> 3) The regression tests do check plan and results like this:
>
>     EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
>     SELECT ... ORDER BY 1, 2, 3;
>
> which however means that the query might easily use a different plan than
> what's verified in the EXPLAIN (thanks to the additional ORDER BY clause).
> So I think this should explain and execute the same query.
>
> (In this case the plans seem to be the same, but that may easily change
> in the future, and we could miss it here, failing to verify the results.)

Thank you for pointing this out. Fixed it in the v4 patch.
 

> 4) It might be a good idea to check the negative case too, i.e. a query on
> a data set that we should not parallelize (because the number of partial
> groups would be too high).

Yes, agree. Added a negative case.
 


> Do you have any plans to hack on the second approach too? AFAICS those two
> approaches are complementary (address different data sets / queries), and
> it would be nice to have both. One of the things I've been wondering is if
> we need to invent gset_id as a new concept, or if we could simply use the
> existing GROUPING() function - that uniquely identifies the grouping set.


Yes, I'm planning to hack on the second approach in the near future. I'm
also reconsidering the gset_id stuff, since it brings a lot of complexity
to the second approach. I agree with you that we can try the GROUPING()
function to see if it can replace gset_id.

Thanks
Richard

Attachment: v4-0001-Implementing-parallel-grouping-sets.patch (33K)

Re: Parallel grouping sets

Pengzhou Tang
Hi Richard & Tomas:

I followed the idea of the second approach: add a gset_id to the targetlist
of the first stage of grouping sets and use it to combine the aggregates in
the final stage. The gset_id stuff is still kept because GROUPING() cannot
uniquely identify a grouping set; grouping sets may contain duplicate sets,
e.g. group by grouping sets((c1, c2), (c1, c2)).
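
A minimal sketch of the problem case:

select c1, c2, grouping(c1, c2), count(*)
from gstest group by grouping sets ((c1, c2), (c1, c2));
-- both copies of (c1, c2) yield grouping(c1, c2) = 0, so the finalize
-- stage could not tell their partial results apart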

There are some differences in how the second approach is implemented
compared to Richard's original idea: gset_id is not used as an additional
group key in the final stage; instead, we use it to dispatch each input
tuple directly to the grouping set it belongs to and then do the
aggregation. One advantage of this is that we can handle multiple rollups
with better performance, without an Append node.

The plan now looks like:

gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Finalize MixedAggregate  (cost=1000.00..73108.57 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Hash Key: c1, c2
   Hash Key: c1
   Hash Key: c3
   Group Key: ()
     Group Key: ()
   ->  Gather  (cost=1000.00..71551.48 rows=17684 width=16)
         Workers Planned: 2
         ->  Partial MixedAggregate  (cost=0.00..68783.08 rows=8842 width=16)
               Hash Key: c1, c2
               Hash Key: c1
               Hash Key: c3
               Group Key: ()
               Group Key: ()
               ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)
(16 rows)

gpadmin=# set enable_hashagg to off;
gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=657730.66..663207.45 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Group Key: c1, c2
   Sort Key: c1
     Group Key: c1
     Group Key: ()
     Group Key: ()
   Sort Key: c3
     Group Key: c3
   ->  Sort  (cost=657730.66..657774.87 rows=17684 width=16)
         Sort Key: c1, c2
         ->  Gather  (cost=338722.94..656483.04 rows=17684 width=16)
               Workers Planned: 2
               ->  Partial GroupAggregate  (cost=337722.94..653714.64 rows=8842 width=16)
                     Group Key: c1, c2
                     Group Key: c1
                     Group Key: ()
                     Group Key: ()
                     Sort Key: c3
                       Group Key: c3
                     ->  Sort  (cost=337722.94..342931.28 rows=2083333 width=12)
                           Sort Key: c1, c2
                           ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)



On Wed, Jul 31, 2019 at 4:07 PM Richard Guo <[hidden email]> wrote:
> [...]

Attachment: 0001-Support-for-parallel-grouping-sets.patch (86K)