Parallel Foreign Scans - need advice

Parallel Foreign Scans - need advice

Korry Douglas
Hi all, I’m working on an FDW that would benefit greatly from parallel foreign scan.  I have implemented the callbacks described here: https://www.postgresql.org/docs/devel/fdw-callbacks.html#FDW-CALLBACKS-PARALLEL and I see a big improvement in certain plans.

My problem is that I can’t seem to get a parallel foreign scan in a query that does not contain an aggregate.

For example:
   SELECT count(*) FROM foreign_table;
gives me a parallel scan, but
   SELECT * FROM foreign_table;
does not.

I’ve been fiddling with the costing GUCs, foreign scan row estimates, and foreign scan cost estimates - I can force the cost of a partial path to be much lower than a sequential foreign scan, but no luck.

Any troubleshooting advice?

A second, related question - how can I find the actual number of workers chosen for my ForeignScan?  At the moment, I am looking at ParallelContext->nworkers (inside the InitializeDSMForeignScan() callback) because that seems to be the first callback function that might provide the worker count.  I need the *actual* worker count in order to evenly distribute my workload.  I can’t use the usual trick of having each worker grab the next available chunk (because I have to avoid seek operations on compressed data). In other words, it is of great advantage for each worker to read contiguous chunks of data - seeking to another part of the file is prohibitively expensive.
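The even split over contiguous ranges described above can be sketched in plain C; assign_contiguous_range is a made-up helper shown only to illustrate the arithmetic, not part of any FDW API:

```c
#include <assert.h>

/* Give 0-based worker `worker_id` a contiguous range
 * [*start, *start + *count) of `total` row groups, spreading the
 * remainder over the first `total % nworkers` workers so the split
 * is as even as possible. */
static void
assign_contiguous_range(int total, int nworkers, int worker_id,
                        int *start, int *count)
{
    int base = total / nworkers;
    int extra = total % nworkers;

    *count = base + (worker_id < extra ? 1 : 0);
    *start = worker_id * base + (worker_id < extra ? worker_id : extra);
}
```

With 10 row groups and 4 workers this yields the contiguous ranges [0,3), [3,6), [6,8), [8,10), so each worker reads strictly forward within its own range.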

Thanks for all help.

            — Korry



Re: Parallel Foreign Scans - need advice

Andres Freund
Hi,

On 2019-05-15 12:55:33 -0400, Korry Douglas wrote:
> Hi all, I’m working on an FDW that would benefit greatly from parallel foreign scan.  I have implemented the callbacks described here: https://www.postgresql.org/docs/devel/fdw-callbacks.html#FDW-CALLBACKS-PARALLEL and I see a big improvement in certain plans.
>
> My problem is that I can’t seem to get a parallel foreign scan in a query that does not contain an aggregate.
>
> For example:
>    SELECT count(*) FROM foreign table;
> Gives me a parallel scan, but
>    SELECT * FROM foreign table;
> Does not.

Well, that'd be bound by the cost of transferring tuples between workers
and leader. You don't get a parallel scan for the equivalent local table
scan either, unless you fiddle heavily with the costs. You can probably
force the planner's hand by setting parallel_setup_cost and
parallel_tuple_cost very low - but it's unlikely to be beneficial.

If you added a where clause that needs to be evaluated outside the FDW,
you'd probably see parallel scans without fiddling with the costs.
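For experimentation, the cost fiddling above looks like the following; the GUC names are real, but my_foreign_table and its column are made up:

```sql
-- Testing only: make parallelism look nearly free, so the planner
-- picks a parallel plan even for a bare SELECT *:
SET parallel_setup_cost = 0;
SET parallel_tuple_cost = 0;
EXPLAIN SELECT * FROM my_foreign_table;

-- A qual the FDW cannot push down gives workers real work, so a
-- parallel plan can win even at the default cost settings:
EXPLAIN SELECT * FROM my_foreign_table WHERE length(some_col::text) > 3;
```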


> A second related question - how can I find the actual number of
> workers chose for my ForeignScan?  At the moment, I looking at
> ParallelContext->nworkers (inside of the InitializeDSMForeignScan()
> callback) because that seems to be the first callback function that
> might provide the worker count.  I need the *actual* worker count in
> order to evenly distribute my workload.  I can’t use the usual trick
> of having each worker grab the next available chunk (because I have to
> avoid seek operations on compressed data). In other words, it is of
> great advantage for each worker to read contiguous chunks of data -
> seeking to another part of the file is prohibitively expensive.

Don't think - but am not sure - that there's a nicer way
currently. Although I'd use nworkers_launched, rather than nworkers.

Greetings,

Andres Freund


Re: Parallel Foreign Scans - need advice

Korry Douglas
Thanks for the quick answer, Andres.  You’re right - it was parallel_tuple_cost that was getting in my way; my query returns about 6 million rows, so I guess that can add up.

If I change parallel_tuple_cost from 0.1 to 0.0001, I get a parallel foreign scan.

With 4 workers, that reduces my execution time by about half.  

But nworkers_launched is always set to 0 in InitializeDSMForeignScan(), so that won’t work.  Any other ideas?

                — Korry




Re: Parallel Foreign Scans - need advice

Andres Freund
Hi,

Don't top-quote on these lists...

On 2019-05-15 13:31:59 -0400, Korry Douglas wrote:
> Thanks for the quick answer Andres.  You’re right - it was parallel_tuple_cost that was getting in my way; my query returns about 6 million rows  so I guess that can add up.
>
> If I change parallel_tuple_cost from 0.1 to 0.0001, I get a parallel foreign scan.
>
> With 4 workers, that reduces my execution time by about half.  

Then you probably need to adjust the scan costs you have.


> But, nworkers_launched is always set to 0 in
> InitializeDSMForeignScan(), so that won’t work.  Any other ideas?

At that stage it's simply not yet known how many workers will actually
be launched (they might not start successfully or such). Why do you
need to know it there and not later?

- Andres


Re: Parallel Foreign Scans - need advice

Korry Douglas

>> But, nworkers_launched is always set to 0 in
>> InitializeDSMForeignScan(), so that won’t work.  Any other ideas?
>
> At that state it's simply not yet known how many workers will be
> actually launched (they might not start successfully or such). Why do
> you need to know it there and not later?
>
> - Andres

I need to know at some point *before* I actually start scanning.  The ParallelContext pointer is only available in EstimateDSMForeignScan(), InitializeDSMForeignScan(), and ReInitializeDSMForeignScan().  

If there is some other way to discover the actual worker count, I’m open to that.  The three functions above are not particularly helpful to me so I’m happy to look somewhere else.

         — Korry

Re: Parallel Foreign Scans - need advice

Thomas Munro
On Thu, May 16, 2019 at 5:46 AM Korry Douglas <[hidden email]> wrote:

> >> But, nworkers_launched is always set to 0 in
> >> InitializeDSMForeignScan(), so that won’t work.  Any other ideas?
> >
> > At that state it's simply not yet known how many workers will be
> > actually launched (they might not start successfully or such). Why do
> > you need to know it there and not later?
> >
> > - Andres
>
> I need to know at some point *before* I actually start scanning.  The ParallelContext pointer is only available in EstimateDSMForeignScan(), InitializeDSMForeignScan(), and ReInitializeDSMForeignScan().

Hi Korry,

That's only a superficial problem.  You don't even know if or when the
workers that are launched will all finish up running your particular
node, because (for example) they might be sent to different children
of a Parallel Append node above you (AFAICS there is no way for a
participant to indicate "I've finished all the work allocated to me,
but I happen to know that some other worker #3 is needed here" -- as
soon as any participant reports that it has executed the plan to
completion, pa_finished[] will prevent new workers from picking that
node to execute).  Suppose we made a rule that *every* worker must
visit *every* partial child of a Parallel Append and run it to
completion (and any similar node in the future must do the same)...
then I think there is still a higher level design problem: if you do
allocate work up front rather than on demand, then work could be
unevenly distributed, and parallel query would be weakened.

So I think you ideally need a simple get-next-chunk work allocator
(like Parallel Seq Scan and like the file_fdw patch I posted[1]), or a
pass-the-baton work allocator when there is a dependency between
chunks (like Parallel Index Scan for btrees), or a more complicated
multi-phase system that counts participants arriving and joining in
(like Parallel Hash) so that participants can coordinate and wait for
each other in controlled circumstances.
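The get-next-chunk allocator could be sketched with a C11 atomic counter standing in for whatever a real implementation would use to protect a counter in shared memory; the struct and function names here are invented for illustration:

```c
#include <assert.h>
#include <stdatomic.h>

/* One counter lives in shared memory (for an FDW, e.g. in the DSM
 * segment set up by InitializeDSMForeignScan); every participant
 * calls next_chunk() until it returns -1. */
typedef struct SharedScanState
{
    atomic_int next;     /* next unclaimed chunk number */
    int        nchunks;  /* total number of chunks in the scan */
} SharedScanState;

static int
next_chunk(SharedScanState *ss)
{
    int chunk = atomic_fetch_add(&ss->next, 1);

    return (chunk < ss->nchunks) ? chunk : -1;
}
```

Because participants claim chunks on demand, it never matters how many workers are launched, when they arrive, or when a node above pulls them away.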

If this compressed data doesn't have natural chunks designed for this
purpose (like, say, ORC stripes), perhaps you could have a dedicated
worker streaming data (compressed? decompressed?) into shared memory,
and parallel query participants coordinating to consume chunks of
that?

[1] https://www.postgresql.org/message-id/CA%2BhUKG%2BqK3E2RF75PKfsV0sn2s018%2Bft--hUuCmd2R_yQ9tmPQ%40mail.gmail.com
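The streaming idea could look like a small ring of fixed-size buffers that the producer fills strictly in file order, so nobody ever seeks.  This is a single-process toy with invented names; a real version would live in DSM and need locking and wait/notify machinery:

```c
#include <assert.h>
#include <string.h>

#define RING_SLOTS 4   /* buffers in flight */
#define SLOT_BYTES 8   /* toy chunk size */

typedef struct ChunkRing
{
    char buf[RING_SLOTS][SLOT_BYTES];
    int  head;          /* next slot the producer fills */
    int  tail;          /* next slot a consumer drains */
} ChunkRing;

/* Producer: append the next decompressed chunk; fails when the ring
 * is full (a real producer would block instead of returning 0). */
static int
ring_put(ChunkRing *r, const char *chunk)
{
    if (r->head - r->tail == RING_SLOTS)
        return 0;
    memcpy(r->buf[r->head % RING_SLOTS], chunk, SLOT_BYTES);
    r->head++;
    return 1;
}

/* Consumer: take the oldest chunk; fails when the ring is empty. */
static int
ring_get(ChunkRing *r, char *out)
{
    if (r->tail == r->head)
        return 0;
    memcpy(out, r->buf[r->tail % RING_SLOTS], SLOT_BYTES);
    r->tail++;
    return 1;
}
```

Consumers drain slots strictly in order, so the producer's file access stays sequential regardless of how many query participants show up.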

--
Thomas Munro
https://enterprisedb.com


Re: Parallel Foreign Scans - need advice

Korry Douglas

> That's only a superficial problem.  You don't even know if or when the
> workers that are launched will all finish up running your particular
> node, because (for example) they might be sent to different children
> of a Parallel Append node above you (AFAICS there is no way for a
> participant to indicate "I've finished all the work allocated to me,
> but I happen to know that some other worker #3 is needed here" -- as
> soon as any participant reports that it has executed the plan to
> completion, pa_finished[] will prevent new workers from picking that
> node to execute).  Suppose we made a rule that *every* worker must
> visit *every* partial child of a Parallel Append and run it to
> completion (and any similar node in the future must do the same)...
> then I think there is still a higher level design problem: if you do
> allocate work up front rather than on demand, then work could be
> unevenly distributed, and parallel query would be weakened.

What I really need (for the scheme I’m using at the moment) is to know how many workers will be used to execute my particular Plan.  I understand that some workers will naturally end up idle while the last (busy) worker finishes up.  I’m dividing the workload (the number of row groups to scan) by the number of workers to get an even distribution.  

I’m willing to pay that price (at least, I haven’t seen a problem so far… famous last words).

I do plan to switch over to a get-next-chunk allocator as you mentioned below, but I’d like to get the minimized-seek mechanism working first.

It sounds like there is no reliable way to get the information that I’m looking for, is that right?

> So I think you ideally need a simple get-next-chunk work allocator
> (like Parallel Seq Scan and like the file_fdw patch I posted[1]), or a
> pass-the-baton work allocator when there is a dependency between
> chunks (like Parallel Index Scan for btrees), or a more complicated
> multi-phase system that counts participants arriving and joining in
> (like Parallel Hash) so that participants can coordinate and wait for
> each other in controlled circumstances.

I haven’t looked at Parallel Hash - will try to understand that next.

> If this compressed data doesn't have natural chunks designed for this
> purpose (like, say, ORC stripes), perhaps you could have a dedicated
> workers streaming data (compressed? decompressed?) into shared memory,
> and parallel query participants coordinating to consume chunks of
> that?


I’ll give that some thought.  Thanks for the ideas.

                    — Korry



Re: Parallel Foreign Scans - need advice

Thomas Munro
On Fri, May 17, 2019 at 12:45 AM Korry Douglas <[hidden email]> wrote:
> It sounds like there is no reliable way to get the information that I’m looking for, is that right?

Correct.  And if there were, it could only be used to write bugs.  Let
me see if I can demonstrate...  I'll use the file_fdw patch from the
link I gave before, and I'll add an elog(LOG) message to show when
fileIterateForeignScan() runs.

$ echo 1 > /tmp/t2

postgres=# create table t1 as select generate_series(1, 1000000)::int i;
SELECT 1000000
postgres=# create server files foreign data wrapper file_fdw;
CREATE SERVER
postgres=# create foreign table t2 (n int) server files
  options (filename '/tmp/t2', format 'csv');
CREATE FOREIGN TABLE

The relevant EXPLAIN output is harder to understand if the parallel
leader participates, but it changes nothing important, so I'll turn
that off first, and then see how it is run:

postgres=# set parallel_leader_participation = off;
SET
postgres=# explain (analyze, verbose) select count(*) from (select * from t1 union all select * from t2) ss;
                                                                     QUERY PLAN
---------------------------------------------------------------------------------------------------------------------------------------------------
 Finalize Aggregate  (cost=14176.32..14176.33 rows=1 width=8) (actual time=234.023..234.023 rows=1 loops=1)
   Output: count(*)
   ->  Gather  (cost=14176.10..14176.31 rows=2 width=8) (actual time=233.840..235.079 rows=2 loops=1)
         Output: (PARTIAL count(*))
         Workers Planned: 2
         Workers Launched: 2
         ->  Partial Aggregate  (cost=13176.10..13176.11 rows=1 width=8) (actual time=223.550..223.555 rows=1 loops=2)
               Output: PARTIAL count(*)
               Worker 0: actual time=223.432..223.443 rows=1 loops=1
               Worker 1: actual time=223.667..223.668 rows=1 loops=1
               ->  Parallel Append  (cost=0.00..11926.10 rows=500000 width=0) (actual time=0.087..166.669 rows=500000 loops=2)
                     Worker 0: actual time=0.083..166.366 rows=499687 loops=1
                     Worker 1: actual time=0.092..166.972 rows=500314 loops=1
                     ->  Parallel Seq Scan on public.t1  (cost=0.00..9425.00 rows=500000 width=0) (actual time=0.106..103.384 rows=500000 loops=2)
                           Worker 0: actual time=0.123..103.106 rows=499686 loops=1
                           Worker 1: actual time=0.089..103.662 rows=500314 loops=1
                     ->  Parallel Foreign Scan on public.t2  (cost=0.00..1.10 rows=1 width=0) (actual time=0.079..0.096 rows=1 loops=1)
                           Foreign File: /tmp/numbers
                           Foreign File Size: 2 b
                           Worker 0: actual time=0.079..0.096 rows=1 loops=1
 Planning Time: 0.219 ms
 Execution Time: 235.262 ms
(22 rows)

You can see that the Parallel Foreign Scan was only actually run by
one worker.  So if you were somehow expecting both of them to show up
in order to produce the correct results, you have a bug.  The reason
that happened is that Parallel Append sent one worker to chew on
t1, and another to chew on t2, but the scan of t2 finished very
quickly, so that worker then went to help out with t1.  And for
further proof of that, here's what I see in my server log (note it is
only ever called twice, and in the same process):

2019-05-17 10:51:42.248 NZST [52158] LOG:  fileIterateForeignScan
2019-05-17 10:51:42.248 NZST [52158] STATEMENT:  explain analyze select count(*) from (select * from t1 union all select * from t2) ss;
2019-05-17 10:51:42.249 NZST [52158] LOG:  fileIterateForeignScan
2019-05-17 10:51:42.249 NZST [52158] STATEMENT:  explain analyze select count(*) from (select * from t1 union all select * from t2) ss;

Therefore you can't allocate the work up front based on expected
number of workers, even if it works in simple examples.  Your node
isn't necessarily the only node in the plan, and higher up nodes get
to decide when, if at all, you run, in each worker.

--
Thomas Munro
https://enterprisedb.com