Append with naive multiplexing of FDWs

Append with naive multiplexing of FDWs

Thomas Munro
Hello,

A few years back[1] I experimented with a simple readiness API that
would allow Append to start emitting tuples from whichever Foreign
Scan has data available, when working with FDW-based sharding.  I used
that primarily as a way to test Andres's new WaitEventSet stuff and my
kqueue implementation of that, but I didn't pursue it seriously
because I knew we wanted a more ambitious async executor rewrite and
many people had ideas about that, with schedulers capable of jumping
all over the tree etc.

Anyway, Stephen Frost pinged me off-list to ask about that patch, and
asked why we don't just do this naive thing until we have something
better.  It's a very localised feature that works only between Append
and its immediate children.  The patch makes it work for postgres_fdw,
but it should work for any FDW that can get its hands on a socket.

Here's a quick rebase of that old POC patch, along with a demo.  Since
2016, Parallel Append landed, but I didn't have time to think about
how to integrate with that so I did a quick "sledgehammer" rebase that
disables itself if parallelism is in the picture.

=== demo ===

create table t (a text, b text);

create or replace function slow_data(name text) returns setof t as
$$
begin
  perform pg_sleep(random());
  return query select name, generate_series(1, 100)::text as i;
end;
$$
language plpgsql;

create view t1 as select * from slow_data('t1');
create view t2 as select * from slow_data('t2');
create view t3 as select * from slow_data('t3');

create extension postgres_fdw;
create server server1 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server2 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create server server3 foreign data wrapper postgres_fdw options
(dbname 'postgres');
create user mapping for current_user server server1;
create user mapping for current_user server server2;
create user mapping for current_user server server3;
create foreign table ft1 (a text, b text) server server1 options
(table_name 't1');
create foreign table ft2 (a text, b text) server server2 options
(table_name 't2');
create foreign table ft3 (a text, b text) server server3 options
(table_name 't3');

-- create three remote shards
create table pt (a text, b text) partition by list (a);
alter table pt attach partition ft1 for values in ('ft1');
alter table pt attach partition ft2 for values in ('ft2');
alter table pt attach partition ft3 for values in ('ft3');

-- see that tuples come back in the order that they're ready

select * from pt where b like '42';
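
-- A rough way to see the effect (illustrative only; the exact numbers
-- vary because of random()) is to time a query that drains every shard:

\timing on
select count(*) from pt;
-- unpatched: elapsed time approaches the sum of the three sleeps
-- patched:   elapsed time approaches the slowest single sleep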

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com

--
Thomas Munro
https://enterprisedb.com

Attachment: 0001-Multiplexing-Append-POC.patch (36K)

Re: Append with naive multiplexing of FDWs

Bruce Momjian
On Wed, Sep  4, 2019 at 06:18:31PM +1200, Thomas Munro wrote:

> Hello,
>
> A few years back[1] I experimented with a simple readiness API that
> would allow Append to start emitting tuples from whichever Foreign
> Scan has data available, when working with FDW-based sharding.  I used
> that primarily as a way to test Andres's new WaitEventSet stuff and my
> kqueue implementation of that, but I didn't pursue it seriously
> because I knew we wanted a more ambitious async executor rewrite and
> many people had ideas about that, with schedulers capable of jumping
> all over the tree etc.
>
> Anyway, Stephen Frost pinged me off-list to ask about that patch, and
> asked why we don't just do this naive thing until we have something
> better.  It's a very localised feature that works only between Append
> and its immediate children.  The patch makes it work for postgres_fdw,
> but it should work for any FDW that can get its hands on a socket.
>
> Here's a quick rebase of that old POC patch, along with a demo.  Since
> 2016, Parallel Append landed, but I didn't have time to think about
> how to integrate with that so I did a quick "sledgehammer" rebase that
> disables itself if parallelism is in the picture.

Yes, sharding has been waiting on parallel FDW scans.  Would this work
for parallel partition scans if the partitions were FDWs?

--
  Bruce Momjian  <[hidden email]>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

Thomas Munro
On Sat, Sep 28, 2019 at 4:20 AM Bruce Momjian <[hidden email]> wrote:

> Yes, sharding has been waiting on parallel FDW scans.  Would this work
> for parallel partition scans if the partitions were FDWs?

Yeah, this works for partitions that are FDWs (as shown), but only for
Append, not for Parallel Append.  So you'd have parallelism in the
sense that your N remote shard servers are all doing stuff at the same
time, but it couldn't be in a parallel query on your 'home' server,
which is probably good for things that push down aggregation and bring
back just a few tuples from each shard, but bad for anything wanting
to ship back millions of tuples to chew on locally.  Do you think
that'd be useful enough on its own?

The problem is that parallel safe non-partial plans (like postgres_fdw
scans) are exclusively 'claimed' by one process under Parallel Append,
so with the patch as posted, if you modify it to allow parallelism
then it'll probably give correct answers but nothing prevents a single
process from claiming and starting all the scans and then waiting for
them to be ready, while the other processes miss out on doing any work
at all.  There's probably some kludgy solution involving not letting
any one worker start more than X, and some space cadet solution
involving passing sockets around and teaching libpq to hand over
connections at certain controlled phases of the protocol (due to lack
of threads), but nothing like that has jumped out as the right path so
far.

One idea that seems promising but requires a bunch more infrastructure
is to offload the libpq multiplexing to a background worker that owns
all the sockets, and have it push tuples into a multi-consumer shared
memory queue that regular executor processes could read from.  I have
been wondering if that would be best done by each FDW implementation,
or if there is a way to make a generic infrastructure for converting
parallel-safe executor nodes into partial plans by the use of a
'Scatter' (opposite of Gather) node that can spread the output of any
node over many workers.

If you had that, you'd still want a way for Parallel Append to be
readiness-based, but it would probably look a bit different to this
patch because it'd need to use (vapourware) multiconsumer shm queue
readiness, not fd readiness.  And another kind of fd-readiness
multiplexing would be going on inside the new (vapourware) worker that
handles all the libpq connections (and maybe other kinds of work for
other FDWs that are able to expose a socket).
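
For what it's worth, the libpq side of that multiplexing is not the
hard part.  Here is a standalone sketch (not taken from any of the
patches; the connection strings and query are placeholders, and the
shared memory queue part is ignored entirely) of draining several
connections in whatever order they become ready, using only the
documented async libpq calls:

#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>
#include <libpq-fe.h>

#define NCONN 3

int
main(void)
{
    const char *conninfo[NCONN] = {"dbname=shard1", "dbname=shard2", "dbname=shard3"};
    PGconn     *conns[NCONN];
    int         done = 0;

    for (int i = 0; i < NCONN; i++)
    {
        conns[i] = PQconnectdb(conninfo[i]);
        if (PQstatus(conns[i]) != CONNECTION_OK)
        {
            fprintf(stderr, "%s", PQerrorMessage(conns[i]));
            exit(1);
        }
        /* start the (slow) remote query, but don't wait for the result */
        PQsendQuery(conns[i], "select pg_sleep(random()), now()");
    }

    while (done < NCONN)
    {
        fd_set      rfds;
        int         maxfd = -1;

        /* wait until any remote socket has something for us */
        FD_ZERO(&rfds);
        for (int i = 0; i < NCONN; i++)
        {
            if (conns[i] == NULL)
                continue;
            FD_SET(PQsocket(conns[i]), &rfds);
            if (PQsocket(conns[i]) > maxfd)
                maxfd = PQsocket(conns[i]);
        }
        if (select(maxfd + 1, &rfds, NULL, NULL, NULL) < 0)
            exit(1);

        for (int i = 0; i < NCONN; i++)
        {
            PGresult   *res;

            if (conns[i] == NULL || !FD_ISSET(PQsocket(conns[i]), &rfds))
                continue;
            if (!PQconsumeInput(conns[i]))
                exit(1);
            if (PQisBusy(conns[i]))
                continue;       /* no complete result yet, keep waiting */

            /* this connection is ready: drain its results */
            while ((res = PQgetResult(conns[i])) != NULL)
            {
                if (PQresultStatus(res) == PGRES_TUPLES_OK)
                    printf("connection %d returned %d row(s)\n",
                           i, PQntuples(res));
                PQclear(res);
            }
            PQfinish(conns[i]);
            conns[i] = NULL;
            done++;
        }
    }
    return 0;
}

The vapourware bits are everything around that loop: running it in a
background worker and getting the rows back to the backends that want
them.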



Re: Append with naive multiplexing of FDWs

Bruce Momjian
On Sun, Nov 17, 2019 at 09:54:55PM +1300, Thomas Munro wrote:

> Yeah, this works for partitions that are FDWs (as shown), but only for
> Append, not for Parallel Append.  So you'd have parallelism in the
> sense that your N remote shard servers are all doing stuff at the same
> time, but it couldn't be in a parallel query on your 'home' server,
> which is probably good for things that push down aggregation and bring
> back just a few tuples from each shard, but bad for anything wanting
> to ship back millions of tuples to chew on locally.  Do you think
> that'd be useful enough on its own?

Yes, I think so.  There are many data warehouse queries that want to
return only aggregate values, or filter for a small number of rows.
Even OLTP queries might return only a few rows from multiple partitions.
This would allow for a proof-of-concept implementation so we can see how
realistic this approach is.
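
For instance (illustrative only; this assumes partitionwise aggregation
and postgres_fdw's aggregate pushdown both apply), something like

    set enable_partitionwise_aggregate = on;
    select a, count(*) from pt group by a;

ships only one aggregated row back from each shard, so a
readiness-based Append spends its time waiting on the remote servers
rather than moving data.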

> The problem is that parallel safe non-partial plans (like postgres_fdw
> scans) are exclusively 'claimed' by one process under Parallel Append,
> so with the patch as posted, if you modify it to allow parallelism
> then it'll probably give correct answers but nothing prevents a single
> process from claiming and starting all the scans and then waiting for
> them to be ready, while the other processes miss out on doing any work
> at all.  There's probably some kludgy solution involving not letting
> any one worker start more than X, and some space cadet solution
> involving passing sockets around and teaching libpq to hand over
> connections at certain controlled phases of the protocol (due to lack
> of threads), but nothing like that has jumped out as the right path so
> far.

I am unclear how many queries can do any meaningful work until all
shards have given their full results.

--
  Bruce Momjian  <[hidden email]>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

Kyotaro Horiguchi
Hello.

At Sat, 30 Nov 2019 14:26:11 -0500, Bruce Momjian <[hidden email]> wrote in

> I am unclear how many queries can do any meaningful work until all
> shards have given their full results.

There's my pending (somewhat stale) patch, which allows local scans
to run while waiting for remote servers.

https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@...

I (or we) wanted to introduce the asynchronous node mechanism as the
basis of an async-capable postgres_fdw.  The reason that work has
stalled is that I am waiting for the executor change to a push-up
style, on which the async-node mechanism would be built.  If that
isn't going to happen shortly, I'd like to continue that work.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Append with naive multiplexing of FDWs

Thomas Munro
On Thu, Dec 5, 2019 at 4:26 PM Kyotaro Horiguchi
<[hidden email]> wrote:

> There's my pending (somewhat stale) patch, which allows local scans
> to run while waiting for remote servers.
>
> https://www.postgresql.org/message-id/20180515.202945.69332784.horiguchi.kyotaro@...
>
> I (or we) wanted to introduce the asynchronous node mechanism as the
> basis of an async-capable postgres_fdw.  The reason that work has
> stalled is that I am waiting for the executor change to a push-up
> style, on which the async-node mechanism would be built.  If that
> isn't going to happen shortly, I'd like to continue that work.

After rereading some threads to remind myself what happened here...
right, my little patch began life in March 2016[1] when I wanted a
test case to test Andres's work on WaitEventSets, and your patch set
started a couple of months later and is vastly more ambitious[2][3].
It wants to escape from the volcano give-me-one-tuple-or-give-me-EOF
model.  And I totally agree that there are lots of reasons to want to
do that (including yielding to other parts of the plan instead of
waiting for I/O, locks and some parallelism primitives enabling new
kinds of parallelism), and I'm hoping to help with some small pieces
of that if I can.

My patch set (rebased upthread) was extremely primitive, with no new
planner concepts, and added only a very simple new executor node
method: ExecReady().  Append used that to try to ask its children if
they'd like some time to warm up.  By default, ExecReady() says "I
don't know what you're talking about, go away", but FDWs can provide
an implementation that says "yes, please call me again when this fd is
ready" or "yes, I am ready, please call ExecProc() now".  It doesn't
deal with anything more complicated than that, and in particular it
doesn't work if there are extra planner nodes in between Append and
the foreign scan.  (It also doesn't mix particularly well with
parallelism, as mentioned.)
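
To make that a bit more concrete, the postgres_fdw side of such a
callback might look roughly like the sketch below.  This is not lifted
from the patch: the result enum, the callback name and the query_sent
flag are invented here to match the description above; only the libpq
calls are real API.

typedef enum ExecReadyResult
{
    EXEC_READY_UNKNOWN,         /* default: node doesn't support readiness */
    EXEC_READY_WAIT_FD,         /* call again when *wait_fd is readable */
    EXEC_READY_NOW              /* a tuple can be produced without blocking */
} ExecReadyResult;

static ExecReadyResult
postgresExecReady(ForeignScanState *node, pgsocket *wait_fd)
{
    PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;

    if (!fsstate->query_sent)
    {
        /* start the remote query, but don't wait for the result */
        if (!PQsendQuery(fsstate->conn, fsstate->query))
            ereport(ERROR, (errmsg("could not send remote query")));
        fsstate->query_sent = true;
    }

    /* absorb whatever has arrived so far */
    if (!PQconsumeInput(fsstate->conn))
        ereport(ERROR, (errmsg("could not read from remote server")));

    if (PQisBusy(fsstate->conn))
    {
        /* nothing complete yet; ask Append to wait on our socket */
        *wait_fd = PQsocket(fsstate->conn);
        return EXEC_READY_WAIT_FD;
    }

    return EXEC_READY_NOW;      /* safe to call ExecProcNode() now */
}

Append's side then just collects the returned fds into a WaitEventSet
and sleeps until one of them fires.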

The reason I reposted this unambitious work is because Stephen keeps
asking me why we don't consider the stupidly simple thing that would
help with simple foreign partition-based queries today, instead of
waiting for someone to redesign the entire executor, because that's
... really hard.

[1] https://www.postgresql.org/message-id/CAEepm%3D1CuAWfxDk%3D%3DjZ7pgCDCv52fiUnDSpUvmznmVmRKU5zpA%40mail.gmail.com
[2] https://www.postgresql.org/message-id/flat/CA%2BTgmobx8su_bYtAa3DgrqB%2BR7xZG6kHRj0ccMUUshKAQVftww%40mail.gmail.com
[3] https://www.postgresql.org/message-id/flat/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com



Re: Append with naive multiplexing of FDWs

Bruce Momjian
On Thu, Dec  5, 2019 at 05:45:24PM +1300, Thomas Munro wrote:

> My patch set (rebased upthread) was extremely primitive, with no new
> planner concepts, and added only a very simple new executor node
> method: ExecReady().  Append used that to try to ask its children if
> they'd like some time to warm up.  By default, ExecReady() says "I
> don't know what you're talking about, go away", but FDWs can provide
> an implementation that says "yes, please call me again when this fd is
> ready" or "yes, I am ready, please call ExecProc() now".  It doesn't
> deal with anything more complicated than that, and in particular it
> doesn't work if there are extra planner nodes in between Append and
> the foreign scan.  (It also doesn't mix particularly well with
> parallelism, as mentioned.)
>
> The reason I reposted this unambitious work is because Stephen keeps
> asking me why we don't consider the stupidly simple thing that would
> help with simple foreign partition-based queries today, instead of
> waiting for someone to redesign the entire executor, because that's
> ... really hard.

I agree with Stephen's request.  We have been waiting for the executor
rewrite for a while, so let's just do something simple and see how it
performs.

--
  Bruce Momjian  <[hidden email]>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +



Re: Append with naive multiplexing of FDWs

Robert Haas
On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <[hidden email]> wrote:
> I agree with Stephen's request.  We have been waiting for the executor
> rewrite for a while, so let's just do something simple and see how it
> performs.

I'm sympathetic to the frustration here, and I think it would be great
if we could find a way forward that doesn't involve waiting for a full
rewrite of the executor.  However, I seem to remember that when we
tested the various patches that various people had written for this
feature (I wrote one, too) they all had a noticeable performance
penalty in the case of a plain old Append that involved no FDWs and
nothing asynchronous. I don't think it's OK to have, say, a 2%
regression on every query that involves an Append, because especially
now that we have partitioning, that's a lot of queries.

I don't know whether this patch has that kind of problem. If it
doesn't, I would consider that a promising sign.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Append with naive multiplexing of FDWs

Thomas Munro
On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <[hidden email]> wrote:

> I'm sympathetic to the frustration here, and I think it would be great
> if we could find a way forward that doesn't involve waiting for a full
> rewrite of the executor.  However, I seem to remember that when we
> tested the various patches that various people had written for this
> feature (I wrote one, too) they all had a noticeable performance
> penalty in the case of a plain old Append that involved no FDWs and
> nothing asynchronous. I don't think it's OK to have, say, a 2%
> regression on every query that involves an Append, because especially
> now that we have partitioning, that's a lot of queries.
>
> I don't know whether this patch has that kind of problem. If it
> doesn't, I would consider that a promising sign.

I'll look into that.  If there is a measurable impact, I suspect it
can be avoided by, for example, installing a different ExecProcNode
function.
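
The sort of micro-benchmark that should expose any such regression
(illustrative only; any similar setup would do) is a purely local
Append over many partitions, timed patched vs. unpatched:

create table bench (i int) partition by range (i);
select format('create table bench_%s partition of bench for values from (%s) to (%s)',
              g, g * 1000, (g + 1) * 1000)
from generate_series(0, 99) g \gexec
insert into bench select generate_series(0, 99999);
vacuum analyze bench;
-- then e.g.: pgbench -n -T 30 -f query.sql
-- where query.sql contains: select count(*) from bench;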



Re: Append with naive multiplexing of FDWs

Kyotaro Horiguchi
At Fri, 6 Dec 2019 10:03:44 +1300, Thomas Munro <[hidden email]> wrote in
> On Fri, Dec 6, 2019 at 9:20 AM Robert Haas <[hidden email]> wrote:
> > I don't know whether this patch has that kind of problem. If it
> > doesn't, I would consider that a promising sign.
>
> I'll look into that.  If there is a measurable impact, I suspect it
> can be avoided by, for example, installing a different ExecProcNode
> function.

Replacing ExecProcNode completely isolates the additional processing
in ExecAppendAsync.  Thus, for purely local appends, the patch can
affect performance only through the planner and executor
initialization, and I don't believe that can be large enough to be
observable in a large scan.
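
Schematically, the part that keeps the purely synchronous path
unchanged boils down to this hunk in nodeAppend.c (from the attached
patch):

    /* choose appropriate version of Exec function */
    if (node->nasyncplans == 0)
        appendstate->ps.ExecProcNode = ExecAppend;
    else
        appendstate->ps.ExecProcNode = ExecAppendAsync;

so a plain Append keeps running the same ExecAppend code as today and
pays nothing per tuple for the async support.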

As mentioned in the mail upthread, the patch accelerates all remote
cases when fetch_size is >= 200.  The problem was that local scans
seemed to be slightly slowed down.  I dusted off the old patch (FWIW,
I attached it) and will re-run the benchmarks on the current
development environment (and re-check the code).

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center

From 9f5dc3720ddade94cd66713f4aa79da575b09e31 Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <[hidden email]>
Date: Mon, 22 May 2017 12:42:58 +0900
Subject: [PATCH v1 1/3] Allow wait event set to be registered to resource
 owner

WaitEventSet needs to be released via a resource owner in certain
cases.  This change makes a WaitEventSet remember its resource owner
and allows the creator of a WaitEventSet to specify one.
---
 src/backend/libpq/pqcomm.c                    |  2 +-
 src/backend/storage/ipc/latch.c               | 18 ++++-
 src/backend/storage/lmgr/condition_variable.c |  2 +-
 src/backend/utils/resowner/resowner.c         | 67 +++++++++++++++++++
 src/include/storage/latch.h                   |  4 +-
 src/include/utils/resowner_private.h          |  8 +++
 6 files changed, 96 insertions(+), 5 deletions(-)

diff --git a/src/backend/libpq/pqcomm.c b/src/backend/libpq/pqcomm.c
index cd517e8bb4..3912b8b3a0 100644
--- a/src/backend/libpq/pqcomm.c
+++ b/src/backend/libpq/pqcomm.c
@@ -220,7 +220,7 @@ pq_init(void)
  (errmsg("could not set socket to nonblocking mode: %m")));
 #endif
 
- FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, 3);
+ FeBeWaitSet = CreateWaitEventSet(TopMemoryContext, NULL, 3);
  AddWaitEventToSet(FeBeWaitSet, WL_SOCKET_WRITEABLE, MyProcPort->sock,
   NULL, NULL);
  AddWaitEventToSet(FeBeWaitSet, WL_LATCH_SET, -1, MyLatch, NULL);
diff --git a/src/backend/storage/ipc/latch.c b/src/backend/storage/ipc/latch.c
index 2426cbcf8e..dc04ee5f6f 100644
--- a/src/backend/storage/ipc/latch.c
+++ b/src/backend/storage/ipc/latch.c
@@ -52,6 +52,7 @@
 #include "storage/latch.h"
 #include "storage/pmsignal.h"
 #include "storage/shmem.h"
+#include "utils/resowner_private.h"
 
 /*
  * Select the fd readiness primitive to use. Normally the "most modern"
@@ -78,6 +79,8 @@ struct WaitEventSet
  int nevents; /* number of registered events */
  int nevents_space; /* maximum number of events in this set */
 
+ ResourceOwner resowner; /* Resource owner */
+
  /*
  * Array, of nevents_space length, storing the definition of events this
  * set is waiting for.
@@ -372,7 +375,7 @@ WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock,
  int ret = 0;
  int rc;
  WaitEvent event;
- WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, 3);
+ WaitEventSet *set = CreateWaitEventSet(CurrentMemoryContext, NULL, 3);
 
  if (wakeEvents & WL_TIMEOUT)
  Assert(timeout >= 0);
@@ -539,12 +542,15 @@ ResetLatch(Latch *latch)
  * WaitEventSetWait().
  */
 WaitEventSet *
-CreateWaitEventSet(MemoryContext context, int nevents)
+CreateWaitEventSet(MemoryContext context, ResourceOwner res, int nevents)
 {
  WaitEventSet *set;
  char   *data;
  Size sz = 0;
 
+ if (res)
+ ResourceOwnerEnlargeWESs(res);
+
  /*
  * Use MAXALIGN size/alignment to guarantee that later uses of memory are
  * aligned correctly. E.g. epoll_event might need 8 byte alignment on some
@@ -614,6 +620,11 @@ CreateWaitEventSet(MemoryContext context, int nevents)
  StaticAssertStmt(WSA_INVALID_EVENT == NULL, "");
 #endif
 
+ /* Register this wait event set if requested */
+ set->resowner = res;
+ if (res)
+ ResourceOwnerRememberWES(set->resowner, set);
+
  return set;
 }
 
@@ -655,6 +666,9 @@ FreeWaitEventSet(WaitEventSet *set)
  }
 #endif
 
+ if (set->resowner != NULL)
+ ResourceOwnerForgetWES(set->resowner, set);
+
  pfree(set);
 }
 
diff --git a/src/backend/storage/lmgr/condition_variable.c b/src/backend/storage/lmgr/condition_variable.c
index e08507f0cc..5e88c48a1c 100644
--- a/src/backend/storage/lmgr/condition_variable.c
+++ b/src/backend/storage/lmgr/condition_variable.c
@@ -70,7 +70,7 @@ ConditionVariablePrepareToSleep(ConditionVariable *cv)
  {
  WaitEventSet *new_event_set;
 
- new_event_set = CreateWaitEventSet(TopMemoryContext, 2);
+ new_event_set = CreateWaitEventSet(TopMemoryContext, NULL, 2);
  AddWaitEventToSet(new_event_set, WL_LATCH_SET, PGINVALID_SOCKET,
   MyLatch, NULL);
  AddWaitEventToSet(new_event_set, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
diff --git a/src/backend/utils/resowner/resowner.c b/src/backend/utils/resowner/resowner.c
index 7be11c48ab..829034516f 100644
--- a/src/backend/utils/resowner/resowner.c
+++ b/src/backend/utils/resowner/resowner.c
@@ -128,6 +128,7 @@ typedef struct ResourceOwnerData
  ResourceArray filearr; /* open temporary files */
  ResourceArray dsmarr; /* dynamic shmem segments */
  ResourceArray jitarr; /* JIT contexts */
+ ResourceArray wesarr; /* wait event sets */
 
  /* We can remember up to MAX_RESOWNER_LOCKS references to local locks. */
  int nlocks; /* number of owned locks */
@@ -175,6 +176,7 @@ static void PrintTupleDescLeakWarning(TupleDesc tupdesc);
 static void PrintSnapshotLeakWarning(Snapshot snapshot);
 static void PrintFileLeakWarning(File file);
 static void PrintDSMLeakWarning(dsm_segment *seg);
+static void PrintWESLeakWarning(WaitEventSet *events);
 
 
 /*****************************************************************************
@@ -444,6 +446,7 @@ ResourceOwnerCreate(ResourceOwner parent, const char *name)
  ResourceArrayInit(&(owner->filearr), FileGetDatum(-1));
  ResourceArrayInit(&(owner->dsmarr), PointerGetDatum(NULL));
  ResourceArrayInit(&(owner->jitarr), PointerGetDatum(NULL));
+ ResourceArrayInit(&(owner->wesarr), PointerGetDatum(NULL));
 
  return owner;
 }
@@ -553,6 +556,16 @@ ResourceOwnerReleaseInternal(ResourceOwner owner,
 
  jit_release_context(context);
  }
+
+ /* Ditto for wait event sets */
+ while (ResourceArrayGetAny(&(owner->wesarr), &foundres))
+ {
+ WaitEventSet *event = (WaitEventSet *) DatumGetPointer(foundres);
+
+ if (isCommit)
+ PrintWESLeakWarning(event);
+ FreeWaitEventSet(event);
+ }
  }
  else if (phase == RESOURCE_RELEASE_LOCKS)
  {
@@ -701,6 +714,7 @@ ResourceOwnerDelete(ResourceOwner owner)
  Assert(owner->filearr.nitems == 0);
  Assert(owner->dsmarr.nitems == 0);
  Assert(owner->jitarr.nitems == 0);
+ Assert(owner->wesarr.nitems == 0);
  Assert(owner->nlocks == 0 || owner->nlocks == MAX_RESOWNER_LOCKS + 1);
 
  /*
@@ -728,6 +742,7 @@ ResourceOwnerDelete(ResourceOwner owner)
  ResourceArrayFree(&(owner->filearr));
  ResourceArrayFree(&(owner->dsmarr));
  ResourceArrayFree(&(owner->jitarr));
+ ResourceArrayFree(&(owner->wesarr));
 
  pfree(owner);
 }
@@ -1346,3 +1361,55 @@ ResourceOwnerForgetJIT(ResourceOwner owner, Datum handle)
  elog(ERROR, "JIT context %p is not owned by resource owner %s",
  DatumGetPointer(handle), owner->name);
 }
+
+/*
+ * wait event set reference array.
+ *
+ * This is separate from actually inserting an entry because if we run out
+ * of memory, it's critical to do so *before* acquiring the resource.
+ */
+void
+ResourceOwnerEnlargeWESs(ResourceOwner owner)
+{
+ ResourceArrayEnlarge(&(owner->wesarr));
+}
+
+/*
+ * Remember that a wait event set is owned by a ResourceOwner
+ *
+ * Caller must have previously done ResourceOwnerEnlargeWESs()
+ */
+void
+ResourceOwnerRememberWES(ResourceOwner owner, WaitEventSet *events)
+{
+ ResourceArrayAdd(&(owner->wesarr), PointerGetDatum(events));
+}
+
+/*
+ * Forget that a wait event set is owned by a ResourceOwner
+ */
+void
+ResourceOwnerForgetWES(ResourceOwner owner, WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identifier of a wait event set,
+ * use its pointer instead.
+ */
+ if (!ResourceArrayRemove(&(owner->wesarr), PointerGetDatum(events)))
+ elog(ERROR, "wait event set %p is not owned by resource owner %s",
+ events, owner->name);
+}
+
+/*
+ * Debugging subroutine
+ */
+static void
+PrintWESLeakWarning(WaitEventSet *events)
+{
+ /*
+ * XXXX: There's no property to show as an identifier of a wait event set,
+ * use its pointer instead.
+ */
+ elog(WARNING, "wait event set leak: %p still referenced",
+ events);
+}
diff --git a/src/include/storage/latch.h b/src/include/storage/latch.h
index bd7af11a8a..d136614587 100644
--- a/src/include/storage/latch.h
+++ b/src/include/storage/latch.h
@@ -101,6 +101,7 @@
 #define LATCH_H
 
 #include <signal.h>
+#include "utils/resowner.h"
 
 /*
  * Latch structure should be treated as opaque and only accessed through
@@ -163,7 +164,8 @@ extern void DisownLatch(Latch *latch);
 extern void SetLatch(Latch *latch);
 extern void ResetLatch(Latch *latch);
 
-extern WaitEventSet *CreateWaitEventSet(MemoryContext context, int nevents);
+extern WaitEventSet *CreateWaitEventSet(MemoryContext context,
+ ResourceOwner res, int nevents);
 extern void FreeWaitEventSet(WaitEventSet *set);
 extern int AddWaitEventToSet(WaitEventSet *set, uint32 events, pgsocket fd,
   Latch *latch, void *user_data);
diff --git a/src/include/utils/resowner_private.h b/src/include/utils/resowner_private.h
index b8261ad866..9c9c845784 100644
--- a/src/include/utils/resowner_private.h
+++ b/src/include/utils/resowner_private.h
@@ -18,6 +18,7 @@
 
 #include "storage/dsm.h"
 #include "storage/fd.h"
+#include "storage/latch.h"
 #include "storage/lock.h"
 #include "utils/catcache.h"
 #include "utils/plancache.h"
@@ -95,4 +96,11 @@ extern void ResourceOwnerRememberJIT(ResourceOwner owner,
 extern void ResourceOwnerForgetJIT(ResourceOwner owner,
    Datum handle);
 
+/* support for wait event set management */
+extern void ResourceOwnerEnlargeWESs(ResourceOwner owner);
+extern void ResourceOwnerRememberWES(ResourceOwner owner,
+ WaitEventSet *);
+extern void ResourceOwnerForgetWES(ResourceOwner owner,
+   WaitEventSet *);
+
 #endif /* RESOWNER_PRIVATE_H */
--
2.23.0


From 65fc69bb64f4596aff0c37a2d090ddf1609120bb Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <[hidden email]>
Date: Tue, 15 May 2018 20:21:32 +0900
Subject: [PATCH v1 2/3] infrastructure for asynchronous execution

This patch adds an infrastructure for asynchronous execution.  As a
PoC, it makes only Append capable of handling asynchronously
executable subnodes.
---
 src/backend/commands/explain.c          |  17 ++
 src/backend/executor/Makefile           |   1 +
 src/backend/executor/execAsync.c        | 145 ++++++++++++
 src/backend/executor/nodeAppend.c       | 286 +++++++++++++++++++++---
 src/backend/executor/nodeForeignscan.c  |  22 +-
 src/backend/nodes/bitmapset.c           |  72 ++++++
 src/backend/nodes/copyfuncs.c           |   2 +
 src/backend/nodes/outfuncs.c            |   2 +
 src/backend/nodes/readfuncs.c           |   2 +
 src/backend/optimizer/plan/createplan.c |  83 +++++++
 src/backend/postmaster/pgstat.c         |   3 +
 src/backend/postmaster/syslogger.c      |   2 +-
 src/backend/utils/adt/ruleutils.c       |   8 +-
 src/include/executor/execAsync.h        |  23 ++
 src/include/executor/executor.h         |   1 +
 src/include/executor/nodeForeignscan.h  |   3 +
 src/include/foreign/fdwapi.h            |  11 +
 src/include/nodes/bitmapset.h           |   1 +
 src/include/nodes/execnodes.h           |  20 +-
 src/include/nodes/plannodes.h           |   9 +
 src/include/pgstat.h                    |   3 +-
 21 files changed, 676 insertions(+), 40 deletions(-)
 create mode 100644 src/backend/executor/execAsync.c
 create mode 100644 src/include/executor/execAsync.h

diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c
index 62fb3434a3..9f06e1fbdc 100644
--- a/src/backend/commands/explain.c
+++ b/src/backend/commands/explain.c
@@ -82,6 +82,7 @@ static void show_sort_keys(SortState *sortstate, List *ancestors,
    ExplainState *es);
 static void show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
    ExplainState *es);
+static void show_append_info(AppendState *astate, ExplainState *es);
 static void show_agg_keys(AggState *astate, List *ancestors,
   ExplainState *es);
 static void show_grouping_sets(PlanState *planstate, Agg *agg,
@@ -1319,6 +1320,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
  }
  if (plan->parallel_aware)
  appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
  appendStringInfoString(es->str, pname);
  es->indent++;
  }
@@ -1860,6 +1863,11 @@ ExplainNode(PlanState *planstate, List *ancestors,
  case T_Hash:
  show_hash_info(castNode(HashState, planstate), es);
  break;
+
+ case T_Append:
+ show_append_info(castNode(AppendState, planstate), es);
+ break;
+
  default:
  break;
  }
@@ -2197,6 +2205,15 @@ show_merge_append_keys(MergeAppendState *mstate, List *ancestors,
  ancestors, es);
 }
 
+static void
+show_append_info(AppendState *astate, ExplainState *es)
+{
+ Append *plan = (Append *) astate->ps.plan;
+
+ if (plan->nasyncplans > 0)
+ ExplainPropertyInteger("Async subplans", "", plan->nasyncplans, es);
+}
+
 /*
  * Show the grouping keys for an Agg node.
  */
diff --git a/src/backend/executor/Makefile b/src/backend/executor/Makefile
index a983800e4b..8a2d6e9961 100644
--- a/src/backend/executor/Makefile
+++ b/src/backend/executor/Makefile
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
  execAmi.o \
+ execAsync.o \
  execCurrent.o \
  execExpr.o \
  execExprInterp.o \
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644
index 0000000000..db477e2cf6
--- /dev/null
+++ b/src/backend/executor/execAsync.c
@@ -0,0 +1,145 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *  Support routines for asynchronous execution.
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *  src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+#include "miscadmin.h"
+#include "pgstat.h"
+#include "utils/memutils.h"
+#include "utils/resowner.h"
+
+void ExecAsyncSetState(PlanState *pstate, AsyncState status)
+{
+ pstate->asyncstate = status;
+}
+
+bool
+ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+   void *data, bool reinit)
+{
+ switch (nodeTag(node))
+ {
+ case T_ForeignScanState:
+ return ExecForeignAsyncConfigureWait((ForeignScanState *)node,
+ wes, data, reinit);
+ break;
+ default:
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(node));
+ }
+}
+
+/*
+ * struct for memory context callback argument used in ExecAsyncEventWait
+ */
+typedef struct {
+ int **p_refind;
+ int *p_refindsize;
+} ExecAsync_mcbarg;
+
+/*
+ * callback function to reset static variables pointing to the memory in
+ * TopTransactionContext in ExecAsyncEventWait.
+ */
+static void ExecAsyncMemoryContextCallback(void *arg)
+{
+ /* arg is the address of the variable refind in ExecAsyncEventWait */
+ ExecAsync_mcbarg *mcbarg = (ExecAsync_mcbarg *) arg;
+ *mcbarg->p_refind = NULL;
+ *mcbarg->p_refindsize = 0;
+}
+
+#define EVENT_BUFFER_SIZE 16
+
+Bitmapset *
+ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes, long timeout)
+{
+ static int *refind = NULL;
+ static int refindsize = 0;
+ WaitEventSet *wes;
+ WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred = 0;
+ Bitmapset *fired_events = NULL;
+ int i;
+ int n;
+
+ n = bms_num_members(waitnodes);
+ wes = CreateWaitEventSet(TopTransactionContext,
+ TopTransactionResourceOwner, n);
+ if (refindsize < n)
+ {
+ if (refindsize == 0)
+ refindsize = EVENT_BUFFER_SIZE; /* XXX */
+ while (refindsize < n)
+ refindsize *= 2;
+ if (refind)
+ refind = (int *) repalloc(refind, refindsize * sizeof(int));
+ else
+ {
+ static ExecAsync_mcbarg mcb_arg =
+ { &refind, &refindsize };
+ static MemoryContextCallback mcb =
+ { ExecAsyncMemoryContextCallback, (void *)&mcb_arg, NULL };
+ MemoryContext oldctxt =
+ MemoryContextSwitchTo(TopTransactionContext);
+
+ /*
+ * refind points to a memory block in
+ * TopTransactionContext. Register a callback to reset it.
+ */
+ MemoryContextRegisterResetCallback(TopTransactionContext, &mcb);
+ refind = (int *) palloc(refindsize * sizeof(int));
+ MemoryContextSwitchTo(oldctxt);
+ }
+ }
+
+ n = 0;
+ for (i = bms_next_member(waitnodes, -1) ; i >= 0 ;
+ i = bms_next_member(waitnodes, i))
+ {
+ refind[i] = i;
+ if (ExecAsyncConfigureWait(wes, nodes[i], refind + i, true))
+ n++;
+ }
+
+ if (n == 0)
+ {
+ FreeWaitEventSet(wes);
+ return NULL;
+ }
+
+ noccurred = WaitEventSetWait(wes, timeout, occurred_event,
+ EVENT_BUFFER_SIZE,
+ WAIT_EVENT_ASYNC_WAIT);
+ FreeWaitEventSet(wes);
+ if (noccurred == 0)
+ return NULL;
+
+ for (i = 0 ; i < noccurred ; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ if ((w->events & (WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE)) != 0)
+ {
+ int n = *(int*)w->user_data;
+
+ fired_events = bms_add_member(fired_events, n);
+ }
+ }
+
+ return fired_events;
+}
diff --git a/src/backend/executor/nodeAppend.c b/src/backend/executor/nodeAppend.c
index 5ff986ac7d..03dec4d648 100644
--- a/src/backend/executor/nodeAppend.c
+++ b/src/backend/executor/nodeAppend.c
@@ -60,6 +60,7 @@
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
+#include "executor/execAsync.h"
 #include "miscadmin.h"
 
 /* Shared state for parallel-aware Append. */
@@ -81,6 +82,7 @@ struct ParallelAppendState
 #define NO_MATCHING_SUBPLANS -2
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
+static TupleTableSlot *ExecAppendAsync(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
@@ -104,22 +106,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
  PlanState **appendplanstates;
  Bitmapset  *validsubplans;
  int nplans;
+ int nasyncplans;
  int firstvalid;
  int i,
  j;
 
  /* check for unsupported flags */
- Assert(!(eflags & EXEC_FLAG_MARK));
+ Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_ASYNC)));
 
  /*
  * create new AppendState for our append node
  */
  appendstate->ps.plan = (Plan *) node;
  appendstate->ps.state = estate;
- appendstate->ps.ExecProcNode = ExecAppend;
+
+ /* choose appropriate version of Exec function */
+ if (node->nasyncplans == 0)
+ appendstate->ps.ExecProcNode = ExecAppend;
+ else
+ appendstate->ps.ExecProcNode = ExecAppendAsync;
 
  /* Let choose_next_subplan_* function handle setting the first subplan */
- appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 
  /* If run-time partition pruning is enabled, then set that up now */
  if (node->part_prune_info != NULL)
@@ -152,7 +160,7 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
  */
  if (bms_is_empty(validsubplans))
  {
- appendstate->as_whichplan = NO_MATCHING_SUBPLANS;
+ appendstate->as_whichsyncplan = NO_MATCHING_SUBPLANS;
 
  /* Mark the first as valid so that it's initialized below */
  validsubplans = bms_make_singleton(0);
@@ -212,10 +220,20 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
  */
  j = 0;
  firstvalid = nplans;
+ nasyncplans = 0;
+
  i = -1;
  while ((i = bms_next_member(validsubplans, i)) >= 0)
  {
  Plan   *initNode = (Plan *) list_nth(node->appendplans, i);
+ int sub_eflags = eflags;
+
+ /* Let async-capable subplans run asynchronously */
+ if (i < node->nasyncplans)
+ {
+ sub_eflags |= EXEC_FLAG_ASYNC;
+ nasyncplans++;
+ }
 
  /*
  * Record the lowest appendplans index which is a valid partial plan.
@@ -223,13 +241,28 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
  if (i >= node->first_partial_plan && j < firstvalid)
  firstvalid = j;
 
- appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
+ appendplanstates[j++] = ExecInitNode(initNode, estate, sub_eflags);
  }
 
  appendstate->as_first_partial_plan = firstvalid;
  appendstate->appendplans = appendplanstates;
  appendstate->as_nplans = nplans;
 
+ /* fill in async stuff */
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_syncdone = (nasyncplans == nplans);
+
+ if (appendstate->as_nasyncplans)
+ {
+ appendstate->as_asyncresult = (TupleTableSlot **)
+ palloc0(node->nasyncplans * sizeof(TupleTableSlot *));
+
+ /* initially, all async requests need a request */
+ for (i = 0; i < appendstate->as_nasyncplans; ++i)
+ appendstate->as_needrequest =
+ bms_add_member(appendstate->as_needrequest, i);
+ }
+
  /*
  * Miscellaneous initialization
  */
@@ -253,21 +286,23 @@ ExecAppend(PlanState *pstate)
 {
  AppendState *node = castNode(AppendState, pstate);
 
- if (node->as_whichplan < 0)
+ if (node->as_whichsyncplan < 0)
  {
  /*
  * If no subplan has been chosen, we must choose one before
  * proceeding.
  */
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
  !node->choose_next_subplan(node))
  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
  /* Nothing to do if there are no matching subplans */
- else if (node->as_whichplan == NO_MATCHING_SUBPLANS)
+ else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
  return ExecClearTuple(node->ps.ps_ResultTupleSlot);
  }
 
+ Assert(node->as_nasyncplans == 0);
+
  for (;;)
  {
  PlanState  *subnode;
@@ -278,8 +313,9 @@ ExecAppend(PlanState *pstate)
  /*
  * figure out which subplan we are currently processing
  */
- Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
- subnode = node->appendplans[node->as_whichplan];
+ Assert(node->as_whichsyncplan >= 0 &&
+   node->as_whichsyncplan < node->as_nplans);
+ subnode = node->appendplans[node->as_whichsyncplan];
 
  /*
  * get a tuple from the subplan
@@ -302,6 +338,175 @@ ExecAppend(PlanState *pstate)
  }
 }
 
+static TupleTableSlot *
+ExecAppendAsync(PlanState *pstate)
+{
+ AppendState *node = castNode(AppendState, pstate);
+ Bitmapset *needrequest;
+ int i;
+
+ Assert(node->as_nasyncplans > 0);
+
+restart:
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ while ((i = bms_first_member(needrequest)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ }
+ else
+ node->as_pending_async = bms_add_member(node->as_pending_async, i);
+ }
+ bms_free(needrequest);
+
+ for (;;)
+ {
+ TupleTableSlot *result;
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ while (!bms_is_empty(node->as_pending_async))
+ {
+ long timeout = node->as_syncdone ? -1 : 0;
+ Bitmapset *fired;
+ int i;
+
+ fired = ExecAsyncEventWait(node->appendplans,
+   node->as_pending_async,
+   timeout);
+
+ if (bms_is_empty(fired) && node->as_syncdone)
+ {
+ /*
+ * No subplan fired.  This can happen even in normal operation when
+ * the subnode already prepared a result before waiting.
+ * as_pending_async may be storing stale information, so restart
+ * from the beginning.
+ */
+ node->as_needrequest = node->as_pending_async;
+ node->as_pending_async = NULL;
+ goto restart;
+ }
+
+ while ((i = bms_first_member(fired)) >= 0)
+ {
+ TupleTableSlot *slot;
+ PlanState *subnode = node->appendplans[i];
+ slot = ExecProcNode(subnode);
+ if (subnode->asyncstate == AS_AVAILABLE)
+ {
+ if (!TupIsNull(slot))
+ {
+ node->as_asyncresult[node->as_nasyncresult++] = slot;
+ node->as_needrequest =
+ bms_add_member(node->as_needrequest, i);
+ }
+ node->as_pending_async =
+ bms_del_member(node->as_pending_async, i);
+ }
+ }
+ bms_free(fired);
+
+ /* return now if a result is available */
+ if (node->as_nasyncresult > 0)
+ {
+ --node->as_nasyncresult;
+ return node->as_asyncresult[node->as_nasyncresult];
+ }
+
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If there is no asynchronous activity still pending and the
+ * synchronous activity is also complete, we're totally done scanning
+ * this node.  Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the synchronous children.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(bms_is_empty(node->as_pending_async));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /*
+ * get a tuple from the subplan
+ */
+
+ if (node->as_whichsyncplan < 0)
+ {
+ /*
+ * If no subplan has been chosen, we must choose one before
+ * proceeding.
+ */
+ if (node->as_whichsyncplan == INVALID_SUBPLAN_INDEX &&
+ !node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+
+ /* Nothing to do if there are no matching subplans */
+ else if (node->as_whichsyncplan == NO_MATCHING_SUBPLANS)
+ {
+ node->as_syncdone = true;
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ result = ExecProcNode(node->appendplans[node->as_whichsyncplan]);
+
+ if (!TupIsNull(result))
+ {
+ /*
+ * If the subplan gave us something then return it as-is. We do
+ * NOT make use of the result slot that was set up in
+ * ExecInitAppend; there's no need for it.
+ */
+ return result;
+ }
+
+ /*
+ * Go on to the "next" subplan. If no more subplans, return the empty
+ * slot set up for us by ExecInitAppend, unless there are async plans
+ * we have yet to finish.
+ */
+ if (!node->choose_next_subplan(node))
+ {
+ node->as_syncdone = true;
+ if (bms_is_empty(node->as_pending_async))
+ {
+ Assert(bms_is_empty(node->as_needrequest));
+ return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ }
+ }
+
+ /* Else loop back and try to get a tuple from the new subplan */
+ }
+}
+
 /* ----------------------------------------------------------------
  * ExecEndAppend
  *
@@ -348,6 +553,15 @@ ExecReScanAppend(AppendState *node)
  node->as_valid_subplans = NULL;
  }
 
+ /* Reset async state. */
+ for (i = 0; i < node->as_nasyncplans; ++i)
+ {
+ ExecShutdownNode(node->appendplans[i]);
+ node->as_needrequest = bms_add_member(node->as_needrequest, i);
+ }
+ node->as_nasyncresult = 0;
+ node->as_syncdone = (node->as_nasyncplans == node->as_nplans);
+
  for (i = 0; i < node->as_nplans; i++)
  {
  PlanState  *subnode = node->appendplans[i];
@@ -368,7 +582,7 @@ ExecReScanAppend(AppendState *node)
  }
 
  /* Let choose_next_subplan_* function handle setting the first subplan */
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
 }
 
 /* ----------------------------------------------------------------
@@ -456,7 +670,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 static bool
 choose_next_subplan_locally(AppendState *node)
 {
- int whichplan = node->as_whichplan;
+ int whichplan = node->as_whichsyncplan;
  int nextplan;
 
  /* We should never be called when there are no subplans */
@@ -475,6 +689,10 @@ choose_next_subplan_locally(AppendState *node)
  node->as_valid_subplans =
  ExecFindMatchingSubPlans(node->as_prune_state);
 
+ /* Exclude async plans */
+ if (node->as_nasyncplans > 0)
+ bms_del_range(node->as_valid_subplans, 0, node->as_nasyncplans - 1);
+
  whichplan = -1;
  }
 
@@ -489,7 +707,7 @@ choose_next_subplan_locally(AppendState *node)
  if (nextplan < 0)
  return false;
 
- node->as_whichplan = nextplan;
+ node->as_whichsyncplan = nextplan;
 
  return true;
 }
@@ -511,19 +729,19 @@ choose_next_subplan_for_leader(AppendState *node)
  Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
  /* We should never be called when there are no subplans */
- Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+ Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
  LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
  {
  /* Mark just-completed subplan as finished. */
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
  }
  else
  {
  /* Start with last subplan. */
- node->as_whichplan = node->as_nplans - 1;
+ node->as_whichsyncplan = node->as_nplans - 1;
 
  /*
  * If we've yet to determine the valid subplans then do so now.  If
@@ -544,12 +762,12 @@ choose_next_subplan_for_leader(AppendState *node)
  }
 
  /* Loop until we find a subplan to execute. */
- while (pstate->pa_finished[node->as_whichplan])
+ while (pstate->pa_finished[node->as_whichsyncplan])
  {
- if (node->as_whichplan == 0)
+ if (node->as_whichsyncplan == 0)
  {
  pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
- node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_whichsyncplan = INVALID_SUBPLAN_INDEX;
  LWLockRelease(&pstate->pa_lock);
  return false;
  }
@@ -558,12 +776,12 @@ choose_next_subplan_for_leader(AppendState *node)
  * We needn't pay attention to as_valid_subplans here as all invalid
  * plans have been marked as finished.
  */
- node->as_whichplan--;
+ node->as_whichsyncplan--;
  }
 
  /* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
  LWLockRelease(&pstate->pa_lock);
 
@@ -592,13 +810,13 @@ choose_next_subplan_for_worker(AppendState *node)
  Assert(ScanDirectionIsForward(node->ps.state->es_direction));
 
  /* We should never be called when there are no subplans */
- Assert(node->as_whichplan != NO_MATCHING_SUBPLANS);
+ Assert(node->as_whichsyncplan != NO_MATCHING_SUBPLANS);
 
  LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);
 
  /* Mark just-completed subplan as finished. */
- if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan != INVALID_SUBPLAN_INDEX)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
  /*
  * If we've yet to determine the valid subplans then do so now.  If
@@ -620,7 +838,7 @@ choose_next_subplan_for_worker(AppendState *node)
  }
 
  /* Save the plan from which we are starting the search. */
- node->as_whichplan = pstate->pa_next_plan;
+ node->as_whichsyncplan = pstate->pa_next_plan;
 
  /* Loop until we find a valid subplan to execute. */
  while (pstate->pa_finished[pstate->pa_next_plan])
@@ -634,7 +852,7 @@ choose_next_subplan_for_worker(AppendState *node)
  /* Advance to the next valid plan. */
  pstate->pa_next_plan = nextplan;
  }
- else if (node->as_whichplan > node->as_first_partial_plan)
+ else if (node->as_whichsyncplan > node->as_first_partial_plan)
  {
  /*
  * Try looping back to the first valid partial plan, if there is
@@ -643,7 +861,7 @@ choose_next_subplan_for_worker(AppendState *node)
  nextplan = bms_next_member(node->as_valid_subplans,
    node->as_first_partial_plan - 1);
  pstate->pa_next_plan =
- nextplan < 0 ? node->as_whichplan : nextplan;
+ nextplan < 0 ? node->as_whichsyncplan : nextplan;
  }
  else
  {
@@ -651,10 +869,10 @@ choose_next_subplan_for_worker(AppendState *node)
  * At last plan, and either there are no partial plans or we've
  * tried them all.  Arrange to bail out.
  */
- pstate->pa_next_plan = node->as_whichplan;
+ pstate->pa_next_plan = node->as_whichsyncplan;
  }
 
- if (pstate->pa_next_plan == node->as_whichplan)
+ if (pstate->pa_next_plan == node->as_whichsyncplan)
  {
  /* We've tried everything! */
  pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
@@ -664,7 +882,7 @@ choose_next_subplan_for_worker(AppendState *node)
  }
 
  /* Pick the plan we found, and advance pa_next_plan one more time. */
- node->as_whichplan = pstate->pa_next_plan;
+ node->as_whichsyncplan = pstate->pa_next_plan;
  pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
    pstate->pa_next_plan);
 
@@ -691,8 +909,8 @@ choose_next_subplan_for_worker(AppendState *node)
  }
 
  /* If non-partial, immediately mark as finished. */
- if (node->as_whichplan < node->as_first_partial_plan)
- node->as_pstate->pa_finished[node->as_whichplan] = true;
+ if (node->as_whichsyncplan < node->as_first_partial_plan)
+ node->as_pstate->pa_finished[node->as_whichsyncplan] = true;
 
  LWLockRelease(&pstate->pa_lock);
 
diff --git a/src/backend/executor/nodeForeignscan.c b/src/backend/executor/nodeForeignscan.c
index 52af1dac5c..1a54383ec8 100644
--- a/src/backend/executor/nodeForeignscan.c
+++ b/src/backend/executor/nodeForeignscan.c
@@ -117,7 +117,6 @@ ExecForeignScan(PlanState *pstate)
  (ExecScanRecheckMtd) ForeignRecheck);
 }
 
-
 /* ----------------------------------------------------------------
  * ExecInitForeignScan
  * ----------------------------------------------------------------
@@ -141,6 +140,10 @@ ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags)
  scanstate->ss.ps.plan = (Plan *) node;
  scanstate->ss.ps.state = estate;
  scanstate->ss.ps.ExecProcNode = ExecForeignScan;
+ scanstate->ss.ps.asyncstate = AS_AVAILABLE;
+
+ if ((eflags & EXEC_FLAG_ASYNC) != 0)
+ scanstate->fs_async = true;
 
  /*
  * Miscellaneous initialization
@@ -384,3 +387,20 @@ ExecShutdownForeignScan(ForeignScanState *node)
  if (fdwroutine->ShutdownForeignScan)
  fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ * ExecForeignAsyncConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+bool
+ExecForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+  void *caller_data, bool reinit)
+{
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ return fdwroutine->ForeignAsyncConfigureWait(node, wes,
+ caller_data, reinit);
+}
diff --git a/src/backend/nodes/bitmapset.c b/src/backend/nodes/bitmapset.c
index 665149defe..5d4e19a052 100644
--- a/src/backend/nodes/bitmapset.c
+++ b/src/backend/nodes/bitmapset.c
@@ -895,6 +895,78 @@ bms_add_range(Bitmapset *a, int lower, int upper)
  return a;
 }
 
+/*
+ * bms_del_range
+ * Delete members in the range of 'lower' to 'upper' from the set.
+ *
+ * Note this could also be done by calling bms_del_member in a loop, however,
+ * using this function will be faster when the range is large as we work at
+ * the bitmapword level rather than at bit level.
+ */
+Bitmapset *
+bms_del_range(Bitmapset *a, int lower, int upper)
+{
+ int lwordnum,
+ lbitnum,
+ uwordnum,
+ ushiftbits,
+ wordnum;
+
+ if (lower < 0 || upper < 0)
+ elog(ERROR, "negative bitmapset member not allowed");
+ if (lower > upper)
+ elog(ERROR, "lower range must not be above upper range");
+ uwordnum = WORDNUM(upper);
+
+ if (a == NULL)
+ {
+ a = (Bitmapset *) palloc0(BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ }
+
+ /* ensure we have enough words to store the upper bit */
+ else if (uwordnum >= a->nwords)
+ {
+ int oldnwords = a->nwords;
+ int i;
+
+ a = (Bitmapset *) repalloc(a, BITMAPSET_SIZE(uwordnum + 1));
+ a->nwords = uwordnum + 1;
+ /* zero out the enlarged portion */
+ for (i = oldnwords; i < a->nwords; i++)
+ a->words[i] = 0;
+ }
+
+ wordnum = lwordnum = WORDNUM(lower);
+
+ lbitnum = BITNUM(lower);
+ ushiftbits = BITNUM(upper) + 1;
+
+ /*
+ * Special case: when lwordnum is the same as uwordnum we must perform both
+ * the upper and lower masking on the same word.
+ */
+ if (lwordnum == uwordnum)
+ {
+ a->words[lwordnum] &= ((bitmapword) (((bitmapword) 1 << lbitnum) - 1)
+ | (~(bitmapword) 0) << ushiftbits);
+ }
+ else
+ {
+ /* turn off lbitnum and all bits left of it */
+ a->words[wordnum++] &= (bitmapword) (((bitmapword) 1 << lbitnum) - 1);
+
+ /* turn off all bits for any intermediate words */
+ while (wordnum < uwordnum)
+ a->words[wordnum++] = (bitmapword) 0;
+
+ /* turn off upper's bit and all bits right of it. */
+ a->words[uwordnum] &= (~(bitmapword) 0) << ushiftbits;
+ }
+
+ return a;
+}
+
 /*
  * bms_int_members - like bms_intersect, but left input is recycled
  */
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index a74b56bb59..a266904010 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -244,6 +244,8 @@ _copyAppend(const Append *from)
  COPY_NODE_FIELD(appendplans);
  COPY_SCALAR_FIELD(first_partial_plan);
  COPY_NODE_FIELD(part_prune_info);
+ COPY_SCALAR_FIELD(nasyncplans);
+ COPY_SCALAR_FIELD(referent);
 
  return newnode;
 }
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index a80eccc2c1..bf87e721a5 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -434,6 +434,8 @@ _outAppend(StringInfo str, const Append *node)
  WRITE_NODE_FIELD(appendplans);
  WRITE_INT_FIELD(first_partial_plan);
  WRITE_NODE_FIELD(part_prune_info);
+ WRITE_INT_FIELD(nasyncplans);
+ WRITE_INT_FIELD(referent);
 }
 
 static void
diff --git a/src/backend/nodes/readfuncs.c b/src/backend/nodes/readfuncs.c
index 764e3bb90c..25b84b3f15 100644
--- a/src/backend/nodes/readfuncs.c
+++ b/src/backend/nodes/readfuncs.c
@@ -1639,6 +1639,8 @@ _readAppend(void)
  READ_NODE_FIELD(appendplans);
  READ_INT_FIELD(first_partial_plan);
  READ_NODE_FIELD(part_prune_info);
+ READ_INT_FIELD(nasyncplans);
+ READ_INT_FIELD(referent);
 
  READ_DONE();
 }
diff --git a/src/backend/optimizer/plan/createplan.c b/src/backend/optimizer/plan/createplan.c
index aee81bd755..c676980a30 100644
--- a/src/backend/optimizer/plan/createplan.c
+++ b/src/backend/optimizer/plan/createplan.c
@@ -207,6 +207,9 @@ static NamedTuplestoreScan *make_namedtuplestorescan(List *qptlist, List *qpqual
  Index scanrelid, char *enrname);
 static WorkTableScan *make_worktablescan(List *qptlist, List *qpqual,
  Index scanrelid, int wtParam);
+static Append *make_append(List *appendplans, int first_partial_plan,
+   int nasyncplans, int referent,
+   List *tlist, PartitionPruneInfo *partpruneinfos);
 static RecursiveUnion *make_recursive_union(List *tlist,
  Plan *lefttree,
  Plan *righttree,
@@ -292,6 +295,7 @@ static ModifyTable *make_modifytable(PlannerInfo *root,
  List *rowMarks, OnConflictExpr *onconflict, int epqParam);
 static GatherMerge *create_gather_merge_plan(PlannerInfo *root,
  GatherMergePath *best_path);
+static bool is_async_capable_path(Path *path);
 
 
 /*
@@ -1069,6 +1073,8 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
  bool tlist_was_changed = false;
  List   *pathkeys = best_path->path.pathkeys;
  List   *subplans = NIL;
+ List   *asyncplans = NIL;
+ List   *syncplans = NIL;
  ListCell   *subpaths;
  RelOptInfo *rel = best_path->path.parent;
  PartitionPruneInfo *partpruneinfo = NULL;
@@ -1077,6 +1083,9 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
  Oid   *nodeSortOperators = NULL;
  Oid   *nodeCollations = NULL;
  bool   *nodeNullsFirst = NULL;
+ int nasyncplans = 0;
+ bool first = true;
+ bool referent_is_sync = true;
 
  /*
  * The subpaths list could be empty, if every child was proven empty by
@@ -1206,6 +1215,23 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
  }
 
  subplans = lappend(subplans, subplan);
+
+ /*
+ * Classify as async-capable or not. If we have decided to run the
+ * children in parallel, none of them can run asynchronously.
+ */
+ if (!best_path->path.parallel_safe && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ asyncplans = lappend(asyncplans, subplan);
+ ++nasyncplans;
+ if (first)
+ referent_is_sync = false;
+ }
+ else
+ syncplans = lappend(syncplans, subplan);
+
+ first = false;
  }
 
  /*
@@ -1244,6 +1270,18 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
  plan->first_partial_plan = best_path->first_partial_path;
  plan->part_prune_info = partpruneinfo;
 
+ /*
+ * XXX ideally, if there's just one child, we'd not bother to generate an
+ * Append node but just return the single child.  At the moment this does
+ * not work because the varno of the child scan plan won't match the
+ * parent-rel Vars it'll be asked to emit.
+ */
+
+ plan = make_append(list_concat(asyncplans, syncplans),
+   best_path->first_partial_path, nasyncplans,
+   referent_is_sync ? nasyncplans : 0, tlist,
+   partpruneinfo);
+
  copy_generic_path_info(&plan->plan, (Path *) best_path);
 
  /*
@@ -5462,6 +5500,27 @@ make_foreignscan(List *qptlist,
  return node;
 }
 
+static Append *
+make_append(List *appendplans, int first_partial_plan, int nasyncplans,
+ int referent, List *tlist, PartitionPruneInfo *partpruneinfo)
+{
+ Append   *node = makeNode(Append);
+ Plan   *plan = &node->plan;
+
+ plan->targetlist = tlist;
+ plan->qual = NIL;
+ plan->lefttree = NULL;
+ plan->righttree = NULL;
+
+ node->appendplans = appendplans;
+ node->first_partial_plan = first_partial_plan;
+ node->part_prune_info = partpruneinfo;
+ node->nasyncplans = nasyncplans;
+ node->referent = referent;
+
+ return node;
+}
+
 static RecursiveUnion *
 make_recursive_union(List *tlist,
  Plan *lefttree,
@@ -6836,3 +6895,27 @@ is_projection_capable_plan(Plan *plan)
  }
  return true;
 }
+
+/*
+ * is_projection_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ default:
+ break;
+ }
+ return false;
+}
diff --git a/src/backend/postmaster/pgstat.c b/src/backend/postmaster/pgstat.c
index fabcf31de8..575ccd5def 100644
--- a/src/backend/postmaster/pgstat.c
+++ b/src/backend/postmaster/pgstat.c
@@ -3853,6 +3853,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
  case WAIT_EVENT_SYNC_REP:
  event_name = "SyncRep";
  break;
+ case WAIT_EVENT_ASYNC_WAIT:
+ event_name = "AsyncExecWait";
+ break;
  /* no default case, so that compiler will warn */
  }
 
diff --git a/src/backend/postmaster/syslogger.c b/src/backend/postmaster/syslogger.c
index bb2baff763..7669e6ff53 100644
--- a/src/backend/postmaster/syslogger.c
+++ b/src/backend/postmaster/syslogger.c
@@ -306,7 +306,7 @@ SysLoggerMain(int argc, char *argv[])
  * syslog pipe, which implies that all other backends have exited
  * (including the postmaster).
  */
- wes = CreateWaitEventSet(CurrentMemoryContext, 2);
+ wes = CreateWaitEventSet(CurrentMemoryContext, NULL, 2);
  AddWaitEventToSet(wes, WL_LATCH_SET, PGINVALID_SOCKET, MyLatch, NULL);
 #ifndef WIN32
  AddWaitEventToSet(wes, WL_SOCKET_READABLE, syslogPipe[0], NULL, NULL);
diff --git a/src/backend/utils/adt/ruleutils.c b/src/backend/utils/adt/ruleutils.c
index 13685a0a0e..60b749f062 100644
--- a/src/backend/utils/adt/ruleutils.c
+++ b/src/backend/utils/adt/ruleutils.c
@@ -4640,7 +4640,7 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
  dpns->planstate = ps;
 
  /*
- * We special-case Append and MergeAppend to pretend that the first child
+ * We special-case Append and MergeAppend to pretend that a specific child
  * plan is the OUTER referent; we have to interpret OUTER Vars in their
  * tlists according to one of the children, and the first one is the most
  * natural choice.  Likewise special-case ModifyTable to pretend that the
@@ -4648,7 +4648,11 @@ set_deparse_planstate(deparse_namespace *dpns, PlanState *ps)
  * lists containing references to non-target relations.
  */
  if (IsA(ps, AppendState))
- dpns->outer_planstate = ((AppendState *) ps)->appendplans[0];
+ {
+ AppendState *aps = (AppendState *) ps;
+ Append *app = (Append *) ps->plan;
+ dpns->outer_planstate = aps->appendplans[app->referent];
+ }
  else if (IsA(ps, MergeAppendState))
  dpns->outer_planstate = ((MergeAppendState *) ps)->mergeplans[0];
  else if (IsA(ps, ModifyTableState))
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644
index 0000000000..5fd67d9004
--- /dev/null
+++ b/src/include/executor/execAsync.h
@@ -0,0 +1,23 @@
+/*--------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous query execution
+ *
+ * Portions Copyright (c) 1996-2017, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *--------------------------------------------------------------------
+ */
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+#include "storage/latch.h"
+
+extern void ExecAsyncSetState(PlanState *pstate, AsyncState status);
+extern bool ExecAsyncConfigureWait(WaitEventSet *wes, PlanState *node,
+   void *data, bool reinit);
+extern Bitmapset *ExecAsyncEventWait(PlanState **nodes, Bitmapset *waitnodes,
+ long timeout);
+#endif   /* EXECASYNC_H */
diff --git a/src/include/executor/executor.h b/src/include/executor/executor.h
index 6298c7c8ca..4adb2efe76 100644
--- a/src/include/executor/executor.h
+++ b/src/include/executor/executor.h
@@ -59,6 +59,7 @@
 #define EXEC_FLAG_MARK 0x0008 /* need mark/restore */
 #define EXEC_FLAG_SKIP_TRIGGERS 0x0010 /* skip AfterTrigger calls */
 #define EXEC_FLAG_WITH_NO_DATA 0x0020 /* rel scannability doesn't matter */
+#define EXEC_FLAG_ASYNC 0x0040 /* request async execution */
 
 
 /* Hook for plugins to get control in ExecutorStart() */
diff --git a/src/include/executor/nodeForeignscan.h b/src/include/executor/nodeForeignscan.h
index ca7723c899..81791033af 100644
--- a/src/include/executor/nodeForeignscan.h
+++ b/src/include/executor/nodeForeignscan.h
@@ -30,5 +30,8 @@ extern void ExecForeignScanReInitializeDSM(ForeignScanState *node,
 extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
  ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern bool ExecForeignAsyncConfigureWait(ForeignScanState *node,
+  WaitEventSet *wes,
+  void *caller_data, bool reinit);
 
 #endif /* NODEFOREIGNSCAN_H */
diff --git a/src/include/foreign/fdwapi.h b/src/include/foreign/fdwapi.h
index 822686033e..851cd15e65 100644
--- a/src/include/foreign/fdwapi.h
+++ b/src/include/foreign/fdwapi.h
@@ -169,6 +169,11 @@ typedef bool (*IsForeignScanParallelSafe_function) (PlannerInfo *root,
 typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
  List *fdw_private,
  RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+typedef bool (*ForeignAsyncConfigureWait_function) (ForeignScanState *node,
+ WaitEventSet *wes,
+ void *caller_data,
+ bool reinit);
 
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
@@ -190,6 +195,7 @@ typedef struct FdwRoutine
  GetForeignPlan_function GetForeignPlan;
  BeginForeignScan_function BeginForeignScan;
  IterateForeignScan_function IterateForeignScan;
+ IterateForeignScan_function IterateForeignScanAsync;
  ReScanForeignScan_function ReScanForeignScan;
  EndForeignScan_function EndForeignScan;
 
@@ -242,6 +248,11 @@ typedef struct FdwRoutine
  InitializeDSMForeignScan_function InitializeDSMForeignScan;
  ReInitializeDSMForeignScan_function ReInitializeDSMForeignScan;
  InitializeWorkerForeignScan_function InitializeWorkerForeignScan;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+
  ShutdownForeignScan_function ShutdownForeignScan;
 
  /* Support functions for path reparameterization. */
diff --git a/src/include/nodes/bitmapset.h b/src/include/nodes/bitmapset.h
index 0c645628e5..d97a4c2235 100644
--- a/src/include/nodes/bitmapset.h
+++ b/src/include/nodes/bitmapset.h
@@ -107,6 +107,7 @@ extern Bitmapset *bms_add_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_add_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_int_members(Bitmapset *a, const Bitmapset *b);
 extern Bitmapset *bms_del_members(Bitmapset *a, const Bitmapset *b);
+extern Bitmapset *bms_del_range(Bitmapset *a, int lower, int upper);
 extern Bitmapset *bms_join(Bitmapset *a, Bitmapset *b);
 
 /* support for iterating through the integer elements of a set: */
diff --git a/src/include/nodes/execnodes.h b/src/include/nodes/execnodes.h
index 6eb647290b..46d7fbab3a 100644
--- a/src/include/nodes/execnodes.h
+++ b/src/include/nodes/execnodes.h
@@ -932,6 +932,12 @@ typedef TupleTableSlot *(*ExecProcNodeMtd) (struct PlanState *pstate);
  * abstract superclass for all PlanState-type nodes.
  * ----------------
  */
+typedef enum AsyncState
+{
+ AS_AVAILABLE,
+ AS_WAITING
+} AsyncState;
+
 typedef struct PlanState
 {
  NodeTag type;
@@ -1020,6 +1026,11 @@ typedef struct PlanState
  bool outeropsset;
  bool inneropsset;
  bool resultopsset;
+
+ /* Async subnode execution stuff */
+ AsyncState asyncstate;
+
+ int32 padding; /* to keep alignment of derived types */
 } PlanState;
 
 /* ----------------
@@ -1216,14 +1227,20 @@ struct AppendState
  PlanState ps; /* its first field is NodeTag */
  PlanState **appendplans; /* array of PlanStates for my inputs */
  int as_nplans;
- int as_whichplan;
+ int as_whichsyncplan; /* which sync plan is being executed  */
  int as_first_partial_plan; /* Index of 'appendplans' containing
  * the first partial plan */
+ int as_nasyncplans; /* # of async-capable children */
  ParallelAppendState *as_pstate; /* parallel coordination info */
  Size pstate_len; /* size of parallel coordination info */
  struct PartitionPruneState *as_prune_state;
  Bitmapset  *as_valid_subplans;
  bool (*choose_next_subplan) (AppendState *);
+ bool as_syncdone; /* all synchronous plans done? */
+ Bitmapset  *as_needrequest; /* async plans needing a new request */
+ Bitmapset  *as_pending_async; /* pending async plans */
+ TupleTableSlot **as_asyncresult; /* unreturned results of async plans */
+ int as_nasyncresult; /* # of valid entries in as_asyncresult */
 };
 
 /* ----------------
@@ -1786,6 +1803,7 @@ typedef struct ForeignScanState
  Size pscan_len; /* size of parallel coordination information */
  /* use struct pointer to avoid including fdwapi.h here */
  struct FdwRoutine *fdwroutine;
+ bool fs_async;
  void   *fdw_state; /* foreign-data wrapper can keep state here */
 } ForeignScanState;
 
diff --git a/src/include/nodes/plannodes.h b/src/include/nodes/plannodes.h
index 8e6594e355..26810915e5 100644
--- a/src/include/nodes/plannodes.h
+++ b/src/include/nodes/plannodes.h
@@ -133,6 +133,11 @@ typedef struct Plan
  bool parallel_aware; /* engage parallel-aware logic? */
  bool parallel_safe; /* OK to use as part of parallel plan? */
 
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable;  /* engage asynchronous execution logic? */
+
  /*
  * Common structural data for all Plan types.
  */
@@ -259,6 +264,10 @@ typedef struct Append
 
  /* Info for run-time subplan pruning; NULL if we're not doing that */
  struct PartitionPruneInfo *part_prune_info;
+
+ /* Async child node execution stuff */
+ int nasyncplans; /* # async subplans, always at start of list */
+ int referent; /* index of inheritance tree referent */
 } Append;
 
 /* ----------------
diff --git a/src/include/pgstat.h b/src/include/pgstat.h
index fe076d823d..d57ef809fc 100644
--- a/src/include/pgstat.h
+++ b/src/include/pgstat.h
@@ -853,7 +853,8 @@ typedef enum
  WAIT_EVENT_REPLICATION_ORIGIN_DROP,
  WAIT_EVENT_REPLICATION_SLOT_DROP,
  WAIT_EVENT_SAFE_SNAPSHOT,
- WAIT_EVENT_SYNC_REP
+ WAIT_EVENT_SYNC_REP,
+ WAIT_EVENT_ASYNC_WAIT
 } WaitEventIPC;
 
 /* ----------
--
2.23.0


From 8984d4e3360eaf79fc0f23b3e6817770be13fcfd Mon Sep 17 00:00:00 2001
From: Kyotaro Horiguchi <[hidden email]>
Date: Thu, 19 Oct 2017 17:24:07 +0900
Subject: [PATCH v1 3/3] async postgres_fdw

---
 contrib/postgres_fdw/connection.c             |  26 +
 .../postgres_fdw/expected/postgres_fdw.out    | 222 ++++---
 contrib/postgres_fdw/postgres_fdw.c           | 615 ++++++++++++++++--
 contrib/postgres_fdw/postgres_fdw.h           |   2 +
 contrib/postgres_fdw/sql/postgres_fdw.sql     |  20 +-
 5 files changed, 711 insertions(+), 174 deletions(-)

diff --git a/contrib/postgres_fdw/connection.c b/contrib/postgres_fdw/connection.c
index 27b86a03f8..1afd99cad8 100644
--- a/contrib/postgres_fdw/connection.c
+++ b/contrib/postgres_fdw/connection.c
@@ -56,6 +56,7 @@ typedef struct ConnCacheEntry
  bool invalidated; /* true if reconnect is pending */
  uint32 server_hashvalue; /* hash value of foreign server OID */
  uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ void *storage; /* connection specific storage */
 } ConnCacheEntry;
 
 /*
@@ -200,6 +201,7 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
 
  elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
  entry->conn, server->servername, user->umid, user->userid);
+ entry->storage = NULL;
  }
 
  /*
@@ -213,6 +215,30 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
  return entry->conn;
 }
 
+/*
+ * Returns the connection-specific storage for this user. Allocates it with
+ * initsize if it does not exist yet.
+ */
+void *
+GetConnectionSpecificStorage(UserMapping *user, size_t initsize)
+{
+ bool found;
+ ConnCacheEntry *entry;
+ ConnCacheKey key;
+
+ key = user->umid;
+ entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
+ Assert(found);
+
+ if (entry->storage == NULL)
+ {
+ entry->storage = MemoryContextAlloc(CacheMemoryContext, initsize);
+ memset(entry->storage, 0, initsize);
+ }
+
+ return entry->storage;
+}
+
 /*
  * Connect to remote server using specified server and user mapping properties.
  */
diff --git a/contrib/postgres_fdw/expected/postgres_fdw.out b/contrib/postgres_fdw/expected/postgres_fdw.out
index 48282ab151..bd2e835d2d 100644
--- a/contrib/postgres_fdw/expected/postgres_fdw.out
+++ b/contrib/postgres_fdw/expected/postgres_fdw.out
@@ -6900,7 +6900,7 @@ INSERT INTO a(aa) VALUES('aaaaa');
 INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |  aa  
 ----------+-------
  a        | aaa
@@ -6928,7 +6928,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa  
 ----------+--------
  a        | aaa
@@ -6956,7 +6956,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE b SET aa = 'new';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa  
 ----------+--------
  a        | aaa
@@ -6984,7 +6984,7 @@ SELECT tableoid::regclass, * FROM ONLY a;
 (3 rows)
 
 UPDATE a SET aa = 'newtoo';
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
  tableoid |   aa  
 ----------+--------
  a        | newtoo
@@ -7054,35 +7054,41 @@ insert into bar2 values(3,33,33);
 insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+                                                   QUERY PLAN                                                    
+-----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2 bar_1
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR UPDATE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR UPDATE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
  f1 | f2
 ----+----
   1 | 11
@@ -7092,35 +7098,41 @@ select * from bar where f1 in (select f1 from foo) for update;
 (4 rows)
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-                                          QUERY PLAN                                          
-----------------------------------------------------------------------------------------------
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+                                                   QUERY PLAN                                                  
+----------------------------------------------------------------------------------------------------------------
  LockRows
    Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
-   ->  Hash Join
+   ->  Merge Join
          Output: bar.f1, bar.f2, bar.ctid, foo.ctid, bar.*, bar.tableoid, foo.*, foo.tableoid
          Inner Unique: true
-         Hash Cond: (bar.f1 = foo.f1)
-         ->  Append
-               ->  Seq Scan on public.bar
+         Merge Cond: (bar.f1 = foo.f1)
+         ->  Merge Append
+               Sort Key: bar.f1
+               ->  Sort
                      Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
+                     Sort Key: bar.f1
+                     ->  Seq Scan on public.bar
+                           Output: bar.f1, bar.f2, bar.ctid, bar.*, bar.tableoid
                ->  Foreign Scan on public.bar2 bar_1
                      Output: bar_1.f1, bar_1.f2, bar_1.ctid, bar_1.*, bar_1.tableoid
-                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 FOR SHARE
-         ->  Hash
+                     Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct2 ORDER BY f1 ASC NULLS LAST FOR SHARE
+         ->  Sort
                Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+               Sort Key: foo.f1
                ->  HashAggregate
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(23 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(29 rows)
 
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
  f1 | f2
 ----+----
   1 | 11
@@ -7150,11 +7162,12 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
    ->  Hash Join
          Output: bar_1.f1, (bar_1.f2 + 100), bar_1.f3, bar_1.ctid, foo.ctid, foo.*, foo.tableoid
          Inner Unique: true
@@ -7168,12 +7181,13 @@ update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
                      Output: foo.ctid, foo.f1, foo.*, foo.tableoid
                      Group Key: foo.f1
                      ->  Append
-                           ->  Seq Scan on public.foo
-                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
-                           ->  Foreign Scan on public.foo2 foo_1
+                           Async subplans: 1
+                           ->  Async Foreign Scan on public.foo2 foo_1
                                  Output: foo_1.ctid, foo_1.f1, foo_1.*, foo_1.tableoid
                                  Remote SQL: SELECT f1, f2, f3, ctid FROM public.loct1
-(39 rows)
+                           ->  Seq Scan on public.foo
+                                 Output: foo.ctid, foo.f1, foo.*, foo.tableoid
+(41 rows)
 
 update bar set f2 = f2 + 100 where f1 in (select f1 from foo);
 select tableoid::regclass, * from bar order by 1,2;
@@ -7203,16 +7217,17 @@ where bar.f1 = ss.f1;
          Output: bar.f1, (bar.f2 + 100), bar.ctid, (ROW(foo.f1))
          Hash Cond: (foo.f1 = bar.f1)
          ->  Append
-               ->  Seq Scan on public.foo
-                     Output: ROW(foo.f1), foo.f1
-               ->  Foreign Scan on public.foo2 foo_1
+               Async subplans: 2
+               ->  Async Foreign Scan on public.foo2 foo_1
                      Output: ROW(foo_1.f1), foo_1.f1
                      Remote SQL: SELECT f1 FROM public.loct1
-               ->  Seq Scan on public.foo foo_2
-                     Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-               ->  Foreign Scan on public.foo2 foo_3
+               ->  Async Foreign Scan on public.foo2 foo_3
                      Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                      Remote SQL: SELECT f1 FROM public.loct1
+               ->  Seq Scan on public.foo
+                     Output: ROW(foo.f1), foo.f1
+               ->  Seq Scan on public.foo foo_2
+                     Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
          ->  Hash
                Output: bar.f1, bar.f2, bar.ctid
                ->  Seq Scan on public.bar
@@ -7230,17 +7245,18 @@ where bar.f1 = ss.f1;
                Output: (ROW(foo.f1)), foo.f1
                Sort Key: foo.f1
                ->  Append
-                     ->  Seq Scan on public.foo
-                           Output: ROW(foo.f1), foo.f1
-                     ->  Foreign Scan on public.foo2 foo_1
+                     Async subplans: 2
+                     ->  Async Foreign Scan on public.foo2 foo_1
                            Output: ROW(foo_1.f1), foo_1.f1
                            Remote SQL: SELECT f1 FROM public.loct1
-                     ->  Seq Scan on public.foo foo_2
-                           Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
-                     ->  Foreign Scan on public.foo2 foo_3
+                     ->  Async Foreign Scan on public.foo2 foo_3
                            Output: ROW((foo_3.f1 + 3)), (foo_3.f1 + 3)
                            Remote SQL: SELECT f1 FROM public.loct1
-(45 rows)
+                     ->  Seq Scan on public.foo
+                           Output: ROW(foo.f1), foo.f1
+                     ->  Seq Scan on public.foo foo_2
+                           Output: ROW((foo_2.f1 + 3)), (foo_2.f1 + 3)
+(47 rows)
 
 update bar set f2 = f2 + 100
 from
@@ -7390,27 +7406,33 @@ delete from foo where f1 < 5 returning *;
 (5 rows)
 
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-                                  QUERY PLAN                                  
-------------------------------------------------------------------------------
- Update on public.bar
-   Output: bar.f1, bar.f2
-   Update on public.bar
-   Foreign Update on public.bar2 bar_1
-   ->  Seq Scan on public.bar
-         Output: bar.f1, (bar.f2 + 100), bar.ctid
-   ->  Foreign Update on public.bar2 bar_1
-         Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
-(8 rows)
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+                                      QUERY PLAN                                      
+--------------------------------------------------------------------------------------
+ Sort
+   Output: u.f1, u.f2
+   Sort Key: u.f1
+   CTE u
+     ->  Update on public.bar
+           Output: bar.f1, bar.f2
+           Update on public.bar
+           Foreign Update on public.bar2 bar_1
+           ->  Seq Scan on public.bar
+                 Output: bar.f1, (bar.f2 + 100), bar.ctid
+           ->  Foreign Update on public.bar2 bar_1
+                 Remote SQL: UPDATE public.loct2 SET f2 = (f2 + 100) RETURNING f1, f2
+   ->  CTE Scan on u
+         Output: u.f1, u.f2
+(14 rows)
 
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
  f1 | f2  
 ----+-----
   1 | 311
   2 | 322
-  6 | 266
   3 | 333
   4 | 344
+  6 | 266
   7 | 277
 (6 rows)
 
@@ -8485,11 +8507,12 @@ SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER J
  Sort
    Sort Key: t1.a, t3.c
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)) INNER JOIN (ftprt1_p1 t3)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: ((ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)) INNER JOIN (ftprt1_p2 t3_1)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t2.b,t3.c FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) INNER JOIN fprt1 t3 ON (t2.b = t3.a) WHERE t1.a % 25 =0 ORDER BY 1,2,3;
   a  |  b  |  c  
@@ -8524,20 +8547,22 @@ SELECT t1.a,t2.b,t2.c FROM fprt1 t1 LEFT JOIN (SELECT * FROM fprt2 WHERE a < 10)
 -- with whole-row reference; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
-                       QUERY PLAN                      
---------------------------------------------------------
+                          QUERY PLAN                          
+--------------------------------------------------------------
  Sort
    Sort Key: ((t1.*)::fprt1), ((t2.*)::fprt2)
    ->  Hash Full Join
          Hash Cond: (t1.a = t2.b)
          ->  Append
-               ->  Foreign Scan on ftprt1_p1 t1
-               ->  Foreign Scan on ftprt1_p2 t1_1
+               Async subplans: 2
+               ->  Async Foreign Scan on ftprt1_p1 t1
+               ->  Async Foreign Scan on ftprt1_p2 t1_1
          ->  Hash
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2
-                     ->  Foreign Scan on ftprt2_p2 t2_1
-(11 rows)
+                     Async subplans: 2
+                     ->  Async Foreign Scan on ftprt2_p1 t2
+                     ->  Async Foreign Scan on ftprt2_p2 t2_1
+(13 rows)
 
 SELECT t1.wr, t2.wr FROM (SELECT t1 wr, a FROM fprt1 t1 WHERE t1.a % 25 = 0) t1 FULL JOIN (SELECT t2 wr, b FROM fprt2 t2 WHERE t2.b % 25 = 0) t2 ON (t1.a = t2.b) ORDER BY 1,2;
        wr       |       wr      
@@ -8566,11 +8591,12 @@ SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t
  Sort
    Sort Key: t1.a, t1.b
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 2
+         ->  Async Foreign Scan
                Relations: (ftprt1_p1 t1) INNER JOIN (ftprt2_p1 t2)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: (ftprt1_p2 t1_1) INNER JOIN (ftprt2_p2 t2_1)
-(7 rows)
+(8 rows)
 
 SELECT t1.a,t1.b FROM fprt1 t1, LATERAL (SELECT t2.a, t2.b FROM fprt2 t2 WHERE t1.a = t2.b AND t1.b = t2.a) q WHERE t1.a%25 = 0 ORDER BY 1,2;
   a  |  b  
@@ -8623,21 +8649,23 @@ SELECT t1.a, t1.phv, t2.b, t2.phv FROM (SELECT 't1_phv' phv, * FROM fprt1 WHERE
 -- test FOR UPDATE; partitionwise join does not apply
 EXPLAIN (COSTS OFF)
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
-                          QUERY PLAN                          
---------------------------------------------------------------
+                             QUERY PLAN                            
+--------------------------------------------------------------------
  LockRows
    ->  Sort
          Sort Key: t1.a
          ->  Hash Join
                Hash Cond: (t2.b = t1.a)
                ->  Append
-                     ->  Foreign Scan on ftprt2_p1 t2
-                     ->  Foreign Scan on ftprt2_p2 t2_1
+                     Async subplans: 2
+                     ->  Async Foreign Scan on ftprt2_p1 t2
+                     ->  Async Foreign Scan on ftprt2_p2 t2_1
                ->  Hash
                      ->  Append
-                           ->  Foreign Scan on ftprt1_p1 t1
-                           ->  Foreign Scan on ftprt1_p2 t1_1
-(12 rows)
+                           Async subplans: 2
+                           ->  Async Foreign Scan on ftprt1_p1 t1
+                           ->  Async Foreign Scan on ftprt1_p2 t1_1
+(14 rows)
 
 SELECT t1.a, t2.b FROM fprt1 t1 INNER JOIN fprt2 t2 ON (t1.a = t2.b) WHERE t1.a % 25 = 0 ORDER BY 1,2 FOR UPDATE OF t1;
   a  |  b  
@@ -8672,18 +8700,19 @@ ANALYZE fpagg_tab_p3;
 SET enable_partitionwise_aggregate TO false;
 EXPLAIN (COSTS OFF)
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
-                        QUERY PLAN                        
------------------------------------------------------------
+                           QUERY PLAN                            
+-----------------------------------------------------------------
  Sort
    Sort Key: pagg_tab.a
    ->  HashAggregate
          Group Key: pagg_tab.a
          Filter: (avg(pagg_tab.b) < '22'::numeric)
          ->  Append
-               ->  Foreign Scan on fpagg_tab_p1 pagg_tab
-               ->  Foreign Scan on fpagg_tab_p2 pagg_tab_1
-               ->  Foreign Scan on fpagg_tab_p3 pagg_tab_2
-(9 rows)
+               Async subplans: 3
+               ->  Async Foreign Scan on fpagg_tab_p1 pagg_tab
+               ->  Async Foreign Scan on fpagg_tab_p2 pagg_tab_1
+               ->  Async Foreign Scan on fpagg_tab_p3 pagg_tab_2
+(10 rows)
 
 -- Plan with partitionwise aggregates is enabled
 SET enable_partitionwise_aggregate TO true;
@@ -8694,13 +8723,14 @@ SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 O
  Sort
    Sort Key: pagg_tab.a
    ->  Append
-         ->  Foreign Scan
+         Async subplans: 3
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p1 pagg_tab)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p2 pagg_tab_1)
-         ->  Foreign Scan
+         ->  Async Foreign Scan
                Relations: Aggregate on (fpagg_tab_p3 pagg_tab_2)
-(9 rows)
+(10 rows)
 
 SELECT a, sum(b), min(b), count(*) FROM pagg_tab GROUP BY a HAVING avg(b) < 22 ORDER BY 1;
  a  | sum  | min | count
diff --git a/contrib/postgres_fdw/postgres_fdw.c b/contrib/postgres_fdw/postgres_fdw.c
index bdc21b36d1..f3212aac90 100644
--- a/contrib/postgres_fdw/postgres_fdw.c
+++ b/contrib/postgres_fdw/postgres_fdw.c
@@ -21,6 +21,8 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
+#include "executor/nodeForeignscan.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -35,6 +37,7 @@
 #include "optimizer/restrictinfo.h"
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
+#include "pgstat.h"
 #include "postgres_fdw.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
@@ -56,6 +59,9 @@ PG_MODULE_MAGIC;
 /* If no remote estimates, assume a sort costs 20% extra */
 #define DEFAULT_FDW_SORT_MULTIPLIER 1.2
 
+/* Retrieve PgFdwScanState struct from ForeignScanState */
+#define GetPgFdwScanState(n) ((PgFdwScanState *)(n)->fdw_state)
+
 /*
  * Indexes of FDW-private information stored in fdw_private lists.
  *
@@ -122,11 +128,28 @@ enum FdwDirectModifyPrivateIndex
  FdwDirectModifyPrivateSetProcessed
 };
 
+/*
+ * Connection private area structure.
+ */
+typedef struct PgFdwConnpriv
+{
+ ForeignScanState   *leader; /* leader node of this connection */
+ bool busy; /* true if this connection is busy */
+} PgFdwConnpriv;
+
+/* Execution state base type */
+typedef struct PgFdwState
+{
+ PGconn   *conn; /* connection for the scan */
+ PgFdwConnpriv *connpriv; /* connection private memory */
+} PgFdwState;
+
 /*
  * Execution state of a foreign scan using postgres_fdw.
  */
 typedef struct PgFdwScanState
 {
+ PgFdwState s; /* common structure */
  Relation rel; /* relcache entry for the foreign table. NULL
  * for a foreign join scan. */
  TupleDesc tupdesc; /* tuple descriptor of scan */
@@ -137,7 +160,7 @@ typedef struct PgFdwScanState
  List   *retrieved_attrs; /* list of retrieved attribute numbers */
 
  /* for remote query execution */
- PGconn   *conn; /* connection for the scan */
+ bool result_ready;
  unsigned int cursor_number; /* quasi-unique ID for my cursor */
  bool cursor_exists; /* have we created the cursor? */
  int numParams; /* number of parameters passed to query */
@@ -153,6 +176,12 @@ typedef struct PgFdwScanState
  /* batch-level state, for optimizing rewinds and avoiding useless fetch */
  int fetch_ct_2; /* Min(# of fetches done, 2) */
  bool eof_reached; /* true if last fetch reached EOF */
+ bool run_async; /* true if run asynchronously */
+ bool inqueue; /* true if this node is in waiter queue */
+ ForeignScanState *waiter; /* Next node to run a query among nodes
+ * sharing the same connection */
+ ForeignScanState *last_waiter; /* last waiting node in waiting queue.
+ * valid only on the leader node */
 
  /* working memory contexts */
  MemoryContext batch_cxt; /* context holding current batch of tuples */
@@ -166,11 +195,11 @@ typedef struct PgFdwScanState
  */
 typedef struct PgFdwModifyState
 {
+ PgFdwState s; /* common structure */
  Relation rel; /* relcache entry for the foreign table */
  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
 
  /* for remote query execution */
- PGconn   *conn; /* connection for the scan */
  char   *p_name; /* name of prepared statement, if created */
 
  /* extracted fdw_private data */
@@ -197,6 +226,7 @@ typedef struct PgFdwModifyState
  */
 typedef struct PgFdwDirectModifyState
 {
+ PgFdwState s; /* common structure */
  Relation rel; /* relcache entry for the foreign table */
  AttInMetadata *attinmeta; /* attribute datatype conversion metadata */
 
@@ -326,6 +356,7 @@ static void postgresBeginForeignScan(ForeignScanState *node, int eflags);
 static TupleTableSlot *postgresIterateForeignScan(ForeignScanState *node);
 static void postgresReScanForeignScan(ForeignScanState *node);
 static void postgresEndForeignScan(ForeignScanState *node);
+static void postgresShutdownForeignScan(ForeignScanState *node);
 static void postgresAddForeignUpdateTargets(Query *parsetree,
  RangeTblEntry *target_rte,
  Relation target_relation);
@@ -391,6 +422,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
  RelOptInfo *input_rel,
  RelOptInfo *output_rel,
  void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static bool postgresForeignAsyncConfigureWait(ForeignScanState *node,
+  WaitEventSet *wes,
+  void *caller_data, bool reinit);
 
 /*
  * Helper functions
@@ -419,7 +454,9 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
   EquivalenceClass *ec, EquivalenceMember *em,
   void *arg);
 static void create_cursor(ForeignScanState *node);
-static void fetch_more_data(ForeignScanState *node);
+static void request_more_data(ForeignScanState *node);
+static void fetch_received_data(ForeignScanState *node);
+static void vacate_connection(PgFdwState *fdwconn, bool clear_queue);
 static void close_cursor(PGconn *conn, unsigned int cursor_number);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
    RangeTblEntry *rte,
@@ -522,6 +559,7 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
  routine->IterateForeignScan = postgresIterateForeignScan;
  routine->ReScanForeignScan = postgresReScanForeignScan;
  routine->EndForeignScan = postgresEndForeignScan;
+ routine->ShutdownForeignScan = postgresShutdownForeignScan;
 
  /* Functions for updating foreign tables */
  routine->AddForeignUpdateTargets = postgresAddForeignUpdateTargets;
@@ -558,6 +596,10 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
  /* Support functions for upper relation push-down */
  routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+ /* Support functions for async execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+
  PG_RETURN_POINTER(routine);
 }
 
@@ -1434,12 +1476,22 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
  * Get connection to the foreign server.  Connection manager will
  * establish new connection if necessary.
  */
- fsstate->conn = GetConnection(user, false);
+ fsstate->s.conn = GetConnection(user, false);
+ fsstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
+ fsstate->s.connpriv->leader = NULL;
+ fsstate->s.connpriv->busy = false;
+ fsstate->waiter = NULL;
+ fsstate->last_waiter = node;
 
  /* Assign a unique ID for my cursor */
- fsstate->cursor_number = GetCursorNumber(fsstate->conn);
+ fsstate->cursor_number = GetCursorNumber(fsstate->s.conn);
  fsstate->cursor_exists = false;
 
+ /* Initialize async execution status */
+ fsstate->run_async = false;
+ fsstate->inqueue = false;
+
  /* Get private info created by planner functions. */
  fsstate->query = strVal(list_nth(fsplan->fdw_private,
  FdwScanPrivateSelectSql));
@@ -1487,40 +1539,259 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
  &fsstate->param_values);
 }
 
+/*
+ * Async queue manipulation functions
+ */
+
+/*
+ * add_async_waiter:
+ *
+ * Adds the node to the end of the waiter queue. Immediately starts the node
+ * if no node is currently running.
+ */
+static inline void
+add_async_waiter(ForeignScanState *node)
+{
+ PgFdwScanState   *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+
+ /* do nothing if the node is already in the queue or already eof'ed */
+ if (leader == node || fsstate->inqueue || fsstate->eof_reached)
+ return;
+
+ if (leader == NULL)
+ {
+ /* immediately send request if not busy */
+ request_more_data(node);
+ }
+ else
+ {
+ PgFdwScanState   *leader_state = GetPgFdwScanState(leader);
+ PgFdwScanState   *last_waiter_state
+ = GetPgFdwScanState(leader_state->last_waiter);
+
+ last_waiter_state->waiter = node;
+ leader_state->last_waiter = node;
+ fsstate->inqueue = true;
+ }
+}
+
+/*
+ * move_to_next_waiter:
+ *
+ * Makes the first waiter the next leader.
+ * Returns the new leader or NULL if there's no waiter.
+ */
+static inline ForeignScanState *
+move_to_next_waiter(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *ret = fsstate->waiter;
+
+ Assert(fsstate->s.connpriv->leader == node);
+
+ if (ret)
+ {
+ PgFdwScanState *retstate = GetPgFdwScanState(ret);
+ fsstate->waiter = NULL;
+ retstate->last_waiter = fsstate->last_waiter;
+ retstate->inqueue = false;
+ }
+
+ fsstate->s.connpriv->leader = ret;
+
+ return ret;
+}
+
+/*
+ * Remove the node from the waiter queue.
+ *
+ * This is a bit different from the two functions above in that it can
+ * operate on the connection leader. Any pending result is absorbed when this
+ * is called on the active leader.
+ *
+ * Returns true if the node was found.
+ */
+static inline bool
+remove_async_node(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PgFdwScanState *leader_state;
+ ForeignScanState *prev;
+ PgFdwScanState *prev_state;
+ ForeignScanState *cur;
+
+ /* no need to remove me */
+ if (!leader || !fsstate->inqueue)
+ return false;
+
+ leader_state = GetPgFdwScanState(leader);
+
+ /* Remove the leader node */
+ if (leader == node)
+ {
+ ForeignScanState *next_leader;
+
+ if (leader_state->s.connpriv->busy)
+ {
+ /*
+ * this node is waiting for result, absorb the result first so
+ * that the following commands can be sent on the connection.
+ */
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+ PGconn *conn = leader_state->s.conn;
+
+ while(PQisBusy(conn))
+ PQclear(PQgetResult(conn));
+
+ leader_state->s.connpriv->busy = false;
+ }
+
+ /* Make the first waiter the leader */
+ if (leader_state->waiter)
+ {
+ PgFdwScanState *next_leader_state;
+
+ next_leader = leader_state->waiter;
+ next_leader_state = GetPgFdwScanState(next_leader);
+
+ leader_state->s.connpriv->leader = next_leader;
+ next_leader_state->last_waiter = leader_state->last_waiter;
+ }
+ leader_state->waiter = NULL;
+
+ return true;
+ }
+
+ /*
+ * Just remove the node from the queue.
+ *
+ * This function is called on the shutdown path. We don't bother
+ * considering a faster way to do this.
+ */
+ prev = leader;
+ prev_state = leader_state;
+ cur =  GetPgFdwScanState(prev)->waiter;
+ while (cur)
+ {
+ PgFdwScanState *curstate = GetPgFdwScanState(cur);
+
+ if (cur == node)
+ {
+ prev_state->waiter = curstate->waiter;
+ /* if cur was the tail of the queue, its predecessor becomes the tail */
+ if (leader_state->last_waiter == cur)
+ leader_state->last_waiter = prev;
+
+ fsstate->inqueue = false;
+
+ return true;
+ }
+ prev = cur;
+ prev_state = curstate;
+ cur = curstate->waiter;
+ }
+
+ return false;
+}
+
 /*
  * postgresIterateForeignScan
- * Retrieve next row from the result set, or clear tuple slot to indicate
- * EOF.
+ * Retrieve next row from the result set.
+ *
+ * For synchronous nodes, returns a cleared tuple slot to indicate EOF.
+ *
+ * If the node is an asynchronous one, a cleared tuple slot has two meanings.
+ * When the caller receives a cleared tuple slot, asyncstate indicates whether
+ * the node has reached EOF (AS_AVAILABLE) or is still waiting for data to
+ * come (AS_WAITING).
  */
 static TupleTableSlot *
 postgresIterateForeignScan(ForeignScanState *node)
 {
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
  TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
- /*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
- */
- if (!fsstate->cursor_exists)
- create_cursor(node);
+ if (fsstate->next_tuple >= fsstate->num_tuples && !fsstate->eof_reached)
+ {
+ /* we've run out, get some more tuples */
+ if (!node->fs_async)
+ {
+ /* finish running query to send my command */
+ if (!fsstate->s.connpriv->busy)
+ vacate_connection((PgFdwState *)fsstate, false);
+
+ request_more_data(node);
+
+ /*
+ * Fetch the result immediately. This executes the next waiter if
+ * any.
+ */
+ fetch_received_data(node);
+ }
+ else if (!fsstate->s.connpriv->busy)
+ {
+ /* If the connection is not busy, just send the request. */
+ request_more_data(node);
+ }
+ else
+ {
+ /* This connection is busy */
+ bool available = true;
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PgFdwScanState *leader_state = GetPgFdwScanState(leader);
+
+ /* Check if the result is immediately available */
+ if (PQisBusy(leader_state->s.conn))
+ {
+ int rc = WaitLatchOrSocket(NULL,
+   WL_SOCKET_READABLE | WL_TIMEOUT |
+   WL_EXIT_ON_PM_DEATH,
+   PQsocket(leader_state->s.conn), 0,
+   WAIT_EVENT_ASYNC_WAIT);
+ if (!(rc & WL_SOCKET_READABLE))
+ available = false;
+ }
+
+ /* The next waiter is executed automatically */
+ if (available)
+ fetch_received_data(leader);
+
+ /* add the requested node */
+ add_async_waiter(node);
+
+ /* add the previous leader */
+ add_async_waiter(leader);
+ }
+ }
 
  /*
- * Get some more tuples, if we've run out.
+ * If we haven't received a result for the given node this time,
+ * return with no tuple to give way to another node.
  */
  if (fsstate->next_tuple >= fsstate->num_tuples)
  {
- /* No point in another fetch if we already detected EOF, though. */
- if (!fsstate->eof_reached)
- fetch_more_data(node);
- /* If we didn't get any tuples, must be end of data. */
- if (fsstate->next_tuple >= fsstate->num_tuples)
- return ExecClearTuple(slot);
+ if (fsstate->eof_reached)
+ {
+ fsstate->result_ready = true;
+ node->ss.ps.asyncstate = AS_AVAILABLE;
+ }
+ else
+ {
+ fsstate->result_ready = false;
+ node->ss.ps.asyncstate = AS_WAITING;
+ }
+
+ return ExecClearTuple(slot);
  }
 
  /*
  * Return the next tuple.
  */
+ fsstate->result_ready = true;
+ node->ss.ps.asyncstate = AS_AVAILABLE;
  ExecStoreHeapTuple(fsstate->tuples[fsstate->next_tuple++],
    slot,
    false);
@@ -1535,7 +1806,7 @@ postgresIterateForeignScan(ForeignScanState *node)
 static void
 postgresReScanForeignScan(ForeignScanState *node)
 {
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
  char sql[64];
  PGresult   *res;
 
@@ -1543,6 +1814,8 @@ postgresReScanForeignScan(ForeignScanState *node)
  if (!fsstate->cursor_exists)
  return;
 
+ vacate_connection((PgFdwState *)fsstate, true);
+
  /*
  * If any internal parameters affecting this node have changed, we'd
  * better destroy and recreate the cursor.  Otherwise, rewinding it should
@@ -1571,9 +1844,9 @@ postgresReScanForeignScan(ForeignScanState *node)
  * We don't use a PG_TRY block here, so be careful not to throw error
  * without releasing the PGresult.
  */
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->s.conn, sql);
  if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fsstate->s.conn, true, sql);
  PQclear(res);
 
  /* Now force a fresh FETCH. */
@@ -1591,7 +1864,7 @@ postgresReScanForeignScan(ForeignScanState *node)
 static void
 postgresEndForeignScan(ForeignScanState *node)
 {
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
 
  /* if fsstate is NULL, we are in EXPLAIN; nothing to do */
  if (fsstate == NULL)
@@ -1599,15 +1872,31 @@ postgresEndForeignScan(ForeignScanState *node)
 
  /* Close the cursor if open, to prevent accumulation of cursors */
  if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->s.conn, fsstate->cursor_number);
 
  /* Release remote connection */
- ReleaseConnection(fsstate->conn);
- fsstate->conn = NULL;
+ ReleaseConnection(fsstate->s.conn);
+ fsstate->s.conn = NULL;
 
  /* MemoryContexts will be deleted automatically. */
 }
 
+/*
+ * postgresShutdownForeignScan
+ * Remove asynchrony stuff and clean up garbage on the connection.
+ */
+static void
+postgresShutdownForeignScan(ForeignScanState *node)
+{
+ ForeignScan *plan = (ForeignScan *) node->ss.ps.plan;
+
+ if (plan->operation != CMD_SELECT)
+ return;
+
+ /* remove the node from waiting queue */
+ remove_async_node(node);
+}
+
 /*
  * postgresAddForeignUpdateTargets
  * Add resjunk column(s) needed for update/delete on a foreign table
@@ -2372,7 +2661,9 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
  * Get connection to the foreign server.  Connection manager will
  * establish new connection if necessary.
  */
- dmstate->conn = GetConnection(user, false);
+ dmstate->s.conn = GetConnection(user, false);
+ dmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
 
  /* Update the foreign-join-related fields. */
  if (fsplan->scan.scanrelid == 0)
@@ -2457,7 +2748,11 @@ postgresIterateDirectModify(ForeignScanState *node)
  * If this is the first call after Begin, execute the statement.
  */
  if (dmstate->num_tuples == -1)
+ {
+ /* finish running query to send my command */
+ vacate_connection((PgFdwState *)dmstate, true);
  execute_dml_stmt(node);
+ }
 
  /*
  * If the local query doesn't specify RETURNING, just clear tuple slot.
@@ -2504,8 +2799,8 @@ postgresEndDirectModify(ForeignScanState *node)
  PQclear(dmstate->result);
 
  /* Release remote connection */
- ReleaseConnection(dmstate->conn);
- dmstate->conn = NULL;
+ ReleaseConnection(dmstate->s.conn);
+ dmstate->s.conn = NULL;
 
  /* MemoryContext will be deleted automatically. */
 }
@@ -2703,6 +2998,7 @@ estimate_path_cost_size(PlannerInfo *root,
  List   *local_param_join_conds;
  StringInfoData sql;
  PGconn   *conn;
+ PgFdwConnpriv *connpriv;
  Selectivity local_sel;
  QualCost local_cost;
  List   *fdw_scan_tlist = NIL;
@@ -2747,6 +3043,18 @@ estimate_path_cost_size(PlannerInfo *root,
 
  /* Get the remote estimate */
  conn = GetConnection(fpinfo->user, false);
+ connpriv = GetConnectionSpecificStorage(fpinfo->user,
+ sizeof(PgFdwConnpriv));
+ if (connpriv)
+ {
+ PgFdwState tmpstate;
+ tmpstate.conn = conn;
+ tmpstate.connpriv = connpriv;
+
+ /* finish running query to send my command */
+ vacate_connection(&tmpstate, true);
+ }
+
  get_remote_estimate(sql.data, conn, &rows, &width,
  &startup_cost, &total_cost);
  ReleaseConnection(conn);
@@ -3317,11 +3625,11 @@ ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
 static void
 create_cursor(ForeignScanState *node)
 {
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
  ExprContext *econtext = node->ss.ps.ps_ExprContext;
  int numParams = fsstate->numParams;
  const char **values = fsstate->param_values;
- PGconn   *conn = fsstate->conn;
+ PGconn   *conn = fsstate->s.conn;
  StringInfoData buf;
  PGresult   *res;
 
@@ -3384,50 +3692,127 @@ create_cursor(ForeignScanState *node)
 }
 
 /*
- * Fetch some more rows from the node's cursor.
+ * Send the next request for the node.  If the given node is different from
+ * the current connection leader, push the current leader back to the waiter
+ * queue and let the given node become the leader.
  */
 static void
-fetch_more_data(ForeignScanState *node)
+request_more_data(ForeignScanState *node)
 {
- PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+ ForeignScanState *leader = fsstate->s.connpriv->leader;
+ PGconn   *conn = fsstate->s.conn;
+ char sql[64];
+
+ /* must be non-busy */
+ Assert(!fsstate->s.connpriv->busy);
+ /* must be not-eof */
+ Assert(!fsstate->eof_reached);
+
+ /*
+ * If this is the first call after Begin or ReScan, we need to create the
+ * cursor on the remote side.
+ */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (!PQsendQuery(conn, sql))
+ pgfdw_report_error(ERROR, NULL, conn, false, sql);
+
+ fsstate->s.connpriv->busy = true;
+
+ /* Let the node be the leader if it is different from current one */
+ if (leader != node)
+ {
+ /*
+ * If the connection leader exists, insert the node as the connection
+ * leader making the current leader be the first waiter.
+ */
+ if (leader != NULL)
+ {
+ remove_async_node(node);
+ fsstate->last_waiter = GetPgFdwScanState(leader)->last_waiter;
+ fsstate->waiter = leader;
+ }
+ else
+ {
+ fsstate->last_waiter = node;
+ fsstate->waiter = NULL;
+ }
+
+ fsstate->s.connpriv->leader = node;
+ }
+}
+
+/*
+ * Fetch the received data and automatically send the next waiter's request.
+ */
+static void
+fetch_received_data(ForeignScanState *node)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
  PGresult   *volatile res = NULL;
  MemoryContext oldcontext;
+ ForeignScanState *waiter;
+
+ /* I should be the current connection leader */
+ Assert(fsstate->s.connpriv->leader == node);
 
  /*
  * We'll store the tuples in the batch_cxt.  First, flush the previous
- * batch.
+ * batch if no tuples remain.
  */
- fsstate->tuples = NULL;
- MemoryContextReset(fsstate->batch_cxt);
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ fsstate->tuples = NULL;
+ fsstate->num_tuples = 0;
+ MemoryContextReset(fsstate->batch_cxt);
+ }
+ else if (fsstate->next_tuple > 0)
+ {
+ /* move the remaining tuples to the beginning of the store */
+ int n = 0;
+
+ while(fsstate->next_tuple < fsstate->num_tuples)
+ fsstate->tuples[n++] = fsstate->tuples[fsstate->next_tuple++];
+ fsstate->num_tuples = n;
+ }
+
  oldcontext = MemoryContextSwitchTo(fsstate->batch_cxt);
 
  /* PGresult must be released before leaving this function. */
  PG_TRY();
  {
- PGconn   *conn = fsstate->conn;
+ PGconn   *conn = fsstate->s.conn;
  char sql[64];
- int numrows;
+ int addrows;
+ size_t newsize;
  int i;
 
  snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
  fsstate->fetch_size, fsstate->cursor_number);
 
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_get_result(conn, sql);
  /* On error, report the original query, not the FETCH. */
  if (PQresultStatus(res) != PGRES_TUPLES_OK)
  pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
 
  /* Convert the data into HeapTuples */
- numrows = PQntuples(res);
- fsstate->tuples = (HeapTuple *) palloc0(numrows * sizeof(HeapTuple));
- fsstate->num_tuples = numrows;
- fsstate->next_tuple = 0;
+ addrows = PQntuples(res);
+ newsize = (fsstate->num_tuples + addrows) * sizeof(HeapTuple);
+ if (fsstate->tuples)
+ fsstate->tuples = (HeapTuple *) repalloc(fsstate->tuples, newsize);
+ else
+ fsstate->tuples = (HeapTuple *) palloc(newsize);
 
- for (i = 0; i < numrows; i++)
+ for (i = 0; i < addrows; i++)
  {
  Assert(IsA(node->ss.ps.plan, ForeignScan));
 
- fsstate->tuples[i] =
+ fsstate->tuples[fsstate->num_tuples + i] =
  make_tuple_from_result_row(res, i,
    fsstate->rel,
    fsstate->attinmeta,
@@ -3437,22 +3822,75 @@ fetch_more_data(ForeignScanState *node)
  }
 
  /* Update fetch_ct_2 */
- if (fsstate->fetch_ct_2 < 2)
+ if (fsstate->fetch_ct_2 < 2 && fsstate->next_tuple == 0)
  fsstate->fetch_ct_2++;
 
+ fsstate->next_tuple = 0;
+ fsstate->num_tuples += addrows;
+
  /* Must be EOF if we didn't get as many tuples as we asked for. */
- fsstate->eof_reached = (numrows < fsstate->fetch_size);
+ fsstate->eof_reached = (addrows < fsstate->fetch_size);
+
+ PQclear(res);
+ res = NULL;
  }
  PG_FINALLY();
  {
+ fsstate->s.connpriv->busy = false;
+
  if (res)
  PQclear(res);
  }
  PG_END_TRY();
 
+ fsstate->s.connpriv->busy = false;
+
+ /* let the first waiter be the next leader of this connection */
+ waiter = move_to_next_waiter(node);
+
+ /* send the next request if any */
+ if (waiter)
+ request_more_data(waiter);
+
  MemoryContextSwitchTo(oldcontext);
 }
 
+/*
+ * Vacate a connection so that this node can send the next query
+ */
+static void
+vacate_connection(PgFdwState *fdwstate, bool clear_queue)
+{
+ PgFdwConnpriv *connpriv = fdwstate->connpriv;
+ ForeignScanState *leader;
+
+ /* the connection is already available */
+ if (connpriv == NULL || connpriv->leader == NULL || !connpriv->busy)
+ return;
+
+ /*
+ * let the current connection leader read the result for the running query
+ */
+ leader = connpriv->leader;
+ fetch_received_data(leader);
+
+ /* let the first waiter be the next leader of this connection */
+ move_to_next_waiter(leader);
+
+ if (!clear_queue)
+ return;
+
+ /* Clear the waiting list */
+ while (leader)
+ {
+ PgFdwScanState *fsstate = GetPgFdwScanState(leader);
+
+ fsstate->last_waiter = NULL;
+ leader = fsstate->waiter;
+ fsstate->waiter = NULL;
+ }
+}
+
 /*
  * Force assorted GUC parameters to settings that ensure that we'll output
  * data values in a form that is unambiguous to the remote server.
@@ -3566,7 +4004,9 @@ create_foreign_modify(EState *estate,
  user = GetUserMapping(userid, table->serverid);
 
  /* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->s.conn = GetConnection(user, true);
+ fmstate->s.connpriv = (PgFdwConnpriv *)
+ GetConnectionSpecificStorage(user, sizeof(PgFdwConnpriv));
  fmstate->p_name = NULL; /* prepared statement not made yet */
 
  /* Set up remote query information. */
@@ -3653,6 +4093,9 @@ execute_foreign_modify(EState *estate,
    operation == CMD_UPDATE ||
    operation == CMD_DELETE);
 
+ /* finish running query to send my command */
+ vacate_connection((PgFdwState *)fmstate, true);
+
  /* Set up the prepared statement on the remote server, if we didn't yet */
  if (!fmstate->p_name)
  prepare_foreign_modify(fmstate);
@@ -3680,14 +4123,14 @@ execute_foreign_modify(EState *estate,
  /*
  * Execute the prepared statement.
  */
- if (!PQsendQueryPrepared(fmstate->conn,
+ if (!PQsendQueryPrepared(fmstate->s.conn,
  fmstate->p_name,
  fmstate->p_nums,
  p_values,
  NULL,
  NULL,
  0))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
  /*
  * Get the result, and check for success.
@@ -3695,10 +4138,10 @@ execute_foreign_modify(EState *estate,
  * We don't use a PG_TRY block here, so be careful not to throw error
  * without releasing the PGresult.
  */
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
  if (PQresultStatus(res) !=
  (fmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
 
  /* Check number of rows affected, and fetch RETURNING tuple if any */
  if (fmstate->has_returning)
@@ -3734,7 +4177,7 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
 
  /* Construct name we'll use for the prepared statement. */
  snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
- GetPrepStmtNumber(fmstate->conn));
+ GetPrepStmtNumber(fmstate->s.conn));
  p_name = pstrdup(prep_name);
 
  /*
@@ -3744,12 +4187,12 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  * the prepared statements we use in this module are simple enough that
  * the remote server will make the right choices.
  */
- if (!PQsendPrepare(fmstate->conn,
+ if (!PQsendPrepare(fmstate->s.conn,
    p_name,
    fmstate->query,
    0,
    NULL))
- pgfdw_report_error(ERROR, NULL, fmstate->conn, false, fmstate->query);
+ pgfdw_report_error(ERROR, NULL, fmstate->s.conn, false, fmstate->query);
 
  /*
  * Get the result, and check for success.
@@ -3757,9 +4200,9 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
  * We don't use a PG_TRY block here, so be careful not to throw error
  * without releasing the PGresult.
  */
- res = pgfdw_get_result(fmstate->conn, fmstate->query);
+ res = pgfdw_get_result(fmstate->s.conn, fmstate->query);
  if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, fmstate->query);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, fmstate->query);
  PQclear(res);
 
  /* This action shows that the prepare has been done. */
@@ -3888,16 +4331,16 @@ finish_foreign_modify(PgFdwModifyState *fmstate)
  * We don't use a PG_TRY block here, so be careful not to throw error
  * without releasing the PGresult.
  */
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->s.conn, sql);
  if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
+ pgfdw_report_error(ERROR, res, fmstate->s.conn, true, sql);
  PQclear(res);
  fmstate->p_name = NULL;
  }
 
  /* Release remote connection */
- ReleaseConnection(fmstate->conn);
- fmstate->conn = NULL;
+ ReleaseConnection(fmstate->s.conn);
+ fmstate->s.conn = NULL;
 }
 
 /*
@@ -4056,9 +4499,9 @@ execute_dml_stmt(ForeignScanState *node)
  * the desired result.  This allows us to avoid assuming that the remote
  * server has the same OIDs we do for the parameters' types.
  */
- if (!PQsendQueryParams(dmstate->conn, dmstate->query, numParams,
+ if (!PQsendQueryParams(dmstate->s.conn, dmstate->query, numParams,
    NULL, values, NULL, NULL, 0))
- pgfdw_report_error(ERROR, NULL, dmstate->conn, false, dmstate->query);
+ pgfdw_report_error(ERROR, NULL, dmstate->s.conn, false, dmstate->query);
 
  /*
  * Get the result, and check for success.
@@ -4066,10 +4509,10 @@ execute_dml_stmt(ForeignScanState *node)
  * We don't use a PG_TRY block here, so be careful not to throw error
  * without releasing the PGresult.
  */
- dmstate->result = pgfdw_get_result(dmstate->conn, dmstate->query);
+ dmstate->result = pgfdw_get_result(dmstate->s.conn, dmstate->query);
  if (PQresultStatus(dmstate->result) !=
  (dmstate->has_returning ? PGRES_TUPLES_OK : PGRES_COMMAND_OK))
- pgfdw_report_error(ERROR, dmstate->result, dmstate->conn, true,
+ pgfdw_report_error(ERROR, dmstate->result, dmstate->s.conn, true,
    dmstate->query);
 
  /* Get the number of rows affected. */
@@ -5560,6 +6003,42 @@ postgresGetForeignJoinPaths(PlannerInfo *root,
  /* XXX Consider parameterized paths for the join relation */
 }
 
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ return true;
+}
+
+
+/*
+ * Configure a wait event.
+ *
+ * Add a wait event only when the node is the connection leader; otherwise
+ * another node on this connection is the leader.
+ */
+static bool
+postgresForeignAsyncConfigureWait(ForeignScanState *node, WaitEventSet *wes,
+  void *caller_data, bool reinit)
+{
+ PgFdwScanState *fsstate = GetPgFdwScanState(node);
+
+
+ /* If the caller didn't reinit, this event is already in the event set */
+ if (!reinit)
+ return true;
+
+ if (fsstate->s.connpriv->leader == node)
+ {
+ AddWaitEventToSet(wes,
+  WL_SOCKET_READABLE, PQsocket(fsstate->s.conn),
+  NULL, caller_data);
+ return true;
+ }
+
+ return false;
+}
+
+
 /*
  * Assess whether the aggregation, grouping and having operations can be pushed
  * down to the foreign server.  As a side effect, save information we obtain in
diff --git a/contrib/postgres_fdw/postgres_fdw.h b/contrib/postgres_fdw/postgres_fdw.h
index ea052872c3..696af73408 100644
--- a/contrib/postgres_fdw/postgres_fdw.h
+++ b/contrib/postgres_fdw/postgres_fdw.h
@@ -85,6 +85,7 @@ typedef struct PgFdwRelationInfo
  UserMapping *user; /* only set in use_remote_estimate mode */
 
  int fetch_size; /* fetch size for this remote table */
+ bool allow_prefetch; /* true to allow overlapped fetching  */
 
  /*
  * Name of the relation, for use while EXPLAINing ForeignScan.  It is used
@@ -130,6 +131,7 @@ extern void reset_transmission_modes(int nestlevel);
 
 /* in connection.c */
 extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+void *GetConnectionSpecificStorage(UserMapping *user, size_t initsize);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
diff --git a/contrib/postgres_fdw/sql/postgres_fdw.sql b/contrib/postgres_fdw/sql/postgres_fdw.sql
index 1c5c37b783..69c06ac6e4 100644
--- a/contrib/postgres_fdw/sql/postgres_fdw.sql
+++ b/contrib/postgres_fdw/sql/postgres_fdw.sql
@@ -1730,25 +1730,25 @@ INSERT INTO b(aa) VALUES('bbb');
 INSERT INTO b(aa) VALUES('bbbb');
 INSERT INTO b(aa) VALUES('bbbbb');
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'zzzzzz' WHERE aa LIKE 'aaaa%';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE b SET aa = 'new';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
 UPDATE a SET aa = 'newtoo';
 
-SELECT tableoid::regclass, * FROM a;
+SELECT tableoid::regclass, * FROM a ORDER BY 1, 2;
 SELECT tableoid::regclass, * FROM b;
 SELECT tableoid::regclass, * FROM ONLY a;
 
@@ -1790,12 +1790,12 @@ insert into bar2 values(4,44,44);
 insert into bar2 values(7,77,77);
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for update;
-select * from bar where f1 in (select f1 from foo) for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
+select * from bar where f1 in (select f1 from foo) order by 1 for update;
 
 explain (verbose, costs off)
-select * from bar where f1 in (select f1 from foo) for share;
-select * from bar where f1 in (select f1 from foo) for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
+select * from bar where f1 in (select f1 from foo) order by 1 for share;
 
 -- Check UPDATE with inherited target and an inherited source table
 explain (verbose, costs off)
@@ -1854,8 +1854,8 @@ explain (verbose, costs off)
 delete from foo where f1 < 5 returning *;
 delete from foo where f1 < 5 returning *;
 explain (verbose, costs off)
-update bar set f2 = f2 + 100 returning *;
-update bar set f2 = f2 + 100 returning *;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
+with u as (update bar set f2 = f2 + 100 returning *) select * from u order by 1;
 
 -- Test that UPDATE/DELETE with inherited target works with row-level triggers
 CREATE TRIGGER trig_row_before
--
2.23.0

Reply | Threaded
Open this post in threaded view
|

Re: Append with naive multiplexing of FDWs

Bruce Momjian
In reply to this post by Robert Haas
On Thu, Dec  5, 2019 at 03:19:50PM -0500, Robert Haas wrote:

> On Thu, Dec 5, 2019 at 1:12 PM Bruce Momjian <[hidden email]> wrote:
> > I agree with Stephen's request.  We have been waiting for the executor
> > rewrite for a while, so let's just do something simple and see how it
> > performs.
>
> I'm sympathetic to the frustration here, and I think it would be great
> if we could find a way forward that doesn't involve waiting for a full
> rewrite of the executor.  However, I seem to remember that when we
> tested the various patches that various people had written for this
> feature (I wrote one, too) they all had a noticeable performance
> penalty in the case of a plain old Append that involved no FDWs and
> nothing asynchronous. I don't think it's OK to have, say, a 2%
> regression on every query that involves an Append, because especially
> now that we have partitioning, that's a lot of queries.
>
> I don't know whether this patch has that kind of problem. If it
> doesn't, I would consider that a promising sign.

Certainly any overhead on normal queries would be unacceptable.

--
  Bruce Momjian  <[hidden email]>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +


Reply | Threaded
Open this post in threaded view
|

Re: Append with naive multiplexing of FDWs

Kyotaro Horiguchi-4
Hello.

I think I can say that this patch doesn't slow down non-AsyncAppend,
non-postgres_fdw scans.


At Mon, 9 Dec 2019 12:18:44 -0500, Bruce Momjian <[hidden email]> wrote in
> Certainly any overhead on normal queries would be unacceptable.

I took performance numbers on the current shape of the async execution
patch for the following scan cases.

t0   : single local table (parallel disabled)
pll  : local partitioning (local Append, parallel disabled)
ft0  : single foreign table
pf0  : inheritance on 4 foreign tables, single connection
pf1  : inheritance on 4 foreign tables, 4 connections
ptf0 : partition on 4 foreign tables, single connection
ptf1 : partition on 4 foreign tables, 4 connections

The benchmarking system is configured as follows on a single
machine.

          [ benchmark client   ]
           |                  |
    (localhost:5433)    (localhost:5432)
           |                  |
   +----+  |   +------+       |
   |    V  V   V      |       V
   | [master server]  |  [async server]
   |       V          |       V
   +--fdw--+          +--fdw--+


The patch works roughly in the following steps.

1. The planner decides how many children of an Append can run
  asynchronously (these are called async-capable).

2. During ExecInit, if an Append has no async-capable children, ExecAppend
  (exactly the same function as before) is set as ExecProcNode.  Otherwise
  ExecAppendAsync is used.

If the infrastructure part of the patch causes any degradation, the
"t0" (scan on a single local table) and/or "pll" (scan on a locally
partitioned table) tests get slower.

3. postgres_fdw always runs the async-capable code path.

If the postgres_fdw part causes degradation, ft0 reflects that.


The tables have two integer columns and the query computes sum(a) over all
tuples.
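
To make the mapping concrete, this is roughly what a single run executes,
and which code path the Append takes per the steps above (a sketch only;
table names are from the attached gentblr2.sql, parallelism disabled as in
testrun.sh):

  set max_parallel_workers_per_gather to 0;
  select sum(a) from pll;  -- local-only Append: no async-capable children, plain ExecAppend
  select sum(a) from pf1;  -- four foreign children on four connections: ExecAppendAsync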

With the default fetch_size = 100, the numbers below are run times in ms.
Each number is the average of 14 runs.

     master  patched   gain
t0   7325    7130     +2.7%
pll  4558    4484     +1.7%
ft0  3670    3675     -0.1%
pf0  2322    1550    +33.3%
pf1  2367    1475    +37.7%
ptf0 2517    1624    +35.5%
ptf1 2343    1497    +36.2%

With a larger fetch_size (200) the gain mysteriously decreases for the
cases sharing a single connection (pf0, ptf0), but the others don't seem
to change much.

     master  patched   gain
t0   7212    7252     -0.6%
pll  4546    4397     +3.3%
ft0  3712    3731     -0.5%
pf0  2131    1570    +26.4%
pf1  1926    1189    +38.3%
ptf0 2001    1557    +22.2%
ptf1 1903    1193    +37.4%
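
To reproduce the fetch_size comparison, the option can be changed per server
without rebuilding the tables (standard postgres_fdw server options; the
server names are the ones created by the attached gentblr2.sql):

  alter server sv0 options (set fetch_size '200');
  alter server sv1 options (set fetch_size '200');
  -- ... and likewise for sv2, sv3, sv4 and svl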

FWIW, attached are the test scripts.

gentblr2.sql: Table creation script.
testrun.sh  : Benchmarking script.


regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center
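
-- gentblr2.sql: table creation script referenced above.  It expects the psql
-- variables :scale, :svhost, :svport, :svdbname and :fetchsize to be set,
-- e.g. (values here are only illustrative, not the ones used in the runs):
--   psql -v scale=1000000 -v svhost=localhost -v svport=5432 \
--        -v svdbname=postgres -v fetchsize=100 -f gentblr2.sql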

SELECT :scale  * 0 as th0,
       :scale  * 1 as th1,
       :scale  * 2 as th2,
       :scale  * 3 as th3,
       :scale  * 4 as th4,
       :scale  * 10 as th10,
       :scale  * 20 as th20,
       :scale  * 30 as th30,
       :scale  * 40 as th40,
       :scale  * 100 as th100,
       :scale  * 1000 as th1000
\gset

DROP TABLE IF EXISTS t0 CASCADE;
DROP TABLE IF EXISTS pl CASCADE;
DROP TABLE IF EXISTS pll CASCADE;
DROP TABLE IF EXISTS pf0 CASCADE;
DROP TABLE IF EXISTS pf1 CASCADE;
DROP TABLE IF EXISTS ptf0;
DROP TABLE IF EXISTS ptf1;

CREATE TABLE pl (a int, b int);
CREATE TABLE cl1 (LIKE pl) INHERITS (pl);
CREATE TABLE cl2 (LIKE pl) INHERITS (pl);
CREATE TABLE cl3 (LIKE pl) INHERITS (pl);
CREATE TABLE cl4 (LIKE pl) INHERITS (pl);
INSERT INTO cl1 (SELECT a, a FROM generate_series(:th0, :th1 - 1) a);
INSERT INTO cl2 (SELECT a, a FROM generate_series(:th1, :th2 - 1) a);
INSERT INTO cl3 (SELECT a, a FROM generate_series(:th2, :th3 - 1) a);
INSERT INTO cl4 (SELECT a, a FROM generate_series(:th3, :th4 - 1) a);

CREATE TABLE pll (a int, b int);
CREATE TABLE cll1 (LIKE pl) INHERITS (pll);
CREATE TABLE cll2 (LIKE pl) INHERITS (pll);
CREATE TABLE cll3 (LIKE pl) INHERITS (pll);
CREATE TABLE cll4 (LIKE pl) INHERITS (pll);
INSERT INTO cll1 (SELECT a, a FROM generate_series(:th0, :th10 - 1) a);
INSERT INTO cll2 (SELECT a, a FROM generate_series(:th10, :th20 - 1) a);
INSERT INTO cll3 (SELECT a, a FROM generate_series(:th20, :th30 - 1) a);
INSERT INTO cll4 (SELECT a, a FROM generate_series(:th30, :th40 - 1) a);


CREATE TABLE t0  (LIKE pl);
INSERT INTO t0 (SELECT a, a FROM generate_series(0, :th100 - 1) a);

DROP SERVER IF EXISTS svl CASCADE;
DROP SERVER IF EXISTS sv0 CASCADE;
DROP SERVER IF EXISTS sv1 CASCADE;
DROP SERVER IF EXISTS sv2 CASCADE;
DROP SERVER IF EXISTS sv3 CASCADE;
DROP SERVER IF EXISTS sv4 CASCADE;

CREATE SERVER svl FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv0 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv1 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv2 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv3 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');
CREATE SERVER sv4 FOREIGN DATA WRAPPER postgres_fdw OPTIONS (host :'svhost', port :'svport', dbname :'svdbname', fetch_size :'fetchsize');

CREATE USER MAPPING FOR public SERVER svl;
CREATE USER MAPPING FOR public SERVER sv0;
CREATE USER MAPPING FOR public SERVER sv1;
CREATE USER MAPPING FOR public SERVER sv2;
CREATE USER MAPPING FOR public SERVER sv3;
CREATE USER MAPPING FOR public SERVER sv4;

CREATE FOREIGN TABLE ft0 (a int, b int) SERVER svl OPTIONS (table_name 't0');

CREATE FOREIGN TABLE ft10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ft20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ft30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ft40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4');
CREATE FOREIGN TABLE ft11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ft22 (a int, b int) SERVER sv2 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ft33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ft44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4');

CREATE TABLE pf0 (LIKE pl);
ALTER FOREIGN TABLE ft10 INHERIT pf0;
ALTER FOREIGN TABLE ft20 INHERIT pf0;
ALTER FOREIGN TABLE ft30 INHERIT pf0;
ALTER FOREIGN TABLE ft40 INHERIT pf0;

CREATE TABLE pf1 (LIKE pl);
ALTER FOREIGN TABLE ft11 INHERIT pf1;
ALTER FOREIGN TABLE ft22 INHERIT pf1;
ALTER FOREIGN TABLE ft33 INHERIT pf1;
ALTER FOREIGN TABLE ft44 INHERIT pf1;

CREATE FOREIGN TABLE ftp10 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ftp20 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ftp30 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ftp40 (a int, b int) SERVER sv0 OPTIONS (table_name 'cl4');
CREATE FOREIGN TABLE ftp11 (a int, b int) SERVER sv1 OPTIONS (table_name 'cl1');
CREATE FOREIGN TABLE ftp22 (a int, b int) SERVER sv2 OPTIONS (table_name 'cl2');
CREATE FOREIGN TABLE ftp33 (a int, b int) SERVER sv3 OPTIONS (table_name 'cl3');
CREATE FOREIGN TABLE ftp44 (a int, b int) SERVER sv4 OPTIONS (table_name 'cl4');

CREATE TABLE ptf0 (a int, b int) PARTITION BY RANGE (a);
ALTER TABLE ptf0 ATTACH PARTITION ftp10 FOR VALUES FROM (:th0) TO (:th1);
ALTER TABLE ptf0 ATTACH PARTITION ftp20 FOR VALUES FROM (:th1) TO (:th2);
ALTER TABLE ptf0 ATTACH PARTITION ftp30 FOR VALUES FROM (:th2) TO (:th3);
ALTER TABLE ptf0 ATTACH PARTITION ftp40 FOR VALUES FROM (:th3) TO (:th4);

CREATE TABLE ptf1 (a int, b int) PARTITION BY RANGE (a);
ALTER TABLE ptf1 ATTACH PARTITION ftp11 FOR VALUES FROM (:th0) TO (:th1);
ALTER TABLE ptf1 ATTACH PARTITION ftp22 FOR VALUES FROM (:th1) TO (:th2);
ALTER TABLE ptf1 ATTACH PARTITION ftp33 FOR VALUES FROM (:th2) TO (:th3);
ALTER TABLE ptf1 ATTACH PARTITION ftp44 FOR VALUES FROM (:th3) TO (:th4);

ANALYZE;

#! /bin/bash
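# testrun.sh: benchmark driver for the tables created by gentblr2.sql above.
# Each test disables parallel query, runs "select sum(a) from <table>" 14 times,
# and collects the server-side LOG duration lines (log_min_duration_statement = 0).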

function do_test() {
        echo $1
        for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; explain analyze select sum(a) from $1"; done | grep LOG
}

function do_test_union1() {
        echo "UNION_pf0"
        for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft10 UNION ALL SELECT a FROM ft20 UNION ALL SELECT a FROM ft30 UNION ALL SELECT a FROM ft40) as pf0"; done | grep LOG
}
function do_test_union2() {
        echo "UNION_pf1"
        for i in $(seq 1 14); do psql postgres -c "set max_parallel_workers_per_gather to 0; set log_min_duration_statement = 0; set client_min_messages=log; SELECT sum(a) FROM (SELECT a FROM ft11 UNION ALL SELECT a FROM ft22 UNION ALL SELECT a FROM ft33 UNION ALL SELECT a FROM ft44) as pf1"; done | grep LOG
}

function warmup() {
        for i in $(seq 1 5); do psql postgres -c "set log_min_duration_statement = -1; select sum(a) from $1"; done 1>&2 > /dev/null
}

#for t in "t0" "pll";
#for t in "ft0" "pf0" "pf1" "ptf0"  "ptf1";
#for t in "pf0" "ptf0";
for t in "t0" "pll" "ft0" "pf0" "pf1" "ptf0"  "ptf1";
  do
   warmup $t
   do_test $t
  done
exit
for t in "ft0" "pf0" "pf1" "ptf0"  "ptf1";
do
  warmup $t
  do_test $t
done
#do_test_union1
#do_test_union2
Reply | Threaded
Open this post in threaded view
|

Re: Append with naive multiplexing of FDWs

Ahsan Hadi-3
Hi Hackers,

Sharing the email below from Movead Li. I believe he wanted to share the benchmarking results as a response to this email thread, but it started a new thread. Here it is...

"
Hello

I have tested the patch with a partitioned table whose foreign
partitions live on separate data nodes. The initial test was done
with a partitioned table having 3 foreign partitions, run with a
variety of scale factors. The second test used a fixed amount of data
per data node but increased the number of data nodes incrementally,
to see the performance impact as more nodes are added to the cluster.
The third test is similar to the initial test but with much larger
data and 4 nodes.

A summary of the results is given below; the test script is attached:

Test environment
Parent node: 2 cores, 8 GB RAM
Child nodes: 2 cores, 4 GB RAM


Test one:

1.1 The partition structure is as below:

        [ ptf: (a int, b int, c varchar) ]
                  (Parent node)
           |            |            |
        [ptf1]       [ptf2]       [ptf3]
       (Node1)      (Node2)      (Node3)

The table data is partitioned across the nodes, and the test is done using
a simple select query and a count aggregate, as shown below. Each result
is the average of executing the query multiple times, to ensure reliable
and consistent results.

①select * from ptf where b = 100;
②select count(*) from ptf;
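
A minimal sketch of the setup this corresponds to (host names, partition
bounds and server names here are illustrative, not taken from the actual
test environment):

  create server node1 foreign data wrapper postgres_fdw options (host 'node1', dbname 'postgres');
  create user mapping for current_user server node1;
  create table ptf (a int, b int, c varchar) partition by range (a);
  create foreign table ptf1 partition of ptf for values from (0) to (1000000)
    server node1 options (table_name 'ptf1');
  -- ... likewise ptf2 on node2 and ptf3 on node3, with the next two ranges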

1.2. Test Results

 For ① result:
       scale per node    master    patched    performance
            2G               7s        2s         350%
            5G             173s       63s         275%
           10G             462s      156s         296%
           20G             968s      327s         296%
           30G            1472s      494s         297%

 For ② result:
       scale per node    master    patched    performance
            2G            1079s      291s         370%
            5G            2688s      741s         362%
           10G            4473s     1493s         299%

It takes too long to test the aggregate, so that test was done with
smaller data sizes.


1.3. Summary

With the table partitioned over 3 nodes, the average performance gain
across a variety of scale factors is almost 300%.


Test Two
2.1 The partition struct as below:

        [ ptf: (a int, b int, c varchar) ]
                  (Parent node)
           |            |            |
        [ptf1]         ...         [ptfN]
       (Node1)        (...)       (NodeN)

①select * from ptf
②select * from ptf where b = 100;

This test is done with the same amount of data per node, but the table is
partitioned across N nodes. Each variation (master or patched) is tested
at least 3 times to get reliable and consistent results. The purpose of the
test is to see the impact on performance as the number of data nodes is
increased.

2.2 The results

For ① result (scale per node = 2G):
    node count    master    patched    performance
         2          432s       180s        240%
         3          636s       223s        285%
         4          830s       283s        293%
         5         1065s       361s        295%

For ② result (scale per node = 10G):
    node count    master    patched    performance
         2          281s       140s        201%
         3          421s       140s        300%
         4          562s       141s        398%
         5          702s       141s        497%
         6          833s       139s        599%
         7          986s       141s        699%
         8         1125s       140s        803%

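Adding a node in this test amounts to one more server and one more foreign
partition attached to ptf; a sketch of the incremental step, with purely
illustrative names and bounds (following the sketch under test one):

  create server node5 foreign data wrapper postgres_fdw options (host 'node5', dbname 'postgres');
  create user mapping for current_user server node5;
  create foreign table ptf5 partition of ptf for values from (4000000) to (5000000)
    server node5 options (table_name 'ptf5');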

Test Three

This test is similar to test one, but with much larger data and
4 nodes.

For ① result:
    scale per node    master    patched    performance
        100G           6592s      1649s        399%

For ② result:
    scale per node    master    patched    performance
        100G           35383      12363        286%
The results show it works well with much larger data volumes.


Summary
The patch is pretty good; it works well when little data comes back to
the parent node. The patch doesn't provide a parallel FDW scan: it ensures
that child nodes can send data to the parent in parallel, but the parent can
only process the data from the data nodes sequentially.

Provided there is no performance degradation for non-FDW Append queries,
I would recommend considering this patch as an interim solution while we
are waiting for parallel FDW scan.
"




script.tar (37K) Download Attachment
Reply | Threaded
Open this post in threaded view
|

Re: Append with naive multiplexing of FDWs

Bruce Momjian
On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> Summary
> The patch is pretty good; it works well when little data comes back to
> the parent node. The patch doesn't provide a parallel FDW scan: it ensures
> that child nodes can send data to the parent in parallel, but the parent can
> only process the data from the data nodes sequentially.
>
> Provided there is no performance degradation for non-FDW Append queries,
> I would recommend considering this patch as an interim solution while we
> are waiting for parallel FDW scan.

Wow, these are very impressive results!

--
  Bruce Momjian  <[hidden email]>        http://momjian.us
  EnterpriseDB                             http://enterprisedb.com

+ As you are, so once was I.  As I am, so you will be. +
+                      Ancient Roman grave inscription +


Reply | Threaded
Open this post in threaded view
|

Re: Append with naive multiplexing of FDWs

Kyotaro Horiguchi-4
Thank you very much for testing the patch, Ahsan!

At Wed, 15 Jan 2020 15:41:04 -0500, Bruce Momjian <[hidden email]> wrote in
> On Tue, Jan 14, 2020 at 02:37:48PM +0500, Ahsan Hadi wrote:
> > Summary
> > The patch is pretty good; it works well when little data comes back to
> > the parent node. The patch doesn't provide a parallel FDW scan: it ensures
> > that child nodes can send data to the parent in parallel, but the parent can
> > only process the data from the data nodes sequentially.

"Parallel scan" at the moment means multiple workers fetch unique
blocks from *one* table in an arbitrated manner. In this sense
"parallel FDW scan" means multiple local workers fetch unique bundles
of tuples from *one* foreign table, which means it is running on a
single session.  That doesn't offer an advantage.

If parallel query processing worked in worker-per-table mode,
especially on partitioned tables, maybe the current FDW would work
without much modification. But I believe asynchronous Append on
foreign tables in a single process is far more resource-effective and
moderately faster than parallel Append.

> > Providing there is no performance degrdation for non FDW append queries,
> > I would recomend to consider this patch as an interim soluton while we are
> > waiting for parallel FDW scan.
>
> Wow, these are very impressive results!

Thanks.

--
Kyotaro Horiguchi
NTT Open Source Software Center