

Hi all,

Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.

Implementation 1
================

Attached is the patch and also there is a github branch [1] for this
work.

Parallel aggregation has already been supported in PostgreSQL and it is
implemented by aggregating in two stages. First, each worker performs an
aggregation step, producing a partial result for each group of which
that process is aware. Second, the partial results are transferred to
the leader via the Gather node. Finally, the leader merges the partial
results and produces the final result for each group.

We are implementing parallel grouping sets in the same way. The only
difference is that in the final stage, the leader performs a grouping
sets aggregation, rather than a normal aggregation.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
grouping sets((c1,c2), (c1), (c2,c3));
                    QUERY PLAN
--------------------------------------------------
 Finalize MixedAggregate
   Output: c1, c2, avg(c3), c3
   Hash Key: t2.c2, t2.c3
   Group Key: t2.c1, t2.c2
   Group Key: t2.c1
   ->  Gather Merge
         Output: c1, c2, c3, (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, c3, (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2
               ->  Partial HashAggregate
                     Output: c1, c2, c3, PARTIAL avg(c3)
                     Group Key: t2.c1, t2.c2, t2.c3
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(16 rows)

As the partial aggregation can be performed in parallel, we can expect a
speedup if the number of groups seen by the Finalize Aggregate node is
somewhat smaller than the number of input rows.

For example, for the table provided in the test case within the patch,
running the above query on my Linux box:

# explain analyze select c1, c2, avg(c3) from t2 group by grouping
sets((c1,c2), (c1), (c2,c3)); -- without patch
 Planning Time: 0.123 ms
 Execution Time: 9459.362 ms

# explain analyze select c1, c2, avg(c3) from t2 group by grouping
sets((c1,c2), (c1), (c2,c3)); -- with patch
 Planning Time: 0.204 ms
 Execution Time: 1077.654 ms

But sometimes we may not benefit from this patch. For example, in the
worst-case scenario the number of groups seen by the Finalize Aggregate
node could be as many as the number of input rows which were seen by all
worker processes in the Partial Aggregate stage. This is prone to
happening with this patch, because the group key for Partial Aggregate
is all the columns involved in the grouping sets; in the above query,
for example, it is (c1, c2, c3).

So, we have been working on another way to implement parallel grouping
sets.

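The two stages of Implementation 1 can be imitated outside the server. The following is only a minimal sketch in Python, not code from the patch: the rows, the function names and the round-robin split across workers are all assumptions made for illustration. Each simulated worker groups by every column that appears in any grouping set and keeps a [sum, count] state for avg(c3); the simulated leader then runs the grouping sets aggregation over those states.

```python
from collections import defaultdict

COLS = ("c1", "c2", "c3")
GROUPING_SETS = [("c1", "c2"), ("c1",), ("c2", "c3")]

# Hypothetical toy rows (c1, c2, c3); not the table from the patch's tests.
ROWS = [(1, 10, 100), (1, 10, 200), (1, 20, 30),
        (2, 30, 40), (2, 40, 50), (3, 50, 60)]


def partial_agg(rows):
    """Worker stage: group by all columns used in any grouping set and
    keep a partial avg(c3) state of [sum, count] per group."""
    states = defaultdict(lambda: [0, 0])
    for row in rows:
        state = states[row]  # the group key is the full (c1, c2, c3) tuple
        state[0] += row[2]
        state[1] += 1
    return dict(states)


def finalize_agg(partials):
    """Leader stage: grouping sets aggregation over the partial states."""
    result = {}
    for gset in GROUPING_SETS:
        merged = defaultdict(lambda: [0, 0])
        for states in partials:
            for key, (total, count) in states.items():
                named = dict(zip(COLS, key))
                # Columns outside the current grouping set become NULL (None).
                out_key = tuple(named[c] if c in gset else None for c in COLS)
                merged[out_key][0] += total
                merged[out_key][1] += count
        for out_key, (total, count) in merged.items():
            result[(gset, out_key)] = total / count
    return result


def parallel_grouping_sets(rows, nworkers=2):
    """Split the rows across simulated workers, then let the leader finalize."""
    partials = [partial_agg(rows[i::nworkers]) for i in range(nworkers)]
    return finalize_agg(partials)
```

In this toy data every row is its own (c1, c2, c3) group, so the partial stage does not reduce anything; that is the worst case described above.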
Implementation 2
================

This work can be found in github branch [2]. As it contains some hacky
code and a list of TODO items, this is far from a patch. So please
consider it as a PoC.

The idea is that instead of performing grouping sets aggregation in
Finalize Aggregate, we perform it in Partial Aggregate.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
grouping sets((c1,c2), (c1));
                    QUERY PLAN
--------------------------------------------------
 Finalize GroupAggregate
   Output: c1, c2, avg(c3), (gset_id)
   Group Key: t2.c1, t2.c2, (gset_id)
   ->  Gather Merge
         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2, (gset_id)
               ->  Partial HashAggregate
                     Output: c1, c2, gset_id, PARTIAL avg(c3)
                     Hash Key: t2.c1, t2.c2
                     Hash Key: t2.c1
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(15 rows)

With this method, there is a problem: in the final stage of
aggregation, the leader does not have a way to distinguish which tuple
comes from which grouping set, which turns out to be needed by the
leader for merging the partial results.

For instance, suppose we have a table t(c1, c2, c3) containing one row
(1, NULL, 3), and we are selecting agg(c3) group by grouping sets
((c1,c2), (c1)). Then the leader would get two tuples via the Gather
node for that row, both are (1, NULL, agg(3)), one is from group by
(c1,c2) and one is from group by (c1). If the leader cannot tell that
the two tuples are from two different grouping sets, it will merge them
incorrectly.

So we add a hidden column 'gset_id', representing grouping set id, to
the targetlist of Partial Aggregate node, as well as to the group key
for Finalize Aggregate node. So only tuples coming from the same
grouping set can get merged in the final stage of aggregation.

With this method, for grouping sets with multiple rollups, to simplify
the implementation, we generate a separate aggregation path for each
rollup, and then append them for the final path.

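The (1, NULL, 3) example above can be replayed with a small simulation. This is a sketch under stated assumptions, not the PoC's code: sum(c3) stands in for the generic agg(c3), and the function names and the use_gset_id switch are hypothetical. The partial stage emits one tuple per grouping set, tagged with its grouping set id; the final stage merges either on (c1, c2, gset_id) or, to show the problem, on (c1, c2) alone.

```python
from collections import defaultdict

COLS = ("c1", "c2")
GROUPING_SETS = [("c1", "c2"), ("c1",)]


def partial_agg(rows):
    """Worker stage: grouping sets aggregation, tagging each group
    with the id of the grouping set it belongs to."""
    states = defaultdict(int)
    for c1, c2, c3 in rows:
        named = {"c1": c1, "c2": c2}
        for gset_id, gset in enumerate(GROUPING_SETS):
            # Columns outside the grouping set are emitted as NULL (None).
            key = tuple(named[c] if c in gset else None for c in COLS)
            states[key + (gset_id,)] += c3  # partial sum(c3)
    # Each output tuple is (c1, c2, gset_id, partial_state).
    return [key + (state,) for key, state in states.items()]


def finalize_agg(tuples, use_gset_id=True):
    """Leader stage: merge partial states, optionally ignoring gset_id."""
    merged = defaultdict(int)
    for c1, c2, gset_id, state in tuples:
        key = (c1, c2, gset_id) if use_gset_id else (c1, c2)
        merged[key] += state
    return dict(merged)


tuples = partial_agg([(1, None, 3)])
with_id = finalize_agg(tuples)            # two groups, one per grouping set
without_id = finalize_agg(tuples, False)  # both tuples collapse into one group
```

Without gset_id the NULL that was read from the table and the NULL that stands for "c2 is not part of this grouping set" are indistinguishable, so the two partial states are merged into a single group.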
References:
[1] https://github.com/greenplumdb/postgres/tree/parallel_groupingsets
[2] https://github.com/greenplumdb/postgres/tree/parallel_groupingsets_2

Any comments and feedback are welcome.

Thanks
Richard


On Wed, 12 Jun 2019 at 14:59, Richard Guo < [hidden email]> wrote:
> Implementation 1
> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.
Hi Richard,
I think it was you and I that discussed #1 at the unconference at PGCon 2
weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, then I'd suggest going with it first,
since it's the path of least resistance. #1 should be fine as long as
you properly cost the parallel agg and don't choose it when the number
of groups going into the final agg isn't reduced by the partial agg
node. Which brings me to:
You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.
Here's a quick test I did that shows the problem:
create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000)
a,generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;
-- Here the Partial HashAggregate really should estimate that there
-- will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                    QUERY PLAN
--------------------------------------------------
 Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16) (actual time=1482.746..1483.203 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual time=442.140..765.931 rows=1000000 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000 width=16) (actual time=402.917..526.045 rows=333333 loops=3)
               Group Key: a, b
               ->  Parallel Seq Scan on abc  (cost=0.00..9572.67 rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
 Planning Time: 0.140 ms
 Execution Time: 1489.734 ms
(11 rows)
but really, likely the parallel plan should not be chosen in this case
since we're not really reducing the number of groups going into the
finalize aggregate node. That'll need to be factored into the costing
so that we don't choose the parallel plan when we're not going to
reduce the work in the finalize aggregate node. I'm unsure exactly how
that'll look. Logically, I think the choice to parallelize or not to
parallelize needs to be if (cost_partial_agg + cost_gather +
cost_final_agg < cost_agg) { do it in parallel } else { do it in
serial }. If you build both a serial and parallel set of paths then
you should see which one is cheaper without actually constructing an
"if" test like the one above.
Here's a simple group by with the same group by clause items as you
have in the plan above that does get the estimated number of groups
perfectly. The plan above should have the same estimate.
explain analyze select a,b,sum(c) from abc group by a,b;
                    QUERY PLAN
--------------------------------------------------
 GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16) (actual time=404.304..1383.343 rows=1000000 loops=1)
   Group Key: a, b
   ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual time=404.291..620.774 rows=1000000 loops=1)
         Sort Key: a, b
         Sort Method: external merge  Disk: 21584kB
         ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
 Planning Time: 0.115 ms
 Execution Time: 1412.034 ms
(8 rows)
Also, in the tests:
> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;
You'll likely want to reduce the number of rows being used just to
stop the regression tests becoming slow on older machines. I think
some of the other parallel aggregate tests use much fewer rows than
what you're using there. You might be able to use the standard set of
regression test tables too, tenk, tenk1 etc. That'll save the test
having to build and populate one of its own.

David Rowley


Hi David,
Yes. Thank you for the discussion at PGCon. I learned a lot from that. And glad to meet you here. :)
I agree with you on going with #1 first.
> You'll need to do further work with the dNumGroups value. Since you're
> grouping by all the columns/exprs in the grouping sets you'll need the
> number of groups to be an estimate of that.
Exactly. The v1 patch estimates the number of partial groups incorrectly, as it calculates the number of groups for each grouping set and then adds them up for dNumPartialPartialGroups, while we actually should calculate the number of groups for all the columns in the grouping sets. I have fixed this issue in v2 patch.
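The gap between the two ways of counting can be seen with exact counts on a scaled-down copy of the abc table. This is only an illustrative sketch: the planner works from statistics rather than exact counts, the table is 100 x 100 instead of 1000 x 1000, and all names below are hypothetical.

```python
from itertools import product


def count_groups(rows, cols):
    """Exact number of distinct groups over the given column positions."""
    return len({tuple(row[c] for c in cols) for row in rows})


def summed_per_set(rows, grouping_sets):
    """v1-style figure: count groups per grouping set and add them up."""
    return sum(count_groups(rows, gset) for gset in grouping_sets)


def over_all_columns(rows, grouping_sets):
    """Groups of a partial aggregate keyed on every grouping column."""
    all_cols = sorted({c for gset in grouping_sets for c in gset})
    return count_groups(rows, all_cols)


N = 100  # scaled down from the 1000 used in the thread
ROWS = [(a, b, 1) for a, b in product(range(1, N + 1), repeat=2)]
GROUPING_SETS = [(0,), (1,)]  # column positions of a and b

print(summed_per_set(ROWS, GROUPING_SETS))    # 100 + 100 groups
print(over_all_columns(ROWS, GROUPING_SETS))  # one group per (a, b) pair
```

With the partial aggregate keyed on (a, b), the second figure is the one that matches what the workers actually emit.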
> Here's a quick test I did that shows the problem:
> create table abc(a int, b int, c int);
> insert into abc select a,b,1 from generate_Series(1,1000)
> a,generate_Series(1,1000) b;
> create statistics abc_a_b_stats (ndistinct) on a,b from abc;
> analyze abc;
> -- Here the Partial HashAggregate really should estimate that there
> -- will be 1 million rows.
> explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
>                     QUERY PLAN
> --------------------------------------------------
>  Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16) (actual time=1482.746..1483.203 rows=2000 loops=1)
>    Hash Key: a
>    Hash Key: b
>    ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual time=442.140..765.931 rows=1000000 loops=1)
>          Workers Planned: 2
>          Workers Launched: 2
>          ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000 width=16) (actual time=402.917..526.045 rows=333333 loops=3)
>                Group Key: a, b
>                ->  Parallel Seq Scan on abc  (cost=0.00..9572.67 rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
>  Planning Time: 0.140 ms
>  Execution Time: 1489.734 ms
> (11 rows)
> but really, likely the parallel plan should not be chosen in this case
> since we're not really reducing the number of groups going into the
> finalize aggregate node. That'll need to be factored into the costing
> so that we don't choose the parallel plan when we're not going to
> reduce the work in the finalize aggregate node. I'm unsure exactly how
> that'll look. Logically, I think the choice to parallelize or not to
> parallelize needs to be if (cost_partial_agg + cost_gather +
> cost_final_agg < cost_agg) { do it in parallel } else { do it in
> serial }. If you build both a serial and parallel set of paths then
> you should see which one is cheaper without actually constructing an
> "if" test like the one above.
Both the serial and parallel set of paths would be built and the cheaper one will be selected. So we don't need the 'if' test.
With v2 patch, the parallel plan will not be chosen for the above query:
# explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                    QUERY PLAN
--------------------------------------------------
 HashAggregate  (cost=20406.00..25426.00 rows=2000 width=16) (actual time=935.048..935.697 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.041..170.906 rows=1000000 loops=1)
 Planning Time: 0.240 ms
 Execution Time: 935.978 ms
(6 rows)
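The effect of the group estimate on the serial-versus-parallel choice can be illustrated with a toy cost model. This is not PostgreSQL's cost model: the per-tuple constants, the formulas and every name below are assumptions chosen only to show how the comparison cost_partial_agg + cost_gather + cost_final_agg versus cost_agg flips once the number of partial groups is estimated correctly. Both paths are costed and the cheaper one is kept, so no explicit "if" test is needed.

```python
def serial_cost(n_rows, cpu_per_tuple=1.0):
    """Toy cost of aggregating every input row in a single process."""
    return n_rows * cpu_per_tuple


def parallel_cost(n_rows, partial_groups, nprocs=3,
                  cpu_per_tuple=1.0, transfer_per_tuple=1.0):
    """Toy cost of partial aggregate + gather + finalize aggregate."""
    # Each process can emit at most one tuple per group, and all processes
    # together cannot emit more tuples than there are input rows.
    tuples_to_leader = min(n_rows, partial_groups * nprocs)
    cost_partial_agg = (n_rows / nprocs) * cpu_per_tuple
    cost_gather = tuples_to_leader * transfer_per_tuple
    cost_final_agg = tuples_to_leader * cpu_per_tuple
    return cost_partial_agg + cost_gather + cost_final_agg


def cheapest_plan(n_rows, partial_groups):
    """Cost both paths and keep the cheaper one."""
    paths = {
        "serial": serial_cost(n_rows),
        "parallel": parallel_cost(n_rows, partial_groups),
    }
    return min(paths, key=paths.get)


# With the too-low estimate of 2000 partial groups, the parallel path wins.
print(cheapest_plan(1_000_000, 2_000))
# With one partial group per input row, the serial path wins.
print(cheapest_plan(1_000_000, 1_000_000))
```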
> Here's a simple group by with the same group by clause items as you
> have in the plan above that does get the estimated number of groups
> perfectly. The plan above should have the same estimate.
> explain analyze select a,b,sum(c) from abc group by a,b;
>                     QUERY PLAN
> --------------------------------------------------
>  GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16) (actual time=404.304..1383.343 rows=1000000 loops=1)
>    Group Key: a, b
>    ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual time=404.291..620.774 rows=1000000 loops=1)
>          Sort Key: a, b
>          Sort Method: external merge  Disk: 21584kB
>          ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
>  Planning Time: 0.115 ms
>  Execution Time: 1412.034 ms
> (8 rows)
> Also, in the tests:
> > insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> > insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> > insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> > insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> > insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> > insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> > insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> > analyze gstest;
> You'll likely want to reduce the number of rows being used just to
> stop the regression tests becoming slow on older machines. I think
> some of the other parallel aggregate tests use much fewer rows than
> what you're using there. You might be able to use the standard set of
> regression test tables too, tenk, tenk1 etc. That'll save the test
> having to build and populate one of its own.
Yes, that makes sense. Table size has been reduced in v2 patch. Currently I do not use the standard regression test tables as I'd like to customize the table with some specific data for correctness verification. But we may switch to the standard test table later.
Also in v2 patch, I've fixed two additional issues. One is about the sort key for sort-based grouping sets in Partial Aggregate, which should be all the columns in parse->groupClause. The other one is about GroupingFunc. Since Partial Aggregate will not handle multiple grouping sets at once, it does not need to evaluate GroupingFunc. So GroupingFunc is removed from the targetlists of Partial Aggregate.
On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
>Hi all,
>
>Paul and I have been hacking recently to implement parallel grouping
>sets, and here we have two implementations.
>
>Implementation 1
>================
>
>Attached is the patch and also there is a github branch [1] for this
>work.
>
>Parallel aggregation has already been supported in PostgreSQL and it is
>implemented by aggregating in two stages. First, each worker performs an
>aggregation step, producing a partial result for each group of which
>that process is aware. Second, the partial results are transferred to
>the leader via the Gather node. Finally, the leader merges the partial
>results and produces the final result for each group.
>
>We are implementing parallel grouping sets in the same way. The only
>difference is that in the final stage, the leader performs a grouping
>sets aggregation, rather than a normal aggregation.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1), (c2,c3));
>                    QUERY PLAN
>--------------------------------------------------
> Finalize MixedAggregate
>   Output: c1, c2, avg(c3), c3
>   Hash Key: t2.c2, t2.c3
>   Group Key: t2.c1, t2.c2
>   Group Key: t2.c1
>   ->  Gather Merge
>         Output: c1, c2, c3, (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, c3, (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2
>               ->  Partial HashAggregate
>                     Output: c1, c2, c3, PARTIAL avg(c3)
>                     Group Key: t2.c1, t2.c2, t2.c3
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(16 rows)
>
>As the partial aggregation can be performed in parallel, we can expect a
>speedup if the number of groups seen by the Finalize Aggregate node is
>somewhat smaller than the number of input rows.
>
>For example, for the table provided in the test case within the patch,
>running the above query in my Linux box:
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- without patch
> Planning Time: 0.123 ms
> Execution Time: 9459.362 ms
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- with patch
> Planning Time: 0.204 ms
> Execution Time: 1077.654 ms
>
Very nice. That's pretty much exactly how I imagined it'd work.
>But sometimes we may not benefit from this patch. For example, in the
>worst-case scenario the number of groups seen by the Finalize Aggregate
>node could be as many as the number of input rows which were seen by all
>worker processes in the Partial Aggregate stage. This is prone to
>happening with this patch, because the group key for Partial Aggregate
>is all the columns involved in the grouping sets, such as in the above
>query, it is (c1, c2, c3).
>
>So, we have been working on another way to implement parallel grouping
>sets.
>
>Implementation 2
>================
>
>This work can be found in github branch [2]. As it contains some hacky
>codes and a list of TODO items, this is far from a patch. So please
>consider it as a PoC.
>
>The idea is instead of performing grouping sets aggregation in Finalize
>Aggregate, we perform it in Partial Aggregate.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1));
>                    QUERY PLAN
>--------------------------------------------------
> Finalize GroupAggregate
>   Output: c1, c2, avg(c3), (gset_id)
>   Group Key: t2.c1, t2.c2, (gset_id)
>   ->  Gather Merge
>         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2, (gset_id)
>               ->  Partial HashAggregate
>                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>                     Hash Key: t2.c1, t2.c2
>                     Hash Key: t2.c1
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(15 rows)
>
OK, I'm not sure I understand the point of this - can you give an
example which is supposed to benefit from this? Where does the speedup
come from?
regards

On Fri, 14 Jun 2019 at 11:45, Tomas Vondra < [hidden email]> wrote:
>
> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
> >grouping sets((c1,c2), (c1));
> >                    QUERY PLAN
> >--------------------------------------------------
> > Finalize GroupAggregate
> >   Output: c1, c2, avg(c3), (gset_id)
> >   Group Key: t2.c1, t2.c2, (gset_id)
> >   ->  Gather Merge
> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >         Workers Planned: 2
> >         ->  Sort
> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >               Sort Key: t2.c1, t2.c2, (gset_id)
> >               ->  Partial HashAggregate
> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
> >                     Hash Key: t2.c1, t2.c2
> >                     Hash Key: t2.c1
> >                     ->  Parallel Seq Scan on public.t2
> >                           Output: c1, c2, c3
> >(15 rows)
> >
>
> OK, I'm not sure I understand the point of this - can you give an
> example which is supposed to benefit from this? Where does the speedup
> come from?
I think this is a bad example since the first grouping set is a
superset of the 2nd. If those were independent and each grouping set
produced a reasonable number of groups then it may be better to do it
this way instead of grouping by all exprs in all grouping sets in the
first phase, as is done by #1. To do #2 would require that we tag
the aggregate state with the grouping set that it belongs to, which seems
to be what gset_id is in Richard's output.
In my example upthread the first phase of aggregation produced a group
per input row. Method #2 would work better for that case since it
would only produce 2000 groups instead of 1 million.
Likely both methods would be good to consider, but since #1 seems much
easier than #2, then to me it seems to make sense to start there.

On Fri, Jun 14, 2019 at 12:02:52PM +1200, David Rowley wrote:
>On Fri, 14 Jun 2019 at 11:45, Tomas Vondra < [hidden email]> wrote:
>>
>> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
>
>> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>> >grouping sets((c1,c2), (c1));
>> >                    QUERY PLAN
>> >--------------------------------------------------
>> > Finalize GroupAggregate
>> >   Output: c1, c2, avg(c3), (gset_id)
>> >   Group Key: t2.c1, t2.c2, (gset_id)
>> >   ->  Gather Merge
>> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >         Workers Planned: 2
>> >         ->  Sort
>> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >               Sort Key: t2.c1, t2.c2, (gset_id)
>> >               ->  Partial HashAggregate
>> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>> >                     Hash Key: t2.c1, t2.c2
>> >                     Hash Key: t2.c1
>> >                     ->  Parallel Seq Scan on public.t2
>> >                           Output: c1, c2, c3
>> >(15 rows)
>> >
>>
>> OK, I'm not sure I understand the point of this - can you give an
>> example which is supposed to benefit from this? Where does the speedup
>> come from?
>
>I think this is a bad example since the first grouping set is a
>superset of the 2nd. If those were independent and each grouping set
>produced a reasonable number of groups then it may be better to do it
>this way instead of grouping by all exprs in all grouping sets in the
>first phase, as is done by #1. To do #2 would require that we tag
>the aggregate state with the grouping set that it belongs to, which seems
>to be what gset_id is in Richard's output.
>
Aha! So if we have grouping sets (a,b) and (c,d), then with the first
approach we'd do partial aggregate on (a,b,c,d) - which may produce
quite a few distinct groups, making it inefficient. But with the second
approach, we'd do just (a,b) and (c,d) and mark the rows with gset_id.
Neat!
>In my example upthread the first phase of aggregation produced a group
>per input row. Method #2 would work better for that case since it
>would only produce 2000 groups instead of 1 million.
>
>Likely both methods would be good to consider, but since #1 seems much
>easier than #2, then to me it seems to make sense to start there.
>
Yep. Thanks for the explanation.
regards

