Parallel grouping sets

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
28 messages Options
12
Reply | Threaded
Open this post in threaded view
|

Parallel grouping sets

Richard Guo-2
Hi all,

Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.

Implementation 1
================

Attached is the patch and also there is a github branch [1] for this
work.

Parallel aggregation has already been supported in PostgreSQL and it is
implemented by aggregating in two stages. First, each worker performs an
aggregation step, producing a partial result for each group of which
that process is aware. Second, the partial results are transferred to
the leader via the Gather node. Finally, the leader merges the partial
results and produces the final result for each group.

We are implementing parallel grouping sets in the same way. The only
difference is that in the final stage, the leader performs a grouping
sets aggregation, rather than a normal aggregation.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3));
                       QUERY PLAN
---------------------------------------------------------
 Finalize MixedAggregate
   Output: c1, c2, avg(c3), c3
   Hash Key: t2.c2, t2.c3
   Group Key: t2.c1, t2.c2
   Group Key: t2.c1
   ->  Gather Merge
         Output: c1, c2, c3, (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, c3, (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2
               ->  Partial HashAggregate
                     Output: c1, c2, c3, PARTIAL avg(c3)
                     Group Key: t2.c1, t2.c2, t2.c3
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(16 rows)


As the partial aggregation can be performed in parallel, we can expect a
speedup if the number of groups seen by the Finalize Aggregate node is
some less than the number of input rows.

For example, for the table provided in the test case within the patch,
running the above query in my Linux box:

# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- without patch
 Planning Time: 0.123 ms
 Execution Time: 9459.362 ms


# explain analyze select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1), (c2,c3)); -- with patch
 Planning Time: 0.204 ms
 Execution Time: 1077.654 ms


But sometimes we may not benefit from this patch. For example, in the
worst-case scenario the number of groups seen by the Finalize Aggregate
node could be as many as the number of input rows which were seen by all
worker processes in the Partial Aggregate stage. This is prone to
happening with this patch, because the group key for Partial Aggregate
is all the columns involved in the grouping sets, such as in the above
query, it is (c1, c2, c3).

So, we have been working on another way to implement parallel grouping
sets.

Implementation 2
================

This work can be found in github branch [2]. As it contains some hacky
codes and a list of TODO items, this is far from a patch. So please
consider it as a PoC.

The idea is instead of performing grouping sets aggregation in Finalize
Aggregate, we perform it in Partial Aggregate.

The plan looks like:

# explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by grouping sets((c1,c2), (c1));
                          QUERY PLAN
--------------------------------------------------------------
 Finalize GroupAggregate
   Output: c1, c2, avg(c3), (gset_id)
   Group Key: t2.c1, t2.c2, (gset_id)
   ->  Gather Merge
         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
         Workers Planned: 2
         ->  Sort
               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
               Sort Key: t2.c1, t2.c2, (gset_id)
               ->  Partial HashAggregate
                     Output: c1, c2, gset_id, PARTIAL avg(c3)
                     Hash Key: t2.c1, t2.c2
                     Hash Key: t2.c1
                     ->  Parallel Seq Scan on public.t2
                           Output: c1, c2, c3
(15 rows)


With this method, there is a problem, i.e., in the final stage of
aggregation, the leader does not have a way to distinguish which tuple
comes from which grouping set, which turns out to be needed by leader
for merging the partial results.

For instance, suppose we have a table t(c1, c2, c3) containing one row
(1, NULL, 3), and we are selecting agg(c3) group by grouping sets
((c1,c2), (c1)). Then the leader would get two tuples via Gather node
for that row, both are (1, NULL, agg(3)), one is from group by (c1,c2)
and one is from group by (c1). If the leader cannot tell that the
two tuples are from two different grouping sets, it will merge them
incorrectly.

So we add a hidden column 'gset_id', representing grouping set id, to
the targetlist of Partial Aggregate node, as well as to the group key
for Finalize Aggregate node. So only tuples coming from the same
grouping set can get merged in the final stage of aggregation.

With this method, for grouping sets with multiple rollups, to simplify
the implementation, we generate a separate aggregation path for each
rollup, and then append them for the final path.

References:
[1] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets
[2] https://github.com/greenplum-db/postgres/tree/parallel_groupingsets_2

Any comments and feedback are welcome.

Thanks
Richard

v1-0001-Implementing-parallel-grouping-sets.patch (23K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

David Rowley-3
On Wed, 12 Jun 2019 at 14:59, Richard Guo <[hidden email]> wrote:
> Implementation 1

> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.

Hi Richard,

I think it was you an I that discussed #1 at unconference at PGCon 2
weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, then I'd suggest going with it first,
since it's the path of least resistance. #1 should be fine as long as
you properly cost the parallel agg and don't choose it when the number
of groups going into the final agg isn't reduced by the partial agg
node.  Which brings me to:

You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.

Here's a quick test I did that shows the problem:

create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000)
a,generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;

-- Here the Partial HashAggregate really should estimate that there
will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16)
(actual time=1482.746..1483.203 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual
time=442.140..765.931 rows=1000000 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000
width=16) (actual time=402.917..526.045 rows=333333 loops=3)
               Group Key: a, b
               ->  Parallel Seq Scan on abc  (cost=0.00..9572.67
rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
 Planning Time: 0.140 ms
 Execution Time: 1489.734 ms
(11 rows)

but really, likely the parallel plan should not be chosen in this case
since we're not really reducing the number of groups going into the
finalize aggregate node.  That'll need to be factored into the costing
so that we don't choose the parallel plan when we're not going to
reduce the work in the finalize aggregate node. I'm unsure exactly how
that'll look. Logically, I think the choice parallelize or not to
parallelize needs to be if (cost_partial_agg + cost_gather +
cost_final_agg < cost_agg) { do it in parallel } else { do it in
serial }.  If you build both a serial and parallel set of paths then
you should see which one is cheaper without actually constructing an
"if" test like the one above.

Here's a simple group by with the same group by clause items as you
have in the plan above that does get the estimated number of groups
perfectly.  The plan above should have the same estimate.

explain analyze select a,b,sum(c) from abc group by a,b;
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16)
(actual time=404.304..1383.343 rows=1000000 loops=1)
   Group Key: a, b
   ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual
time=404.291..620.774 rows=1000000 loops=1)
         Sort Key: a, b
         Sort Method: external merge  Disk: 21584kB
         ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000
width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
 Planning Time: 0.115 ms
 Execution Time: 1412.034 ms
(8 rows)

Also, in the tests:

> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;

You'll likely want to reduce the number of rows being used just to
stop the regression tests becoming slow on older machines. I think
some of the other parallel aggregate tests use must fewer rows than
what you're using there. You might be able to use the standard set of
regression test tables too, tenk, tenk1 etc. That'll save the test
having to build and populate one of its own.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
On Thu, Jun 13, 2019 at 12:29 PM David Rowley <[hidden email]> wrote:
On Wed, 12 Jun 2019 at 14:59, Richard Guo <[hidden email]> wrote:
> Implementation 1

> Parallel aggregation has already been supported in PostgreSQL and it is
> implemented by aggregating in two stages. First, each worker performs an
> aggregation step, producing a partial result for each group of which
> that process is aware. Second, the partial results are transferred to
> the leader via the Gather node. Finally, the leader merges the partial
> results and produces the final result for each group.
>
> We are implementing parallel grouping sets in the same way. The only
> difference is that in the final stage, the leader performs a grouping
> sets aggregation, rather than a normal aggregation.

Hi Richard,

I think it was you an I that discussed #1 at unconference at PGCon 2
weeks ago. The good thing about #1 is that it can be implemented as
planner-only changes just by adding some additional paths and some
costing. #2 will be useful when we're unable to reduce the number of
inputs to the final aggregate node by doing the initial grouping.
However, since #1 is easier, then I'd suggest going with it first,
since it's the path of least resistance. #1 should be fine as long as
you properly cost the parallel agg and don't choose it when the number
of groups going into the final agg isn't reduced by the partial agg
node.  Which brings me to:

Hi David,

Yes. Thank you for the discussion at PGCon. I learned a lot from that.
And glad to meet you here. :)

I agree with you on going with #1 first.
 

You'll need to do further work with the dNumGroups value. Since you're
grouping by all the columns/exprs in the grouping sets you'll need the
number of groups to be an estimate of that.

Exactly. The v1 patch estimates number of partial groups incorrectly, as
it calculates the number of groups for each grouping set and then add
them for dNumPartialPartialGroups, while we actually should calculate
the number of groups for all the columns in the grouping sets. I have
fixed this issue in v2 patch.
 

Here's a quick test I did that shows the problem:

create table abc(a int, b int, c int);
insert into abc select a,b,1 from generate_Series(1,1000)
a,generate_Series(1,1000) b;
create statistics abc_a_b_stats (ndistinct) on a,b from abc;
analyze abc;

-- Here the Partial HashAggregate really should estimate that there
will be 1 million rows.
explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                              QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------
 Finalize HashAggregate  (cost=14137.67..14177.67 rows=2000 width=16)
(actual time=1482.746..1483.203 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Gather  (cost=13697.67..14117.67 rows=4000 width=16) (actual
time=442.140..765.931 rows=1000000 loops=1)
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial HashAggregate  (cost=12697.67..12717.67 rows=2000
width=16) (actual time=402.917..526.045 rows=333333 loops=3)
               Group Key: a, b
               ->  Parallel Seq Scan on abc  (cost=0.00..9572.67
rows=416667 width=12) (actual time=0.036..50.275 rows=333333 loops=3)
 Planning Time: 0.140 ms
 Execution Time: 1489.734 ms
(11 rows)

but really, likely the parallel plan should not be chosen in this case
since we're not really reducing the number of groups going into the
finalize aggregate node.  That'll need to be factored into the costing
so that we don't choose the parallel plan when we're not going to
reduce the work in the finalize aggregate node. I'm unsure exactly how
that'll look. Logically, I think the choice parallelize or not to
parallelize needs to be if (cost_partial_agg + cost_gather +
cost_final_agg < cost_agg) { do it in parallel } else { do it in
serial }.  If you build both a serial and parallel set of paths then
you should see which one is cheaper without actually constructing an
"if" test like the one above.

Both the serial and parallel set of paths would be built and the cheaper
one will be selected. So we don't need the 'if' test.

With v2 patch, the parallel plan will not be chosen for the above query:

# explain analyze select a,b,sum(c) from abc group by grouping sets ((a),(b));
                                                      QUERY PLAN
----------------------------------------------------------------------------------------------------------------------
 HashAggregate  (cost=20406.00..25426.00 rows=2000 width=16) (actual time=935.048..935.697 rows=2000 loops=1)
   Hash Key: a
   Hash Key: b
   ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000 width=12) (actual time=0.041..170.906 rows=1000000 loops=1)
 Planning Time: 0.240 ms
 Execution Time: 935.978 ms
(6 rows)

 

Here's a simple group by with the same group by clause items as you
have in the plan above that does get the estimated number of groups
perfectly.  The plan above should have the same estimate.

explain analyze select a,b,sum(c) from abc group by a,b;
                                                         QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------
 GroupAggregate  (cost=132154.34..152154.34 rows=1000000 width=16)
(actual time=404.304..1383.343 rows=1000000 loops=1)
   Group Key: a, b
   ->  Sort  (cost=132154.34..134654.34 rows=1000000 width=12) (actual
time=404.291..620.774 rows=1000000 loops=1)
         Sort Key: a, b
         Sort Method: external merge  Disk: 21584kB
         ->  Seq Scan on abc  (cost=0.00..15406.00 rows=1000000
width=12) (actual time=0.017..100.299 rows=1000000 loops=1)
 Planning Time: 0.115 ms
 Execution Time: 1412.034 ms
(8 rows)

Also, in the tests:

> insert into gstest select 1,10,100 from generate_series(1,1000000)i;
> insert into gstest select 1,10,200 from generate_series(1,1000000)i;
> insert into gstest select 1,20,30 from generate_series(1,1000000)i;
> insert into gstest select 2,30,40 from generate_series(1,1000000)i;
> insert into gstest select 2,40,50 from generate_series(1,1000000)i;
> insert into gstest select 3,50,60 from generate_series(1,1000000)i;
> insert into gstest select 1,NULL,000000 from generate_series(1,1000000)i;
> analyze gstest;

You'll likely want to reduce the number of rows being used just to
stop the regression tests becoming slow on older machines. I think
some of the other parallel aggregate tests use must fewer rows than
what you're using there. You might be able to use the standard set of
regression test tables too, tenk, tenk1 etc. That'll save the test
having to build and populate one of its own.

Yes, that makes sense. Table size has been reduced in v2 patch.
Currently I do not use the standard regression test tables as I'd like
to customize the table with some specific data for correctness
verification. But we may switch to the standard test table later.

Also in v2 patch, I'v fixed two addition issues. One is about the sort
key for sort-based grouping sets in Partial Aggregate, which should be
all the columns in parse->groupClause. The other one is about
GroupingFunc. Since Partial Aggregate will not handle multiple grouping
sets at once, it does not need to evaluate GroupingFunc. So GroupingFunc
is removed from the targetlists of Partial Aggregate.

Thanks
Richard 

v2-0001-Implementing-parallel-grouping-sets.patch (29K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Tomas Vondra-4
In reply to this post by Richard Guo-2
On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:

>Hi all,
>
>Paul and I have been hacking recently to implement parallel grouping
>sets, and here we have two implementations.
>
>Implementation 1
>================
>
>Attached is the patch and also there is a github branch [1] for this
>work.
>
>Parallel aggregation has already been supported in PostgreSQL and it is
>implemented by aggregating in two stages. First, each worker performs an
>aggregation step, producing a partial result for each group of which
>that process is aware. Second, the partial results are transferred to
>the leader via the Gather node. Finally, the leader merges the partial
>results and produces the final result for each group.
>
>We are implementing parallel grouping sets in the same way. The only
>difference is that in the final stage, the leader performs a grouping
>sets aggregation, rather than a normal aggregation.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1), (c2,c3));
>                       QUERY PLAN
>---------------------------------------------------------
> Finalize MixedAggregate
>   Output: c1, c2, avg(c3), c3
>   Hash Key: t2.c2, t2.c3
>   Group Key: t2.c1, t2.c2
>   Group Key: t2.c1
>   ->  Gather Merge
>         Output: c1, c2, c3, (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, c3, (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2
>               ->  Partial HashAggregate
>                     Output: c1, c2, c3, PARTIAL avg(c3)
>                     Group Key: t2.c1, t2.c2, t2.c3
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(16 rows)
>
>As the partial aggregation can be performed in parallel, we can expect a
>speedup if the number of groups seen by the Finalize Aggregate node is
>some less than the number of input rows.
>
>For example, for the table provided in the test case within the patch,
>running the above query in my Linux box:
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- without patch
> Planning Time: 0.123 ms
> Execution Time: 9459.362 ms
>
># explain analyze select c1, c2, avg(c3) from t2 group by grouping
>sets((c1,c2), (c1), (c2,c3)); -- with patch
> Planning Time: 0.204 ms
> Execution Time: 1077.654 ms
>

Very nice. That's pretty much exactly how I imagined it'd work.

>But sometimes we may not benefit from this patch. For example, in the
>worst-case scenario the number of groups seen by the Finalize Aggregate
>node could be as many as the number of input rows which were seen by all
>worker processes in the Partial Aggregate stage. This is prone to
>happening with this patch, because the group key for Partial Aggregate
>is all the columns involved in the grouping sets, such as in the above
>query, it is (c1, c2, c3).
>
>So, we have been working on another way to implement parallel grouping
>sets.
>
>Implementation 2
>================
>
>This work can be found in github branch [2]. As it contains some hacky
>codes and a list of TODO items, this is far from a patch. So please
>consider it as a PoC.
>
>The idea is instead of performing grouping sets aggregation in Finalize
>Aggregate, we perform it in Partial Aggregate.
>
>The plan looks like:
>
># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>grouping sets((c1,c2), (c1));
>                          QUERY PLAN
>--------------------------------------------------------------
> Finalize GroupAggregate
>   Output: c1, c2, avg(c3), (gset_id)
>   Group Key: t2.c1, t2.c2, (gset_id)
>   ->  Gather Merge
>         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>         Workers Planned: 2
>         ->  Sort
>               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>               Sort Key: t2.c1, t2.c2, (gset_id)
>               ->  Partial HashAggregate
>                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>                     Hash Key: t2.c1, t2.c2
>                     Hash Key: t2.c1
>                     ->  Parallel Seq Scan on public.t2
>                           Output: c1, c2, c3
>(15 rows)
>

OK, I'm not sure I understand the point of this - can you give an
example which is supposed to benefit from this? Where does the speedup
came from?

regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

David Rowley-3
On Fri, 14 Jun 2019 at 11:45, Tomas Vondra <[hidden email]> wrote:
>
> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:

> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
> >grouping sets((c1,c2), (c1));
> >                          QUERY PLAN
> >--------------------------------------------------------------
> > Finalize GroupAggregate
> >   Output: c1, c2, avg(c3), (gset_id)
> >   Group Key: t2.c1, t2.c2, (gset_id)
> >   ->  Gather Merge
> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >         Workers Planned: 2
> >         ->  Sort
> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
> >               Sort Key: t2.c1, t2.c2, (gset_id)
> >               ->  Partial HashAggregate
> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
> >                     Hash Key: t2.c1, t2.c2
> >                     Hash Key: t2.c1
> >                     ->  Parallel Seq Scan on public.t2
> >                           Output: c1, c2, c3
> >(15 rows)
> >
>
> OK, I'm not sure I understand the point of this - can you give an
> example which is supposed to benefit from this? Where does the speedup
> came from?

I think this is a bad example since the first grouping set is a
superset of the 2nd. If those were independent and each grouping set
produced a reasonable number of groups then it may be better to do it
this way instead of grouping by all exprs in all grouping sets in the
first phase, as is done by #1.   To do #2 would require that we tag
the aggregate state with the grouping set that belong to, which seem
to be what gset_id is in Richard's output.

In my example upthread the first phase of aggregation produced a group
per input row. Method #2 would work better for that case since it
would only produce 2000 groups instead of 1 million.

Likely both methods would be good to consider, but since #1 seems much
easier than #2, then to me it seems to make sense to start there.

--
 David Rowley                   http://www.2ndQuadrant.com/
 PostgreSQL Development, 24x7 Support, Training & Services


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Tomas Vondra-4
On Fri, Jun 14, 2019 at 12:02:52PM +1200, David Rowley wrote:

>On Fri, 14 Jun 2019 at 11:45, Tomas Vondra <[hidden email]> wrote:
>>
>> On Wed, Jun 12, 2019 at 10:58:44AM +0800, Richard Guo wrote:
>
>> ># explain (costs off, verbose) select c1, c2, avg(c3) from t2 group by
>> >grouping sets((c1,c2), (c1));
>> >                          QUERY PLAN
>> >--------------------------------------------------------------
>> > Finalize GroupAggregate
>> >   Output: c1, c2, avg(c3), (gset_id)
>> >   Group Key: t2.c1, t2.c2, (gset_id)
>> >   ->  Gather Merge
>> >         Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >         Workers Planned: 2
>> >         ->  Sort
>> >               Output: c1, c2, (gset_id), (PARTIAL avg(c3))
>> >               Sort Key: t2.c1, t2.c2, (gset_id)
>> >               ->  Partial HashAggregate
>> >                     Output: c1, c2, gset_id, PARTIAL avg(c3)
>> >                     Hash Key: t2.c1, t2.c2
>> >                     Hash Key: t2.c1
>> >                     ->  Parallel Seq Scan on public.t2
>> >                           Output: c1, c2, c3
>> >(15 rows)
>> >
>>
>> OK, I'm not sure I understand the point of this - can you give an
>> example which is supposed to benefit from this? Where does the speedup
>> came from?
>
>I think this is a bad example since the first grouping set is a
>superset of the 2nd. If those were independent and each grouping set
>produced a reasonable number of groups then it may be better to do it
>this way instead of grouping by all exprs in all grouping sets in the
>first phase, as is done by #1.   To do #2 would require that we tag
>the aggregate state with the grouping set that belong to, which seem
>to be what gset_id is in Richard's output.
>

Aha! So if we have grouping sets (a,b) and (c,d), then with the first
approach we'd do partial aggregate on (a,b,c,d) - which may produce
quite a few distinct groups, making it inefficient. But with the second
approach, we'd do just (a,b) and (c,d) and mark the rows with gset_id.

Neat!

>In my example upthread the first phase of aggregation produced a group
>per input row. Method #2 would work better for that case since it
>would only produce 2000 groups instead of 1 million.
>
>Likely both methods would be good to consider, but since #1 seems much
>easier than #2, then to me it seems to make sense to start there.
>

Yep. Thanks for the explanation.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
In reply to this post by Richard Guo-2
On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
Hi all,

Paul and I have been hacking recently to implement parallel grouping
sets, and here we have two implementations.

Implementation 1
================

Attached is the patch and also there is a github branch [1] for this
work.

Rebased with the latest master.

Thanks
Richard 

v3-0001-Implementing-parallel-grouping-sets.patch (31K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Tomas Vondra-4
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:

>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>

Hi Richard,

thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:

1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.

2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.

3) The regression tests do check plan and results like this:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
    SELECT ... ORDER BY 1, 2, 3;

which however means that the query might easily use a different plan than
what's verified in the eplain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.

(In this case the plans seems to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)

4) It might be a good idea to check the negative case too, i.e. a query on
data set that we should not parallelize (because the number of partial
groups would be too high).


Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.


regards

--
Tomas Vondra                  http://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services



Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
On Tue, Jul 30, 2019 at 11:05 PM Tomas Vondra <[hidden email]> wrote:
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>

Hi Richard,

thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:

Hi Tomas,

Thank you for reviewing this patch.
 

1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.

Added a short comment in get_number_of_groups() explaining the behavior
when doing partial aggregation for grouping sets.
 

2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.

Yes, thanks. Added the new regression test in parallel_schedule and
serial_schedule.
 

3) The regression tests do check plan and results like this:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
    SELECT ... ORDER BY 1, 2, 3;

which however means that the query might easily use a different plan than
what's verified in the eplain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.

(In this case the plans seems to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)

Thank you for pointing this out. Fixed it in V4 patch.
 

4) It might be a good idea to check the negative case too, i.e. a query on
data set that we should not parallelize (because the number of partial
groups would be too high).

Yes, agree. Added a negative case.
 


Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.


Yes, I'm planning to hack on the second approach in short future. I'm
also reconsidering the gset_id stuff since it brings a lot of complexity
for the second approach.  I agree with you that we can try GROUPING()
function to see if it can replace gset_id.

Thanks
Richard

v4-0001-Implementing-parallel-grouping-sets.patch (33K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Pengzhou Tang
Hi Richard & Tomas:

I followed the idea of the second approach to add a gset_id in the targetlist of the first stage of 
grouping sets and uses it to combine the aggregate in final stage. gset_id stuff is still kept
because of GROUPING() cannot uniquely identify a grouping set, grouping sets may contain
duplicated set, eg: group by grouping sets((c1, c2), (c1,c2)).

There are some differences to implement the second approach comparing to the original idea from
Richard, gset_id is not used as additional group key in the final stage, instead, we use it to
dispatch the input tuple to the specified grouping set directly and then do the aggregate.
One advantage of this is that we can handle multiple rollups with better performance without APPEND node.

the plan now looks like:

gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Finalize MixedAggregate  (cost=1000.00..73108.57 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Hash Key: c1, c2
   Hash Key: c1
   Hash Key: c3
   Group Key: ()
     Group Key: ()
   ->  Gather  (cost=1000.00..71551.48 rows=17684 width=16)
         Workers Planned: 2
         ->  Partial MixedAggregate  (cost=0.00..68783.08 rows=8842 width=16)
               Hash Key: c1, c2
               Hash Key: c1
               Hash Key: c3
               Group Key: ()
               Group Key: ()
               ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)
(16 rows)

gpadmin=# set enable_hashagg to off;
gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=657730.66..663207.45 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Group Key: c1, c2
   Sort Key: c1
     Group Key: c1
     Group Key: ()
     Group Key: ()
   Sort Key: c3
     Group Key: c3
   ->  Sort  (cost=657730.66..657774.87 rows=17684 width=16)
         Sort Key: c1, c2
         ->  Gather  (cost=338722.94..656483.04 rows=17684 width=16)
               Workers Planned: 2
               ->  Partial GroupAggregate  (cost=337722.94..653714.64 rows=8842 width=16)
                     Group Key: c1, c2
                     Group Key: c1
                     Group Key: ()
                     Group Key: ()
                     Sort Key: c3
                       Group Key: c3
                     ->  Sort  (cost=337722.94..342931.28 rows=2083333 width=12)
                           Sort Key: c1, c2
                           ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)



On Wed, Jul 31, 2019 at 4:07 PM Richard Guo <[hidden email]> wrote:
On Tue, Jul 30, 2019 at 11:05 PM Tomas Vondra <[hidden email]> wrote:
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>

Hi Richard,

thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:

Hi Tomas,

Thank you for reviewing this patch.
 

1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.

Added a short comment in get_number_of_groups() explaining the behavior
when doing partial aggregation for grouping sets.
 

2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.

Yes, thanks. Added the new regression test in parallel_schedule and
serial_schedule.
 

3) The regression tests do check plan and results like this:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
    SELECT ... ORDER BY 1, 2, 3;

which however means that the query might easily use a different plan than
what's verified in the eplain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.

(In this case the plans seems to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)

Thank you for pointing this out. Fixed it in V4 patch.
 

4) It might be a good idea to check the negative case too, i.e. a query on
data set that we should not parallelize (because the number of partial
groups would be too high).

Yes, agree. Added a negative case.
 


Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.


Yes, I'm planning to hack on the second approach in short future. I'm
also reconsidering the gset_id stuff since it brings a lot of complexity
for the second approach.  I agree with you that we can try GROUPING()
function to see if it can replace gset_id.

Thanks
Richard

0001-Support-for-parallel-grouping-sets.patch (86K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Pengzhou Tang
Hi Hackers,

Richard pointed out that he get incorrect results with the patch I attached, there are bugs somewhere,
I fixed them now and attached the newest version, please refer to [1] for the fix.

Thanks,
Pengzhou

On Mon, Sep 30, 2019 at 5:41 PM Pengzhou Tang <[hidden email]> wrote:
Hi Richard & Tomas:

I followed the idea of the second approach to add a gset_id in the targetlist of the first stage of 
grouping sets and uses it to combine the aggregate in final stage. gset_id stuff is still kept
because of GROUPING() cannot uniquely identify a grouping set, grouping sets may contain
duplicated set, eg: group by grouping sets((c1, c2), (c1,c2)).

There are some differences to implement the second approach comparing to the original idea from
Richard, gset_id is not used as additional group key in the final stage, instead, we use it to
dispatch the input tuple to the specified grouping set directly and then do the aggregate.
One advantage of this is that we can handle multiple rollups with better performance without APPEND node.

the plan now looks like:

gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                         QUERY PLAN
--------------------------------------------------------------------------------------------
 Finalize MixedAggregate  (cost=1000.00..73108.57 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Hash Key: c1, c2
   Hash Key: c1
   Hash Key: c3
   Group Key: ()
     Group Key: ()
   ->  Gather  (cost=1000.00..71551.48 rows=17684 width=16)
         Workers Planned: 2
         ->  Partial MixedAggregate  (cost=0.00..68783.08 rows=8842 width=16)
               Hash Key: c1, c2
               Hash Key: c1
               Hash Key: c3
               Group Key: ()
               Group Key: ()
               ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)
(16 rows)

gpadmin=# set enable_hashagg to off;
gpadmin=# explain select c1, c2 from gstest group by grouping sets(rollup(c1, c2), rollup(c3));
                                               QUERY PLAN
--------------------------------------------------------------------------------------------------------
 Finalize GroupAggregate  (cost=657730.66..663207.45 rows=8842 width=12)
   Dispatched by: (GROUPINGSETID())
   Group Key: c1, c2
   Sort Key: c1
     Group Key: c1
     Group Key: ()
     Group Key: ()
   Sort Key: c3
     Group Key: c3
   ->  Sort  (cost=657730.66..657774.87 rows=17684 width=16)
         Sort Key: c1, c2
         ->  Gather  (cost=338722.94..656483.04 rows=17684 width=16)
               Workers Planned: 2
               ->  Partial GroupAggregate  (cost=337722.94..653714.64 rows=8842 width=16)
                     Group Key: c1, c2
                     Group Key: c1
                     Group Key: ()
                     Group Key: ()
                     Sort Key: c3
                       Group Key: c3
                     ->  Sort  (cost=337722.94..342931.28 rows=2083333 width=12)
                           Sort Key: c1, c2
                           ->  Parallel Seq Scan on gstest  (cost=0.00..47861.33 rows=2083333 width=12)



On Wed, Jul 31, 2019 at 4:07 PM Richard Guo <[hidden email]> wrote:
On Tue, Jul 30, 2019 at 11:05 PM Tomas Vondra <[hidden email]> wrote:
On Tue, Jul 30, 2019 at 03:50:32PM +0800, Richard Guo wrote:
>On Wed, Jun 12, 2019 at 10:58 AM Richard Guo <[hidden email]> wrote:
>
>> Hi all,
>>
>> Paul and I have been hacking recently to implement parallel grouping
>> sets, and here we have two implementations.
>>
>> Implementation 1
>> ================
>>
>> Attached is the patch and also there is a github branch [1] for this
>> work.
>>
>
>Rebased with the latest master.
>

Hi Richard,

thanks for the rebased patch. I think the patch is mostly fine (at least I
don't see any serious issues). A couple minor comments:

Hi Tomas,

Thank you for reviewing this patch.
 

1) I think get_number_of_groups() would deserve a short explanation why
it's OK to handle (non-partial) grouping sets and regular GROUP BY in the
same branch. Before these cases were clearly separated, now it seems a bit
mixed up and it may not be immediately obvious why it's OK.

Added a short comment in get_number_of_groups() explaining the behavior
when doing partial aggregation for grouping sets.
 

2) There are new regression tests, but they are not added to any schedule
(parallel or serial), and so are not executed as part of "make check". I
suppose this is a mistake.

Yes, thanks. Added the new regression test in parallel_schedule and
serial_schedule.
 

3) The regression tests do check plan and results like this:

    EXPLAIN (COSTS OFF, VERBOSE) SELECT ...;
    SELECT ... ORDER BY 1, 2, 3;

which however means that the query might easily use a different plan than
what's verified in the eplain (thanks to the additional ORDER BY clause).
So I think this should explain and execute the same query.

(In this case the plans seems to be the same, but that may easily change
in the future, and we could miss it here, failing to verify the results.)

Thank you for pointing this out. Fixed it in V4 patch.
 

4) It might be a good idea to check the negative case too, i.e. a query on
data set that we should not parallelize (because the number of partial
groups would be too high).

Yes, agree. Added a negative case.
 


Do you have any plans to hack on the second approach too? AFAICS those two
approaches are complementary (address different data sets / queries), and
it would be nice to have both. One of the things I've been wondering is if
we need to invent gset_id as a new concept, or if we could simply use the
existing GROUPING() function - that uniquely identifies the grouping set.


Yes, I'm planning to hack on the second approach in short future. I'm
also reconsidering the gset_id stuff since it brings a lot of complexity
for the second approach.  I agree with you that we can try GROUPING()
function to see if it can replace gset_id.

Thanks
Richard

v2-0001-Support-for-parallel-grouping-sets.patch (87K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Michael Paquier-2
On Thu, Nov 28, 2019 at 07:07:22PM +0800, Pengzhou Tang wrote:
> Richard pointed out that he get incorrect results with the patch I
> attached, there are bugs somewhere,
> I fixed them now and attached the newest version, please refer to [1] for
> the fix.

Mr Robot is reporting that the latest patch fails to build at least on
Windows.  Could you please send a rebase?  I have moved for now the
patch to next CF, waiting on author.
--
Michael

signature.asc (849 bytes) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2

On Sun, Dec 1, 2019 at 10:03 AM Michael Paquier <[hidden email]> wrote:
On Thu, Nov 28, 2019 at 07:07:22PM +0800, Pengzhou Tang wrote:
> Richard pointed out that he get incorrect results with the patch I
> attached, there are bugs somewhere,
> I fixed them now and attached the newest version, please refer to [1] for
> the fix.

Mr Robot is reporting that the latest patch fails to build at least on
Windows.  Could you please send a rebase?  I have moved for now the
patch to next CF, waiting on author.

Thanks for reporting this issue. Here is the rebase.

Thanks
Richard

v3-0001-Support-for-parallel-grouping-sets.patch (87K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
I realized that there are two patches in this thread that are
implemented according to different methods, which causes confusion. So I
decide to update this thread with only one patch, i.e. the patch for
'Implementation 1' as described in the first email and then move the
other patch to a separate thread.

With this idea, here is the patch for 'Implementation 1' that is rebased
with the latest master.

Thanks
Richard

On Wed, Jan 8, 2020 at 3:24 PM Richard Guo <[hidden email]> wrote:

On Sun, Dec 1, 2019 at 10:03 AM Michael Paquier <[hidden email]> wrote:
On Thu, Nov 28, 2019 at 07:07:22PM +0800, Pengzhou Tang wrote:
> Richard pointed out that he get incorrect results with the patch I
> attached, there are bugs somewhere,
> I fixed them now and attached the newest version, please refer to [1] for
> the fix.

Mr Robot is reporting that the latest patch fails to build at least on
Windows.  Could you please send a rebase?  I have moved for now the
patch to next CF, waiting on author.

Thanks for reporting this issue. Here is the rebase.

Thanks
Richard

v5-0001-Implementing-parallel-grouping-sets.patch (33K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

akapila
On Sun, Jan 19, 2020 at 2:23 PM Richard Guo <[hidden email]> wrote:
>
> I realized that there are two patches in this thread that are
> implemented according to different methods, which causes confusion.
>

Both the idea seems to be different.  Is the second approach [1]
inferior for any case as compared to the first approach?  Can we keep
both approaches for parallel grouping sets, if so how?  If not, then
won't the code by the first approach be useless once we commit second
approach?


[1] - https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Jesse Zhang
On Thu, Jan 23, 2020 at 2:47 AM Amit Kapila <[hidden email]> wrote:

>
> On Sun, Jan 19, 2020 at 2:23 PM Richard Guo <[hidden email]> wrote:
> >
> > I realized that there are two patches in this thread that are
> > implemented according to different methods, which causes confusion.
> >
>
> Both the idea seems to be different.  Is the second approach [1]
> inferior for any case as compared to the first approach?  Can we keep
> both approaches for parallel grouping sets, if so how?  If not, then
> won't the code by the first approach be useless once we commit second
> approach?
>
>
> [1] - https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA%40mail.gmail.com
>

I glanced over both patches. Just the opposite, I have a hunch that v3
is always better than v5. Here's my 6-minute understanding of both.

v5 (the one with a simple partial aggregate) works by pushing a little
bit of partial aggregate onto workers, and perform grouping aggregate
above gather. This has two interesting outcomes: we can execute
unmodified partial aggregate on the workers, and execute almost
unmodified rollup aggreegate once the trans values are gathered. A
parallel plan for a query like

SELECT count(*) FROM foo GROUP BY GROUPING SETS (a), (b), (c), ();

can be

Finalize GroupAggregate
  Output: count(*)
  Group Key: a
  Group Key: b
  Group Key: c
  Group Key: ()
    Gather Merge
        Partial GroupAggregate
          Output: PARTIAL count(*)
          Group Key: a, b, c
            Sort
              Sort Key: a, b, c
                Parallel Seq Scan on foo


v3 ("the one with grouping set id") really turns the plan from a tree to
a multiplexed pipe: we can execute grouping aggregate on the workers,
but only partially. When we emit the trans values, also tag the tuple
with a group id. After gather, finalize the aggregates with a modified
grouping aggregate. Unlike a non-split grouping aggregate, the finalize
grouping aggregate does not "flow" the results from one rollup to the
next one. Instead, each group only advances on partial inputs tagged for
the group.

Finalize HashAggregate
  Output: count(*)
  Dispatched by: (GroupingSetID())
  Group Key: a
  Group Key: b
  Group Key: c
    Gather
        Partial GroupAggregate
          Output: PARTIAL count(*), GroupingSetID()
          Group Key: a
          Sort Key: b
          Group Key: b
          Sort Key: c
          Group Key: c
            Sort
              Sort Key: a
                Parallel Seq Scan on foo

Note that for the first approach to be viable, the partial aggregate
*has to* use a group key that's the union of all grouping sets. In cases
where individual columns have a low cardinality but joint cardinality is
high (say columns a, b, c each has 16 distinct values, but they are
independent, so there are 4096 distinct values on (a,b,c)), this results
in fairly high traffic through the shm tuple queue.

Cheers,
Jesse


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

akapila
On Sat, Jan 25, 2020 at 4:22 AM Jesse Zhang <[hidden email]> wrote:

>
> On Thu, Jan 23, 2020 at 2:47 AM Amit Kapila <[hidden email]> wrote:
> >
> > On Sun, Jan 19, 2020 at 2:23 PM Richard Guo <[hidden email]> wrote:
> > >
> > > I realized that there are two patches in this thread that are
> > > implemented according to different methods, which causes confusion.
> > >
> >
> > Both the idea seems to be different.  Is the second approach [1]
> > inferior for any case as compared to the first approach?  Can we keep
> > both approaches for parallel grouping sets, if so how?  If not, then
> > won't the code by the first approach be useless once we commit second
> > approach?
> >
> >
> > [1] - https://www.postgresql.org/message-id/CAN_9JTwtTTnxhbr5AHuqVcriz3HxvPpx1JWE--DCSdJYuHrLtA%40mail.gmail.com
> >
>
> I glanced over both patches. Just the opposite, I have a hunch that v3
> is always better than v5.
>

This is what I also understood after reading this thread.  So, my
question is why not just review v3 and commit something on those lines
even though it would take a bit more time.  It is possible that if we
decide to go with v5, we can make it happen earlier, but later when we
try to get v3, the code committed as part of v5 might not be of any
use or if it is useful, then in which cases?


--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com


Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
In reply to this post by Jesse Zhang
Hi Jesse,

Thanks for reviewing these two patches.

On Sat, Jan 25, 2020 at 6:52 AM Jesse Zhang <[hidden email]> wrote:

I glanced over both patches. Just the opposite, I have a hunch that v3
is always better than v5. Here's my 6-minute understanding of both.

v5 (the one with a simple partial aggregate) works by pushing a little
bit of partial aggregate onto workers, and perform grouping aggregate
above gather. This has two interesting outcomes: we can execute
unmodified partial aggregate on the workers, and execute almost
unmodified rollup aggreegate once the trans values are gathered. A
parallel plan for a query like

SELECT count(*) FROM foo GROUP BY GROUPING SETS (a), (b), (c), ();

can be

Finalize GroupAggregate
  Output: count(*)
  Group Key: a
  Group Key: b
  Group Key: c
  Group Key: ()
    Gather Merge
        Partial GroupAggregate
          Output: PARTIAL count(*)
          Group Key: a, b, c
            Sort
              Sort Key: a, b, c
                Parallel Seq Scan on foo

Yes, this is the idea of v5 patch.

 
v3 ("the one with grouping set id") really turns the plan from a tree to
a multiplexed pipe: we can execute grouping aggregate on the workers,
but only partially. When we emit the trans values, also tag the tuple
with a group id. After gather, finalize the aggregates with a modified
grouping aggregate. Unlike a non-split grouping aggregate, the finalize
grouping aggregate does not "flow" the results from one rollup to the
next one. Instead, each group only advances on partial inputs tagged for
the group.

Finalize HashAggregate
  Output: count(*)
  Dispatched by: (GroupingSetID())
  Group Key: a
  Group Key: b
  Group Key: c
    Gather
        Partial GroupAggregate
          Output: PARTIAL count(*), GroupingSetID()
          Group Key: a
          Sort Key: b
          Group Key: b
          Sort Key: c
          Group Key: c
            Sort
              Sort Key: a
                Parallel Seq Scan on foo

Yes, this is what v3 patch does.

We (Pengzhou and I) had an offline discussion on this plan and we have
some other idea.  Since we have tagged 'GroupingSetId' for each tuple
produced by partial aggregate, why not then perform a normal grouping
sets aggregation in the final phase, with the 'GroupingSetId' included
in the group keys?  The plan looks like:

# explain (costs off, verbose)
select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
                            QUERY PLAN
------------------------------------------------------------------
 Finalize GroupAggregate
   Output: c1, c2, c3, avg(c3)
   Group Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
   ->  Sort
         Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
         Sort Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
         ->  Gather
               Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
               Workers Planned: 4
               ->  Partial HashAggregate
                     Output: c1, c2, c3, gset_id, PARTIAL avg(c3)
                     Hash Key: gstest.c1, gstest.c2
                     Hash Key: gstest.c1
                     Hash Key: gstest.c2, gstest.c3
                     ->  Parallel Seq Scan on public.gstest
                           Output: c1, c2, c3

This plan should be able to give the correct results. We are still
thinking if it is a better plan than the 'multiplexed pipe' plan as in
v3. Inputs of thoughts here would be appreciated.


Note that for the first approach to be viable, the partial aggregate
*has to* use a group key that's the union of all grouping sets. In cases
where individual columns have a low cardinality but joint cardinality is
high (say columns a, b, c each has 16 distinct values, but they are
independent, so there are 4096 distinct values on (a,b,c)), this results
in fairly high traffic through the shm tuple queue.

Yes, you are right. This is the case mentioned by David earlier in [1].
In this case, ideally the parallel plan would fail when competing with
non-parallel plan in add_path() and so not be chosen.

 
Thanks
Richard
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Richard Guo-2
In reply to this post by akapila
Hi Amit,

Thanks for reviewing these two patches.

On Sat, Jan 25, 2020 at 6:31 PM Amit Kapila <[hidden email]> wrote:

This is what I also understood after reading this thread.  So, my
question is why not just review v3 and commit something on those lines
even though it would take a bit more time.  It is possible that if we
decide to go with v5, we can make it happen earlier, but later when we
try to get v3, the code committed as part of v5 might not be of any
use or if it is useful, then in which cases?

Yes, approach #2 (v3) would be generally better than approach #1 (v5) in
performance. I started with approach #1 because it is much easier.

If we decide to go with approach #2, I think we can now concentrate on
v3 patch.

For v3 patch, we have some other idea, which is to perform a normal
grouping sets aggregation in the final phase, with 'GroupingSetId'
included in the group keys (as described in the previous email). With
this idea, we can avoid a lot of hacky codes in current v3 patch.

Thanks
Richard
Reply | Threaded
Open this post in threaded view
|

Re: Parallel grouping sets

Jesse Zhang
In reply to this post by Richard Guo-2
On Mon, Feb 3, 2020 at 12:07 AM Richard Guo <[hidden email]> wrote:
>
> Hi Jesse,
>
> Thanks for reviewing these two patches.
I enjoyed it!

>
> On Sat, Jan 25, 2020 at 6:52 AM Jesse Zhang <[hidden email]> wrote:
>>
>>
>> I glanced over both patches. Just the opposite, I have a hunch that v3
>> is always better than v5. Here's my 6-minute understanding of both.
>>
>> v3 ("the one with grouping set id") really turns the plan from a tree to
>> a multiplexed pipe: we can execute grouping aggregate on the workers,
>> but only partially. When we emit the trans values, also tag the tuple
>> with a group id. After gather, finalize the aggregates with a modified
>> grouping aggregate. Unlike a non-split grouping aggregate, the finalize
>> grouping aggregate does not "flow" the results from one rollup to the
>> next one. Instead, each group only advances on partial inputs tagged for
>> the group.
>>
>
> Yes, this is what v3 patch does.
>
> We (Pengzhou and I) had an offline discussion on this plan and we have
> some other idea.  Since we have tagged 'GroupingSetId' for each tuple
> produced by partial aggregate, why not then perform a normal grouping
> sets aggregation in the final phase, with the 'GroupingSetId' included
> in the group keys?  The plan looks like:
>
> # explain (costs off, verbose)
> select c1, c2, c3, avg(c3) from gstest group by grouping sets((c1,c2),(c1),(c2,c3));
>                             QUERY PLAN
> ------------------------------------------------------------------
>  Finalize GroupAggregate
>    Output: c1, c2, c3, avg(c3)
>    Group Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
>    ->  Sort
>          Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
>          Sort Key: (gset_id), gstest.c1, gstest.c2, gstest.c3
>          ->  Gather
>                Output: c1, c2, c3, (gset_id), (PARTIAL avg(c3))
>                Workers Planned: 4
>                ->  Partial HashAggregate
>                      Output: c1, c2, c3, gset_id, PARTIAL avg(c3)
>                      Hash Key: gstest.c1, gstest.c2
>                      Hash Key: gstest.c1
>                      Hash Key: gstest.c2, gstest.c3
>                      ->  Parallel Seq Scan on public.gstest
>                            Output: c1, c2, c3
>
> This plan should be able to give the correct results. We are still
> thinking if it is a better plan than the 'multiplexed pipe' plan as in
> v3. Inputs of thoughts here would be appreciated.

Ha, I believe you meant to say a "normal aggregate", because what's
performed above gather is no longer "grouping sets", right?

The group key idea is clever in that it helps "discriminate" tuples by
their grouping set id. I haven't completely thought this through, but my
hunch is that this leaves some money on the table, for example, won't it
also lead to more expensive (and unnecessary) sorting and hashing? The
groupings with a few partials are now sharing the same tuplesort with
the groupings with a lot of groups even though we only want to tell
grouping 1 *apart from* grouping 10, not neccessarily that grouping 1
needs to come before grouping 10. That's why I like the multiplexed pipe
/ "dispatched by grouping set id" idea: we only pay for sorting (or
hashing) within each grouping. That said, I'm open to the criticism that
keeping multiple tuplesort and agg hash tabes running is expensive in
itself, memory-wise ...

Cheers,
Jesse


12