BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

apt.postgresql.org Repository Update
The following bug has been logged on the website:

Bug reference:      16040
Logged by:          Jeremy Smith
Email address:      [hidden email]
PostgreSQL version: 12.0
Operating system:   Official Docker Image, CentOS7
Description:        

I have also tried this with 11.3, 11.4, and 11.5, so this is not new in
12.0.  Here's a really basic way to reproduce this:

postgres=# BEGIN;
BEGIN
postgres=#
postgres=# -- Create a test table and some data
postgres=# CREATE TABLE test (a int);
CREATE TABLE
postgres=# INSERT INTO test SELECT generate_series(1,10);
INSERT 0 10
postgres=# alter table test set (parallel_workers = 4);
ALTER TABLE
postgres=# -- Use auto_explain to show plan of query in the function
postgres=# LOAD 'auto_explain';
LOAD
postgres=# SET auto_explain.log_analyze = on;
SET
postgres=# SET client_min_messages = log;
SET
postgres=# SET auto_explain.log_nested_statements = on;
SET
postgres=# SET auto_explain.log_min_duration = 0;
SET
postgres=# -- Set parallel costs artificially low, for demonstration purposes
postgres=# set parallel_tuple_cost = 0;
SET
postgres=# set parallel_setup_cost = 0;
SET
postgres=# set max_parallel_workers_per_gather = 4;
SET
postgres=# -- Normal query will use 4 workers
postgres=# SELECT test.a, count(*) FROM test GROUP BY test.a;
LOG:  duration: 19.280 ms  plan:
Query Text: SELECT test.a, count(*) FROM test GROUP BY test.a;
Finalize HashAggregate  (cost=25.56..27.56 rows=200 width=12) (actual
time=16.649..16.795 rows=10 loops=1)
  Group Key: a
  ->  Gather  (cost=19.56..21.56 rows=800 width=12) (actual
time=2.853..18.744 rows=10 loops=1)
        Workers Planned: 4
        Workers Launched: 4
        ->  Partial HashAggregate  (cost=19.56..21.56 rows=200 width=12)
(actual time=0.493..0.519 rows=2 loops=5)
              Group Key: a
              ->  Parallel Seq Scan on test  (cost=0.00..16.38 rows=638
width=4) (actual time=0.009..0.083 rows=2 loops=5)
 a  | count
----+-------
  9 |     1
  3 |     1
  5 |     1
  4 |     1
 10 |     1
  6 |     1
  2 |     1
  7 |     1
  1 |     1
  8 |     1
(10 rows)

postgres=#
postgres=# CREATE OR REPLACE FUNCTION test_count()
postgres-#   RETURNS TABLE (a int, n bigint) AS
postgres-#   $$
postgres$#     BEGIN
postgres$#       RETURN QUERY SELECT test.a, count(*) FROM test GROUP BY
test.a;
postgres$#     END;
postgres$#   $$
postgres-# LANGUAGE PLPGSQL;
CREATE FUNCTION
postgres=#
postgres=# -- This query will not use parallel workers
postgres=# SELECT * FROM test_count();
LOG:  duration: 0.437 ms  plan:
Query Text: SELECT test.a, count(*) FROM test GROUP BY test.a
HashAggregate  (cost=48.25..50.25 rows=200 width=12) (actual
time=0.193..0.276 rows=10 loops=1)
  Group Key: a
  ->  Seq Scan on test  (cost=0.00..35.50 rows=2550 width=4) (actual
time=0.010..0.096 rows=10 loops=1)
LOG:  duration: 1.069 ms  plan:
Query Text: SELECT * FROM test_count();
Function Scan on test_count  (cost=0.25..10.25 rows=1000 width=12) (actual
time=0.895..0.968 rows=10 loops=1)
 a  | n
----+---
  9 | 1
  3 | 1
  5 | 1
  4 | 1
 10 | 1
  6 | 1
  2 | 1
  7 | 1
  1 | 1
  8 | 1
(10 rows)

postgres=# -- A workaround for long-running queries, using CREATE TABLE AS, which will run in parallel
postgres=# CREATE OR REPLACE FUNCTION test_count2()
postgres-#   RETURNS TABLE (a int, n bigint) AS
postgres-#   $$
postgres$#     BEGIN
postgres$#       CREATE TEMPORARY TABLE test_count2_temp_table AS
postgres$#         SELECT test.a, count(*) FROM test GROUP BY test.a;
postgres$#       RETURN QUERY select * from test_count2_temp_table;
postgres$#     END;
postgres$#  $$
postgres-# LANGUAGE PLPGSQL;
CREATE FUNCTION
postgres=#
postgres=# -- The CREATE TABLE AS query will use parallel workers, but the
postgres=# -- RETURN QUERY statement will not
postgres=# SELECT * FROM test_count2();
LOG:  duration: 24.139 ms  plan:
Query Text: CREATE TEMPORARY TABLE test_count2_temp_table AS
        SELECT test.a, count(*) FROM test GROUP BY test.a
Finalize HashAggregate  (cost=25.56..27.56 rows=200 width=12) (actual
time=21.819..21.896 rows=10 loops=1)
  Group Key: a
  ->  Gather  (cost=19.56..21.56 rows=800 width=12) (actual
time=0.755..22.966 rows=10 loops=1)
        Workers Planned: 4
        Workers Launched: 4
        ->  Partial HashAggregate  (cost=19.56..21.56 rows=200 width=12)
(actual time=0.105..0.148 rows=2 loops=5)
              Group Key: a
              ->  Parallel Seq Scan on test  (cost=0.00..16.38 rows=638
width=4) (actual time=0.009..0.056 rows=2 loops=5)
LOG:  duration: 0.420 ms  plan:
Query Text: select * from test_count2_temp_table
Seq Scan on test_count2_temp_table  (cost=0.00..30.40 rows=2040 width=12)
(actual time=0.014..0.305 rows=10 loops=1)
LOG:  duration: 26.118 ms  plan:
Query Text: SELECT * FROM test_count2();
Function Scan on test_count2  (cost=0.25..10.25 rows=1000 width=12) (actual
time=25.845..25.994 rows=10 loops=1)
 a  | n
----+---
  9 | 1
  3 | 1
  5 | 1
  4 | 1
 10 | 1
  6 | 1
  2 | 1
  7 | 1
  1 | 1
  8 | 1
(10 rows)



It's not obvious from the documentation
(https://www.postgresql.org/docs/12/when-can-parallel-query-be-used.html)
that this should be the case.  Unlike a cursor or a FOR loop, RETURN QUERY
is not interruptible: it always runs its query to completion.

Reply | Threaded
Open this post in threaded view
|

Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Tom Lane-2
PG Bug reporting form <[hidden email]> writes:
> [ $SUBJECT ]

I got around to looking at this today, and what I find is that the
problem is that exec_stmt_return_query() uses a portal (i.e. a cursor)
to read the results of the query.  That seemed like a good idea, back
in the late bronze age, because it allowed plpgsql to fetch the query
results a few rows at a time and not risk blowing out memory with a huge
SPI result.  However, the parallel-query infrastructure refuses to
parallelize when the query is being read via a cursor.

I think that the latter restriction is probably sane, because we don't
want to suspend execution of a parallel query while we've got worker
processes waiting.  And there might be some implementation restrictions
lurking under it too --- that's not a part of the code I know in any
detail.

However, there's no fundamental reason why exec_stmt_return_query has
to use a cursor.  It's going to run the query to completion immediately
anyway, and shove all the result rows into a tuplestore.  What we lack
is a way to get the SPI query to pass its results directly to a
tuplestore, without the SPITupleTable intermediary.  (Note that the
tuplestore can spill a large result to disk, whereas SPITupleTable
can't do that.)

So, attached is a draft patch to enable that.  By getting rid of the
intermediate SPITupleTable, this should improve the performance of
RETURN QUERY somewhat even without considering the possibility of
parallelizing the source query.  I've not tried to measure that though.
I've also not looked for other places that could use this new
infrastructure, but there may well be some.

One thing I'm not totally pleased about with this is adding another
SPI interface routine using the old parameter-values API (that is,
null flags as char ' '/'n').  That was the path of least resistance
given the other moving parts in pl_exec.c and spi.c, but maybe we
should try to modernize that before we set it in stone.

Another thing standing between this patch and committability is suitable
additions to the SPI documentation.  But I saw no value in writing that
before the previous point is settled.

I will go add this to the next commitfest (for v14), but I wonder
if we should try to squeeze it into v13?  This isn't the only
complaint we've gotten about non-parallelizability of RETURN QUERY.

                        regards, tom lane


diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 40be506..a23add6 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
  SetTuplestoreDestReceiverParams(queryDesc->dest,
  portal->holdStore,
  portal->holdContext,
- true);
+ true,
+ NULL,
+ NULL);
 
  /* Fetch the result set into the tuplestore */
  ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168..f2b698f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount);
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
  Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
  res = _SPI_execute_plan(&plan, NULL,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -576,7 +577,34 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
  res = _SPI_execute_plan(plan, params,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * Execute a previously prepared plan, sending result tuples to the
+ * caller-supplied DestReceiver rather than the usual SPI output arrangements.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan, params,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -617,7 +645,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -664,7 +692,56 @@ SPI_execute_with_args(const char *src,
 
  res = _SPI_execute_plan(&plan, paramLI,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that we send result tuples
+ * to the caller-supplied DestReceiver rather than the usual SPI output
+ * arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+  int nargs, Oid *argtypes,
+  Datum *Values, const char *Nulls,
+  bool read_only, long tcount,
+  DestReceiver *dest)
+{
+ int res;
+ _SPI_plan plan;
+ ParamListInfo paramLI;
+
+ if (src == NULL || nargs < 0 || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (nargs > 0 && (argtypes == NULL || Values == NULL))
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+ plan.nargs = nargs;
+ plan.argtypes = argtypes;
+ plan.parserSetup = NULL;
+ plan.parserSetupArg = NULL;
+
+ paramLI = _SPI_convert_params(nargs, argtypes,
+  Values, Nulls);
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, paramLI,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -2090,11 +2167,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  * false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount)
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest)
 {
  int my_res = 0;
  uint64 my_processed = 0;
@@ -2228,6 +2307,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  bool canSetTag = stmt->canSetTag;
  DestReceiver *dest;
 
+ /*
+ * Reset output state.  (Note that if a non-SPI receiver is used,
+ * _SPI_current->processed will stay zero, and that's what we'll
+ * report to the caller.  It's the receiver's job to count tuples
+ * in that case.)
+ */
  _SPI_current->processed = 0;
  _SPI_current->tuptable = NULL;
 
@@ -2267,7 +2352,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  UpdateActiveSnapshotCommandId();
  }
 
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ /*
+ * Select appropriate tuple receiver.  Output from non-canSetTag
+ * subqueries always goes to the bit bucket.
+ */
+ if (!canSetTag)
+ dest = CreateDestReceiver(DestNone);
+ else if (caller_dest)
+ dest = caller_dest;
+ else
+ dest = CreateDestReceiver(DestSPI);
 
  if (stmt->utilityStmt == NULL)
  {
@@ -2373,7 +2467,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  SPI_freetuptable(_SPI_current->tuptable);
  _SPI_current->tuptable = NULL;
  }
- /* we know that the receiver doesn't need a destroy call */
+
+ /*
+ * We don't issue a destroy call to the receiver.  The SPI and
+ * None receivers would ignore it anyway, while if the caller
+ * supplied a receiver, it's not our job to destroy it.
+ */
+
  if (res < 0)
  {
  my_res = res;
@@ -2465,7 +2565,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
  switch (operation)
  {
  case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest == DestNone)
  {
  /* Don't return SPI_OK_SELECT if we're discarding result */
  res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc..e8172be 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  * toasted values.  This is to support cursors WITH HOLD, which must retain
  * data even if the underlying table is dropped.
  *
+ * Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"
 
 
@@ -31,14 +34,19 @@ typedef struct
  Tuplestorestate *tstore; /* where to put the data */
  MemoryContext cxt; /* context containing tstore */
  bool detoast; /* were we told to detoast? */
+ TupleDesc target_tupdesc; /* target tupdesc, or NULL if none */
+ const char *map_failure_msg; /* tupdesc mapping failure message */
  /* workspace: */
  Datum   *outvalues; /* values array for result tuple */
  Datum   *tofree; /* temp values to be pfree'd */
+ TupleConversionMap *tupmap; /* conversion map, if needed */
+ TupleTableSlot *mapslot; /* slot for mapped tuples */
 } TStoreState;
 
 
 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  }
  }
 
+ /* Check if tuple conversion is needed */
+ if (myState->target_tupdesc)
+ myState->tupmap = convert_tuples_by_position(typeinfo,
+ myState->target_tupdesc,
+ myState->map_failure_msg);
+ else
+ myState->tupmap = NULL;
+
  /* Set up appropriate callback */
  if (needtoast)
  {
+ Assert(!myState->tupmap);
  myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
  /* Create workspace */
  myState->outvalues = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
  myState->tofree = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+ myState->mapslot = NULL;
+ }
+ else if (myState->tupmap)
+ {
+ myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+ myState->outvalues = NULL;
+ myState->tofree = NULL;
+ myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+ &TTSOpsVirtual);
  }
  else
  {
  myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
  myState->outvalues = NULL;
  myState->tofree = NULL;
+ myState->mapslot = NULL;
  }
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -158,6 +185,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 }
 
 /*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+ TStoreState *myState = (TStoreState *) self;
+
+ execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+ tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+ return true;
+}
+
+/*
  * Clean up at end of an executor run
  */
 static void
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
  if (myState->tofree)
  pfree(myState->tofree);
  myState->tofree = NULL;
+ if (myState->tupmap)
+ free_conversion_map(myState->tupmap);
+ myState->tupmap = NULL;
+ if (myState->mapslot)
+ ExecDropSingleTupleTableSlot(myState->mapslot);
+ myState->mapslot = NULL;
 }
 
 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)
 
 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast)
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg)
 {
  TStoreState *myState = (TStoreState *) self;
 
+ Assert(!(detoast && target_tupdesc));
+
  Assert(myState->pub.mydest == DestTuplestore);
  myState->tstore = tStore;
  myState->cxt = tContext;
  myState->detoast = detoast;
+ myState->target_tupdesc = target_tupdesc;
+ myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2..96ea74f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
  SetTuplestoreDestReceiverParams(treceiver,
  portal->holdStore,
  portal->holdContext,
- false);
+ false,
+ NULL,
+ NULL);
 
  switch (portal->strategy)
  {
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20a..9be0c68 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
  ParamListInfo params,
  bool read_only, long tcount);
+extern int SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest);
 extern int SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
   long tcount);
@@ -102,6 +106,11 @@ extern int SPI_execute_with_args(const char *src,
   int nargs, Oid *argtypes,
   Datum *Values, const char *Nulls,
   bool read_only, long tcount);
+extern int SPI_execute_with_receiver(const char *src,
+  int nargs, Oid *argtypes,
+  Datum *Values, const char *Nulls,
+  bool read_only, long tcount,
+  DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
  int cursorOptions);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4..e9461cf 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast);
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg);
 
 #endif /* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..b5563ee 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -3491,9 +3492,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
    PLpgSQL_stmt_return_query *stmt)
 {
- Portal portal;
- uint64 processed = 0;
- TupleConversionMap *tupmap;
+ int64 tcount;
+ DestReceiver *treceiver;
+ int rc;
+ uint64 processed;
+ MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
  MemoryContext oldcontext;
 
  if (!estate->retisset)
@@ -3503,60 +3506,115 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 
  if (estate->tuple_store == NULL)
  exec_init_tuple_store(estate);
+ /* There might be some tuples in the tuplestore already */
+ tcount = tuplestore_tuple_count(estate->tuple_store);
+
+ /*
+ * Set up DestReceiver to transfer results directly to tuplestore,
+ * converting rowtype if necessary.  DestReceiver lives in mcontext.
+ */
+ oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+ treceiver = CreateDestReceiver(DestTuplestore);
+ SetTuplestoreDestReceiverParams(treceiver,
+ estate->tuple_store,
+ estate->tuple_store_cxt,
+ false,
+ estate->tuple_store_desc,
+ gettext_noop("structure of query does not match function result type"));
+ MemoryContextSwitchTo(oldcontext);
 
  if (stmt->query != NULL)
  {
  /* static query */
- exec_run_select(estate, stmt->query, 0, &portal);
+ PLpgSQL_expr *expr = stmt->query;
+ ParamListInfo paramLI;
+
+ /*
+ * On the first call for this expression generate the plan.
+ */
+ if (expr->plan == NULL)
+ exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+
+ /*
+ * Set up ParamListInfo to pass to executor
+ */
+ paramLI = setup_param_list(estate, expr);
+
+ /*
+ * Execute the query
+ */
+ rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+ estate->readonly_func, 0,
+ treceiver);
+ if (rc != SPI_OK_SELECT)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("query \"%s\" is not a SELECT", expr->query)));
  }
  else
  {
  /* RETURN QUERY EXECUTE */
- Assert(stmt->dynquery != NULL);
- portal = exec_dynquery_with_params(estate, stmt->dynquery,
-   stmt->params, NULL,
-   0);
- }
+ Datum query;
+ bool isnull;
+ Oid restype;
+ int32 restypmod;
+ char   *querystr;
 
- /* Use eval_mcontext for tuple conversion work */
- oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /*
+ * Evaluate the string expression after the EXECUTE keyword. Its
+ * result is the querystring we have to execute.
+ */
+ Assert(stmt->dynquery != NULL);
+ query = exec_eval_expr(estate, stmt->dynquery,
+   &isnull, &restype, &restypmod);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("query string argument of EXECUTE is null")));
 
- tupmap = convert_tuples_by_position(portal->tupDesc,
- estate->tuple_store_desc,
- gettext_noop("structure of query does not match function result type"));
+ /* Get the C-String representation */
+ querystr = convert_value_to_string(estate, query, restype);
 
- while (true)
- {
- uint64 i;
+ /* copy it into the stmt_mcontext before we clean up */
+ querystr = MemoryContextStrdup(stmt_mcontext, querystr);
 
- SPI_cursor_fetch(portal, true, 50);
+ exec_eval_cleanup(estate);
 
- /* SPI will have changed CurrentMemoryContext */
- MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /* Execute query, passing params if necessary */
+ if (stmt->params)
+ {
+ PreparedParamsData *ppd;
 
- if (SPI_processed == 0)
- break;
+ ppd = exec_eval_using_params(estate, stmt->params);
 
- for (i = 0; i < SPI_processed; i++)
+ rc = SPI_execute_with_receiver(querystr,
+   ppd->nargs, ppd->types,
+   ppd->values, ppd->nulls,
+   estate->readonly_func,
+   0,
+   treceiver);
+ }
+ else
  {
- HeapTuple tuple = SPI_tuptable->vals[i];
-
- if (tupmap)
- tuple = execute_attr_map_tuple(tuple, tupmap);
- tuplestore_puttuple(estate->tuple_store, tuple);
- if (tupmap)
- heap_freetuple(tuple);
- processed++;
+ rc = SPI_execute_with_receiver(querystr,
+   0, NULL, NULL, NULL,
+   estate->readonly_func,
+   0,
+   treceiver);
  }
 
- SPI_freetuptable(SPI_tuptable);
+ if (rc < 0)
+ elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+ querystr, SPI_result_code_string(rc));
  }
 
- SPI_freetuptable(SPI_tuptable);
- SPI_cursor_close(portal);
-
- MemoryContextSwitchTo(oldcontext);
+ /* Clean up */
+ treceiver->rDestroy(treceiver);
  exec_eval_cleanup(estate);
+ MemoryContextReset(stmt_mcontext);
+
+ /* Count how many tuples we got */
+ processed = tuplestore_tuple_count(estate->tuple_store) - tcount;
 
  estate->eval_processed = processed;
  exec_set_found(estate, processed != 0);
Reply | Threaded
Open this post in threaded view
|

Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Pavel Stehule


On Sun, 22 Mar 2020 at 4:23, Tom Lane <[hidden email]> wrote:
PG Bug reporting form <[hidden email]> writes:
> [ $SUBJECT ]

I got around to looking at this today, and what I find is that the
problem is that exec_stmt_return_query() uses a portal (i.e. a cursor)
to read the results of the query.  That seemed like a good idea, back
in the late bronze age, because it allowed plpgsql to fetch the query
results a few rows at a time and not risk blowing out memory with a huge
SPI result.  However, the parallel-query infrastructure refuses to
parallelize when the query is being read via a cursor.

I think that the latter restriction is probably sane, because we don't
want to suspend execution of a parallel query while we've got worker
processes waiting.  And there might be some implementation restrictions
lurking under it too --- that's not a part of the code I know in any
detail.

However, there's no fundamental reason why exec_stmt_return_query has
to use a cursor.  It's going to run the query to completion immediately
anyway, and shove all the result rows into a tuplestore.  What we lack
is a way to get the SPI query to pass its results directly to a
tuplestore, without the SPITupleTable intermediary.  (Note that the
tuplestore can spill a large result to disk, whereas SPITupleTable
can't do that.)

So, attached is a draft patch to enable that.  By getting rid of the
intermediate SPITupleTable, this should improve the performance of
RETURN QUERY somewhat even without considering the possibility of
parallelizing the source query.  I've not tried to measure that though.
I've also not looked for other places that could use this new
infrastructure, but there may well be some.

One thing I'm not totally pleased about with this is adding another
SPI interface routine using the old parameter-values API (that is,
null flags as char ' '/'n').  That was the path of least resistance
given the other moving parts in pl_exec.c and spi.c, but maybe we
should try to modernize that before we set it in stone.

Another thing standing between this patch and committability is suitable
additions to the SPI documentation.  But I saw no value in writing that
before the previous point is settled.

I will go add this to the next commitfest (for v14), but I wonder
if we should try to squeeze it into v13?  This isn't the only
complaint we've gotten about non-parallelizability of RETURN QUERY.

+1

Pavel


                        regards, tom lane

Reply | Threaded
Open this post in threaded view
|

Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Tom Lane-2
In reply to this post by Tom Lane-2
I wrote:
> ... attached is a draft patch to enable that.  By getting rid of the
> intermediate SPITupleTable, this should improve the performance of
> RETURN QUERY somewhat even without considering the possibility of
> parallelizing the source query.  I've not tried to measure that though.
> I've also not looked for other places that could use this new
> infrastructure, but there may well be some.

> One thing I'm not totally pleased about with this is adding another
> SPI interface routine using the old parameter-values API (that is,
> null flags as char ' '/'n').  That was the path of least resistance
> given the other moving parts in pl_exec.c and spi.c, but maybe we
> should try to modernize that before we set it in stone.

Here's a revised patch that does the additional legwork needed to
use ParamListInfo throughout the newly-added code.  I was able to
get pl_exec.c out of the business of using old-style null flags
entirely, which seems like a nice improvement.

> Another thing standing between this patch and committability is suitable
> additions to the SPI documentation.  But I saw no value in writing that
> before the previous point is settled.

Took care of that too.

I looked around for other places that could use this infrastructure.
It turns out that most places that are fetching via SPITupleTables
don't really have much of an issue, because they are only expecting
to get one or so tuples anyway.  There are a few where it might be
worth changing, but it's hard to get really excited because they all
have other constraints on the max amount of data.  As an example,
the various table-to-xml thingies in utils/adt/xml.c could be converted,
but they're still funneling their output into an XML string.  As long
as that has a hard limit at 1GB, it's not very realistic to expect that
you can shove huge tables into it.

A different sort of cleanup we could undertake is to deprecate and
eventually remove some of the SPI API functions.  As of this patch,
for example, SPI_cursor_open_with_args and SPI_execute_with_args are
unused anywhere in our code.  But since we document them, it's hard
to guess whether any external code is relying on them.  I suppose
deprecation would be a multi-year project in any case.

I think this is committable now.  Any objections?

                        regards, tom lane


diff --git a/doc/src/sgml/spi.sgml b/doc/src/sgml/spi.sgml
index 3b2a614929..5d97ea6f25 100644
--- a/doc/src/sgml/spi.sgml
+++ b/doc/src/sgml/spi.sgml
@@ -785,6 +785,127 @@ int SPI_execute_with_args(const char *<parameter>command</parameter>,
 
 <!-- *********************************************** -->
 
+<refentry id="spi-spi-execute-with-receiver">
+ <indexterm><primary>SPI_execute_with_receiver</primary></indexterm>
+
+ <refmeta>
+  <refentrytitle>SPI_execute_with_receiver</refentrytitle>
+  <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+  <refname>SPI_execute_with_receiver</refname>
+  <refpurpose>execute a command with out-of-line parameters</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+  int SPI_execute_with_receiver(const char *<parameter>command</parameter>,
+                                ParamListInfo <parameter>params</parameter>,
+                                bool <parameter>read_only</parameter>,
+                                long <parameter>count</parameter>,
+                                DestReceiver *<parameter>dest</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <function>SPI_execute_with_receiver</function> executes a command that might
+   include references to externally supplied parameters.  The command text
+   refers to a parameter as <literal>$<replaceable>n</replaceable></literal>,
+   and the <parameter>params</parameter> object provides values and type
+   information for each such symbol.
+   <parameter>read_only</parameter> and <parameter>count</parameter> have
+   the same interpretation as in <function>SPI_execute</function>.
+   If <parameter>dest</parameter> is not NULL, then result tuples are passed
+   to that object as they are generated by the executor, instead of being
+   accumulated in <varname>SPI_tuptable</varname>.
+  </para>
+
+  <para>
+   The <parameter>params</parameter> object should normally mark each
+   parameter with the <literal>PARAM_FLAG_CONST</literal> flag, since
+   a one-shot plan is always used for the query.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Arguments</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>const char * <parameter>command</parameter></literal></term>
+    <listitem>
+     <para>
+      command string
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>ParamListInfo <parameter>params</parameter></literal></term>
+    <listitem>
+     <para>
+      data structure containing parameter types and values; NULL if none
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>bool <parameter>read_only</parameter></literal></term>
+    <listitem>
+     <para><literal>true</literal> for read-only execution</para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>long <parameter>count</parameter></literal></term>
+    <listitem>
+     <para>
+      maximum number of rows to return,
+      or <literal>0</literal> for no limit
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>DestReceiver *<parameter>dest</parameter></literal></term>
+    <listitem>
+     <para>
+      <literal>DestReceiver</literal> object that will receive any tuples
+      emitted by the query; if NULL, tuples are returned
+      in <varname>SPI_tuptable</varname>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Return Value</title>
+
+  <para>
+   The return value is the same as for <function>SPI_execute</function>.
+  </para>
+
+  <para>
+   When <parameter>dest</parameter> is NULL,
+   <varname>SPI_processed</varname> and
+   <varname>SPI_tuptable</varname> are set as in
+   <function>SPI_execute</function>.
+   When <parameter>dest</parameter> is not NULL,
+   <varname>SPI_processed</varname> is set to zero and
+   <varname>SPI_tuptable</varname> is set to NULL.  If a tuple count
+   is required, the caller's <literal>DestReceiver</literal> object must
+   calculate it.
+  </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
 <refentry id="spi-spi-prepare">
  <indexterm><primary>SPI_prepare</primary></indexterm>
 
@@ -1564,6 +1685,120 @@ int SPI_execute_plan_with_paramlist(SPIPlanPtr <parameter>plan</parameter>,
 
 <!-- *********************************************** -->
 
+<refentry id="spi-spi-execute-plan-with-receiver">
+ <indexterm><primary>SPI_execute_plan_with_receiver</primary></indexterm>
+
+ <refmeta>
+  <refentrytitle>SPI_execute_plan_with_receiver</refentrytitle>
+  <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+  <refname>SPI_execute_plan_with_receiver</refname>
+  <refpurpose>execute a statement prepared by <function>SPI_prepare</function></refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+int SPI_execute_plan_with_receiver(SPIPlanPtr <parameter>plan</parameter>,
+                                   ParamListInfo <parameter>params</parameter>,
+                                   bool <parameter>read_only</parameter>,
+                                   long <parameter>count</parameter>,
+                                   DestReceiver *<parameter>dest</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <function>SPI_execute_plan_with_receiver</function> executes a statement
+   prepared by <function>SPI_prepare</function>.  This function is
+   equivalent to <function>SPI_execute_plan_with_paramlist</function>
+   except that, instead of always accumulating the result tuples into a
+   <varname>SPI_tuptable</varname> structure, tuples can be passed to a
+   caller-supplied <literal>DestReceiver</literal> object as they are
+   generated by the executor.  This is particularly helpful for queries
+   that might generate many tuples, since the data can be processed
+   on-the-fly instead of being accumulated in memory.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Arguments</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>SPIPlanPtr <parameter>plan</parameter></literal></term>
+    <listitem>
+     <para>
+      prepared statement (returned by <function>SPI_prepare</function>)
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>ParamListInfo <parameter>params</parameter></literal></term>
+    <listitem>
+     <para>
+      data structure containing parameter types and values; NULL if none
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>bool <parameter>read_only</parameter></literal></term>
+    <listitem>
+     <para><literal>true</literal> for read-only execution</para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>long <parameter>count</parameter></literal></term>
+    <listitem>
+     <para>
+      maximum number of rows to return,
+      or <literal>0</literal> for no limit
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>DestReceiver *<parameter>dest</parameter></literal></term>
+    <listitem>
+     <para>
+      <literal>DestReceiver</literal> object that will receive any tuples
+      emitted by the query; if NULL, this function is exactly equivalent to
+      <function>SPI_execute_plan_with_paramlist</function>
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Return Value</title>
+
+  <para>
+   The return value is the same as for <function>SPI_execute_plan</function>.
+  </para>
+
+  <para>
+   When <parameter>dest</parameter> is NULL,
+   <varname>SPI_processed</varname> and
+   <varname>SPI_tuptable</varname> are set as in
+   <function>SPI_execute_plan</function>.
+   When <parameter>dest</parameter> is not NULL,
+   <varname>SPI_processed</varname> is set to zero and
+   <varname>SPI_tuptable</varname> is set to NULL.  If a tuple count
+   is required, the caller's <literal>DestReceiver</literal> object must
+   calculate it.
+  </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
 <refentry id="spi-spi-execp">
  <indexterm><primary>SPI_execp</primary></indexterm>
 
@@ -2041,6 +2276,114 @@ Portal SPI_cursor_open_with_paramlist(const char *<parameter>name</parameter>,
 
 <!-- *********************************************** -->
 
+<refentry id="spi-spi-cursor-parse-open-with-paramlist">
+ <indexterm><primary>SPI_cursor_parse_open_with_paramlist</primary></indexterm>
+
+ <refmeta>
+  <refentrytitle>SPI_cursor_parse_open_with_paramlist</refentrytitle>
+  <manvolnum>3</manvolnum>
+ </refmeta>
+
+ <refnamediv>
+  <refname>SPI_cursor_parse_open_with_paramlist</refname>
+  <refpurpose>set up a cursor using a query and parameters</refpurpose>
+ </refnamediv>
+
+ <refsynopsisdiv>
+<synopsis>
+Portal SPI_cursor_parse_open_with_paramlist(const char *<parameter>name</parameter>,
+                                            const char *<parameter>command</parameter>,
+                                            ParamListInfo <parameter>params</parameter>,
+                                            bool <parameter>read_only</parameter>,
+                                            int <parameter>cursorOptions</parameter>)
+</synopsis>
+ </refsynopsisdiv>
+
+ <refsect1>
+  <title>Description</title>
+
+  <para>
+   <function>SPI_cursor_parse_open_with_paramlist</function> sets up a cursor
+   (internally, a portal) that will execute the specified query.  This
+   function is equivalent to <function>SPI_cursor_open_with_args</function>
+   except that any parameters referenced by the query are provided by
+   a <literal>ParamListInfo</literal> object, rather than in ad-hoc arrays.
+  </para>
+
+  <para>
+   The <parameter>params</parameter> object should normally mark each
+   parameter with the <literal>PARAM_FLAG_CONST</literal> flag, since
+   a one-shot plan is always used for the query.
+  </para>
+
+  <para>
+   The passed-in parameter data will be copied into the cursor's portal, so it
+   can be freed while the cursor still exists.
+  </para>
+ </refsect1>
+
+ <refsect1>
+  <title>Arguments</title>
+
+  <variablelist>
+   <varlistentry>
+    <term><literal>const char * <parameter>name</parameter></literal></term>
+    <listitem>
+     <para>
+      name for portal, or <symbol>NULL</symbol> to let the system
+      select a name
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>const char * <parameter>command</parameter></literal></term>
+    <listitem>
+     <para>
+      command string
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>ParamListInfo <parameter>params</parameter></literal></term>
+    <listitem>
+     <para>
+      data structure containing parameter types and values; NULL if none
+     </para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>bool <parameter>read_only</parameter></literal></term>
+    <listitem>
+     <para><literal>true</literal> for read-only execution</para>
+    </listitem>
+   </varlistentry>
+
+   <varlistentry>
+    <term><literal>int <parameter>cursorOptions</parameter></literal></term>
+    <listitem>
+     <para>
+      integer bit mask of cursor options; zero produces default behavior
+     </para>
+    </listitem>
+   </varlistentry>
+  </variablelist>
+ </refsect1>
+
+ <refsect1>
+  <title>Return Value</title>
+
+  <para>
+   Pointer to portal containing the cursor.  Note there is no error
+   return convention; any error will be reported via <function>elog</function>.
+  </para>
+ </refsect1>
+</refentry>
+
+<!-- *********************************************** -->
+
 <refentry id="spi-spi-cursor-find">
  <indexterm><primary>SPI_cursor_find</primary></indexterm>
 
diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 6a2c233615..e4b7483e32 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
  SetTuplestoreDestReceiverParams(queryDesc->dest,
  portal->holdStore,
  portal->holdContext,
- true);
+ true,
+ NULL,
+ NULL);
 
  /* Fetch the result set into the tuplestore */
  ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168821..055ebb77ae 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount);
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
  Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
  res = _SPI_execute_plan(&plan, NULL,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -576,7 +577,36 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
  res = _SPI_execute_plan(plan, params,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * Execute a previously prepared plan.  If dest isn't NULL, we send result
+ * tuples to the caller-supplied DestReceiver rather than through the usual
+ * SPI output arrangements.  If dest is NULL this is equivalent to
+ * SPI_execute_plan_with_paramlist.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan, params,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -617,7 +647,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -664,7 +694,50 @@ SPI_execute_with_args(const char *src,
 
  res = _SPI_execute_plan(&plan, paramLI,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that parameters are
+ * supplied through a ParamListInfo, and (if dest isn't NULL) we send
+ * result tuples to the caller-supplied DestReceiver rather than through
+ * the usual SPI output arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+  ParamListInfo params,
+  bool read_only, long tcount,
+  DestReceiver *dest)
+{
+ int res;
+ _SPI_plan plan;
+
+ if (src == NULL || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+ if (params)
+ {
+ plan.parserSetup = params->parserSetup;
+ plan.parserSetupArg = params->parserSetupArg;
+ }
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, params,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -1303,6 +1376,49 @@ SPI_cursor_open_with_paramlist(const char *name, SPIPlanPtr plan,
  return SPI_cursor_open_internal(name, plan, params, read_only);
 }
 
+/*
+ * SPI_cursor_parse_open_with_paramlist()
+ *
+ * Same as SPI_cursor_open_with_args except that parameters (if any) are passed
+ * as a ParamListInfo, which supports dynamic parameter set determination
+ */
+Portal
+SPI_cursor_parse_open_with_paramlist(const char *name,
+ const char *src,
+ ParamListInfo params,
+ bool read_only, int cursorOptions)
+{
+ Portal result;
+ _SPI_plan plan;
+
+ if (src == NULL)
+ elog(ERROR, "SPI_cursor_parse_open_with_paramlist called with invalid arguments");
+
+ SPI_result = _SPI_begin_call(true);
+ if (SPI_result < 0)
+ elog(ERROR, "SPI_cursor_parse_open_with_paramlist called while not connected");
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = cursorOptions;
+ if (params)
+ {
+ plan.parserSetup = params->parserSetup;
+ plan.parserSetupArg = params->parserSetupArg;
+ }
+
+ _SPI_prepare_plan(src, &plan);
+
+ /* We needn't copy the plan; SPI_cursor_open_internal will do so */
+
+ result = SPI_cursor_open_internal(name, &plan, params, read_only);
+
+ /* And clean up */
+ _SPI_end_call(true);
+
+ return result;
+}
+
 
 /*
  * SPI_cursor_open_internal()
@@ -2090,11 +2206,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  * false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount)
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest)
 {
  int my_res = 0;
  uint64 my_processed = 0;
@@ -2228,6 +2346,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  bool canSetTag = stmt->canSetTag;
  DestReceiver *dest;
 
+ /*
+ * Reset output state.  (Note that if a non-SPI receiver is used,
+ * _SPI_current->processed will stay zero, and that's what we'll
+ * report to the caller.  It's the receiver's job to count tuples
+ * in that case.)
+ */
  _SPI_current->processed = 0;
  _SPI_current->tuptable = NULL;
 
@@ -2267,7 +2391,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  UpdateActiveSnapshotCommandId();
  }
 
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ /*
+ * Select appropriate tuple receiver.  Output from non-canSetTag
+ * subqueries always goes to the bit bucket.
+ */
+ if (!canSetTag)
+ dest = CreateDestReceiver(DestNone);
+ else if (caller_dest)
+ dest = caller_dest;
+ else
+ dest = CreateDestReceiver(DestSPI);
 
  if (stmt->utilityStmt == NULL)
  {
@@ -2373,7 +2506,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  SPI_freetuptable(_SPI_current->tuptable);
  _SPI_current->tuptable = NULL;
  }
- /* we know that the receiver doesn't need a destroy call */
+
+ /*
+ * We don't issue a destroy call to the receiver.  The SPI and
+ * None receivers would ignore it anyway, while if the caller
+ * supplied a receiver, it's not our job to destroy it.
+ */
+
  if (res < 0)
  {
  my_res = res;
@@ -2465,7 +2604,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
  switch (operation)
  {
  case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest == DestNone)
  {
  /* Don't return SPI_OK_SELECT if we're discarding result */
  res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc1a6..e8172bedd0 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  * toasted values.  This is to support cursors WITH HOLD, which must retain
  * data even if the underlying table is dropped.
  *
+ * Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"
 
 
@@ -31,14 +34,19 @@ typedef struct
  Tuplestorestate *tstore; /* where to put the data */
  MemoryContext cxt; /* context containing tstore */
  bool detoast; /* were we told to detoast? */
+ TupleDesc target_tupdesc; /* target tupdesc, or NULL if none */
+ const char *map_failure_msg; /* tupdesc mapping failure message */
  /* workspace: */
  Datum   *outvalues; /* values array for result tuple */
  Datum   *tofree; /* temp values to be pfree'd */
+ TupleConversionMap *tupmap; /* conversion map, if needed */
+ TupleTableSlot *mapslot; /* slot for mapped tuples */
 } TStoreState;
 
 
 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  }
  }
 
+ /* Check if tuple conversion is needed */
+ if (myState->target_tupdesc)
+ myState->tupmap = convert_tuples_by_position(typeinfo,
+ myState->target_tupdesc,
+ myState->map_failure_msg);
+ else
+ myState->tupmap = NULL;
+
  /* Set up appropriate callback */
  if (needtoast)
  {
+ Assert(!myState->tupmap);
  myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
  /* Create workspace */
  myState->outvalues = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
  myState->tofree = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+ myState->mapslot = NULL;
+ }
+ else if (myState->tupmap)
+ {
+ myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+ myState->outvalues = NULL;
+ myState->tofree = NULL;
+ myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+ &TTSOpsVirtual);
  }
  else
  {
  myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
  myState->outvalues = NULL;
  myState->tofree = NULL;
+ myState->mapslot = NULL;
  }
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -157,6 +184,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
  return true;
 }
 
+/*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+ TStoreState *myState = (TStoreState *) self;
+
+ execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+ tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+ return true;
+}
+
 /*
  * Clean up at end of an executor run
  */
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
  if (myState->tofree)
  pfree(myState->tofree);
  myState->tofree = NULL;
+ if (myState->tupmap)
+ free_conversion_map(myState->tupmap);
+ myState->tupmap = NULL;
+ if (myState->mapslot)
+ ExecDropSingleTupleTableSlot(myState->mapslot);
+ myState->mapslot = NULL;
 }
 
 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)
 
 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast)
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg)
 {
  TStoreState *myState = (TStoreState *) self;
 
+ Assert(!(detoast && target_tupdesc));
+
  Assert(myState->pub.mydest == DestTuplestore);
  myState->tstore = tStore;
  myState->cxt = tContext;
  myState->detoast = detoast;
+ myState->target_tupdesc = target_tupdesc;
+ myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/nodes/params.c b/src/backend/nodes/params.c
index ed2ee6a975..1719119fc2 100644
--- a/src/backend/nodes/params.c
+++ b/src/backend/nodes/params.c
@@ -17,19 +17,27 @@
 
 #include "access/xact.h"
 #include "mb/stringinfo_mb.h"
-#include "nodes/bitmapset.h"
 #include "nodes/params.h"
+#include "parser/parse_node.h"
 #include "storage/shmem.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 
 
+static void paramlist_parser_setup(ParseState *pstate, void *arg);
+static Node *paramlist_param_ref(ParseState *pstate, ParamRef *pref);
+
+
 /*
  * Allocate and initialize a new ParamListInfo structure.
  *
  * To make a new structure for the "dynamic" way (with hooks), pass 0 for
  * numParams and set numParams manually.
+ *
+ * A default parserSetup function is supplied automatically.  Callers may
+ * override it if they choose.  (Note that most use-cases for ParamListInfos
+ * will never use the parserSetup function anyway.)
  */
 ParamListInfo
 makeParamList(int numParams)
@@ -45,8 +53,8 @@ makeParamList(int numParams)
  retval->paramFetchArg = NULL;
  retval->paramCompile = NULL;
  retval->paramCompileArg = NULL;
- retval->parserSetup = NULL;
- retval->parserSetupArg = NULL;
+ retval->parserSetup = paramlist_parser_setup;
+ retval->parserSetupArg = (void *) retval;
  retval->paramValuesStr = NULL;
  retval->numParams = numParams;
 
@@ -102,6 +110,55 @@ copyParamList(ParamListInfo from)
  return retval;
 }
 
+
+/*
+ * Set up to parse a query containing references to parameters
+ * sourced from a ParamListInfo.
+ */
+static void
+paramlist_parser_setup(ParseState *pstate, void *arg)
+{
+ pstate->p_paramref_hook = paramlist_param_ref;
+ /* no need to use p_coerce_param_hook */
+ pstate->p_ref_hook_state = arg;
+}
+
+/*
+ * Transform a ParamRef using parameter type data from a ParamListInfo.
+ */
+static Node *
+paramlist_param_ref(ParseState *pstate, ParamRef *pref)
+{
+ ParamListInfo paramLI = (ParamListInfo) pstate->p_ref_hook_state;
+ int paramno = pref->number;
+ ParamExternData *prm;
+ ParamExternData prmdata;
+ Param   *param;
+
+ /* check parameter number is valid */
+ if (paramno <= 0 || paramno > paramLI->numParams)
+ return NULL;
+
+ /* give hook a chance in case parameter is dynamic */
+ if (paramLI->paramFetch != NULL)
+ prm = paramLI->paramFetch(paramLI, paramno, false, &prmdata);
+ else
+ prm = &paramLI->params[paramno - 1];
+
+ if (!OidIsValid(prm->ptype))
+ return NULL;
+
+ param = makeNode(Param);
+ param->paramkind = PARAM_EXTERN;
+ param->paramid = paramno;
+ param->paramtype = prm->ptype;
+ param->paramtypmod = -1;
+ param->paramcollid = get_typcollation(param->paramtype);
+ param->location = pref->location;
+
+ return (Node *) param;
+}
+
 /*
  * Estimate the amount of space required to serialize a ParamListInfo.
  */
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2e55..96ea74f118 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
  SetTuplestoreDestReceiverParams(treceiver,
  portal->holdStore,
  portal->holdContext,
- false);
+ false,
+ NULL,
+ NULL);
 
  switch (portal->strategy)
  {
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20ada5..896ec0a2ad 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
  ParamListInfo params,
  bool read_only, long tcount);
+extern int SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest);
 extern int SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
   long tcount);
@@ -102,6 +106,10 @@ extern int SPI_execute_with_args(const char *src,
   int nargs, Oid *argtypes,
   Datum *Values, const char *Nulls,
   bool read_only, long tcount);
+extern int SPI_execute_with_receiver(const char *src,
+  ParamListInfo params,
+  bool read_only, long tcount,
+  DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
  int cursorOptions);
@@ -150,6 +158,11 @@ extern Portal SPI_cursor_open_with_args(const char *name,
  bool read_only, int cursorOptions);
 extern Portal SPI_cursor_open_with_paramlist(const char *name, SPIPlanPtr plan,
  ParamListInfo params, bool read_only);
+extern Portal SPI_cursor_parse_open_with_paramlist(const char *name,
+   const char *src,
+   ParamListInfo params,
+   bool read_only,
+   int cursorOptions);
 extern Portal SPI_cursor_find(const char *name);
 extern void SPI_cursor_fetch(Portal portal, bool forward, long count);
 extern void SPI_cursor_move(Portal portal, bool forward, long count);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4a4d..e9461cf914 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast);
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg);
 
 #endif /* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index 9a87cd70f1..92d930fec8 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -51,14 +52,6 @@
 #include "utils/syscache.h"
 #include "utils/typcache.h"
 
-typedef struct
-{
- int nargs; /* number of arguments */
- Oid   *types; /* types of arguments */
- Datum   *values; /* evaluated argument values */
- char   *nulls; /* null markers (' '/'n' style) */
-} PreparedParamsData;
-
 /*
  * All plpgsql function executions within a single transaction share the same
  * executor EState for evaluating "simple" expressions.  Each function call
@@ -441,15 +434,15 @@ static void assign_text_var(PLpgSQL_execstate *estate, PLpgSQL_var *var,
  const char *str);
 static void assign_record_var(PLpgSQL_execstate *estate, PLpgSQL_rec *rec,
   ExpandedRecordHeader *erh);
-static PreparedParamsData *exec_eval_using_params(PLpgSQL_execstate *estate,
-  List *params);
+static ParamListInfo exec_eval_using_params(PLpgSQL_execstate *estate,
+ List *params);
 static Portal exec_dynquery_with_params(PLpgSQL_execstate *estate,
  PLpgSQL_expr *dynquery, List *params,
  const char *portalname, int cursorOptions);
 static char *format_expr_params(PLpgSQL_execstate *estate,
  const PLpgSQL_expr *expr);
 static char *format_preparedparamsdata(PLpgSQL_execstate *estate,
-   const PreparedParamsData *ppd);
+   ParamListInfo paramLI);
 
 
 /* ----------
@@ -3513,9 +3506,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
    PLpgSQL_stmt_return_query *stmt)
 {
- Portal portal;
- uint64 processed = 0;
- TupleConversionMap *tupmap;
+ int64 tcount;
+ DestReceiver *treceiver;
+ int rc;
+ uint64 processed;
+ MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
  MemoryContext oldcontext;
 
  if (!estate->retisset)
@@ -3525,60 +3520,99 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 
  if (estate->tuple_store == NULL)
  exec_init_tuple_store(estate);
+ /* There might be some tuples in the tuplestore already */
+ tcount = tuplestore_tuple_count(estate->tuple_store);
+
+ /*
+ * Set up DestReceiver to transfer results directly to tuplestore,
+ * converting rowtype if necessary.  DestReceiver lives in mcontext.
+ */
+ oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+ treceiver = CreateDestReceiver(DestTuplestore);
+ SetTuplestoreDestReceiverParams(treceiver,
+ estate->tuple_store,
+ estate->tuple_store_cxt,
+ false,
+ estate->tuple_store_desc,
+ gettext_noop("structure of query does not match function result type"));
+ MemoryContextSwitchTo(oldcontext);
 
  if (stmt->query != NULL)
  {
  /* static query */
- exec_run_select(estate, stmt->query, 0, &portal);
- }
- else
- {
- /* RETURN QUERY EXECUTE */
- Assert(stmt->dynquery != NULL);
- portal = exec_dynquery_with_params(estate, stmt->dynquery,
-   stmt->params, NULL,
-   0);
- }
+ PLpgSQL_expr *expr = stmt->query;
+ ParamListInfo paramLI;
 
- /* Use eval_mcontext for tuple conversion work */
- oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /*
+ * On the first call for this expression generate the plan.
+ */
+ if (expr->plan == NULL)
+ exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
 
- tupmap = convert_tuples_by_position(portal->tupDesc,
- estate->tuple_store_desc,
- gettext_noop("structure of query does not match function result type"));
+ /*
+ * Set up ParamListInfo to pass to executor
+ */
+ paramLI = setup_param_list(estate, expr);
 
- while (true)
+ /*
+ * Execute the query
+ */
+ rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+ estate->readonly_func, 0,
+ treceiver);
+ if (rc != SPI_OK_SELECT)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("query \"%s\" is not a SELECT", expr->query)));
+ }
+ else
  {
- uint64 i;
-
- SPI_cursor_fetch(portal, true, 50);
+ /* RETURN QUERY EXECUTE */
+ Datum query;
+ bool isnull;
+ Oid restype;
+ int32 restypmod;
+ char   *querystr;
 
- /* SPI will have changed CurrentMemoryContext */
- MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /*
+ * Evaluate the string expression after the EXECUTE keyword. Its
+ * result is the querystring we have to execute.
+ */
+ Assert(stmt->dynquery != NULL);
+ query = exec_eval_expr(estate, stmt->dynquery,
+   &isnull, &restype, &restypmod);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("query string argument of EXECUTE is null")));
 
- if (SPI_processed == 0)
- break;
+ /* Get the C-String representation */
+ querystr = convert_value_to_string(estate, query, restype);
 
- for (i = 0; i < SPI_processed; i++)
- {
- HeapTuple tuple = SPI_tuptable->vals[i];
+ /* copy it into the stmt_mcontext before we clean up */
+ querystr = MemoryContextStrdup(stmt_mcontext, querystr);
 
- if (tupmap)
- tuple = execute_attr_map_tuple(tuple, tupmap);
- tuplestore_puttuple(estate->tuple_store, tuple);
- if (tupmap)
- heap_freetuple(tuple);
- processed++;
- }
+ exec_eval_cleanup(estate);
 
- SPI_freetuptable(SPI_tuptable);
+ /* Execute query, passing params if necessary */
+ rc = SPI_execute_with_receiver(querystr,
+   exec_eval_using_params(estate,
+  stmt->params),
+   estate->readonly_func,
+   0,
+   treceiver);
+ if (rc < 0)
+ elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+ querystr, SPI_result_code_string(rc));
  }
 
- SPI_freetuptable(SPI_tuptable);
- SPI_cursor_close(portal);
-
- MemoryContextSwitchTo(oldcontext);
+ /* Clean up */
+ treceiver->rDestroy(treceiver);
  exec_eval_cleanup(estate);
+ MemoryContextReset(stmt_mcontext);
+
+ /* Count how many tuples we got */
+ processed = tuplestore_tuple_count(estate->tuple_store) - tcount;
 
  estate->eval_processed = processed;
  exec_set_found(estate, processed != 0);
@@ -4344,7 +4378,7 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
  int32 restypmod;
  char   *querystr;
  int exec_res;
- PreparedParamsData *ppd = NULL;
+ ParamListInfo paramLI;
  MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
 
  /*
@@ -4368,16 +4402,9 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
  /*
  * Execute the query without preparing a saved plan.
  */
- if (stmt->params)
- {
- ppd = exec_eval_using_params(estate, stmt->params);
- exec_res = SPI_execute_with_args(querystr,
- ppd->nargs, ppd->types,
- ppd->values, ppd->nulls,
- estate->readonly_func, 0);
- }
- else
- exec_res = SPI_execute(querystr, estate->readonly_func, 0);
+ paramLI = exec_eval_using_params(estate, stmt->params);
+ exec_res = SPI_execute_with_receiver(querystr, paramLI,
+ estate->readonly_func, 0, NULL);
 
  switch (exec_res)
  {
@@ -4429,7 +4456,7 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
  break;
 
  default:
- elog(ERROR, "SPI_execute failed executing query \"%s\": %s",
+ elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
  querystr, SPI_result_code_string(exec_res));
  break;
  }
@@ -4465,7 +4492,7 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
  char   *errdetail;
 
  if (estate->func->print_strict_params)
- errdetail = format_preparedparamsdata(estate, ppd);
+ errdetail = format_preparedparamsdata(estate, paramLI);
  else
  errdetail = NULL;
 
@@ -4484,7 +4511,7 @@ exec_stmt_dynexecute(PLpgSQL_execstate *estate,
  char   *errdetail;
 
  if (estate->func->print_strict_params)
- errdetail = format_preparedparamsdata(estate, ppd);
+ errdetail = format_preparedparamsdata(estate, paramLI);
  else
  errdetail = NULL;
 
@@ -6308,9 +6335,9 @@ exec_eval_simple_expr(PLpgSQL_execstate *estate,
 /*
  * Create a ParamListInfo to pass to SPI
  *
- * We use a single ParamListInfo struct for all SPI calls made from this
- * estate; it contains no per-param data, just hook functions, so it's
- * effectively read-only for SPI.
+ * We use a single ParamListInfo struct for all SPI calls made to evaluate
+ * PLpgSQL_exprs in this estate.  It contains no per-param data, just hook
+ * functions, so it's effectively read-only for SPI.
  *
  * An exception from pure read-only-ness is that the parserSetupArg points
  * to the specific PLpgSQL_expr being evaluated.  This is not an issue for
@@ -8575,65 +8602,68 @@ assign_record_var(PLpgSQL_execstate *estate, PLpgSQL_rec *rec,
  * The result data structure is created in the stmt_mcontext, and should
  * be freed by resetting that context.
  */
-static PreparedParamsData *
+static ParamListInfo
 exec_eval_using_params(PLpgSQL_execstate *estate, List *params)
 {
- PreparedParamsData *ppd;
- MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
+ ParamListInfo paramLI;
  int nargs;
+ MemoryContext stmt_mcontext;
+ MemoryContext oldcontext;
  int i;
  ListCell   *lc;
 
- ppd = (PreparedParamsData *)
- MemoryContextAlloc(stmt_mcontext, sizeof(PreparedParamsData));
- nargs = list_length(params);
+ /* Fast path for no parameters: we can just return NULL */
+ if (params == NIL)
+ return NULL;
 
- ppd->nargs = nargs;
- ppd->types = (Oid *)
- MemoryContextAlloc(stmt_mcontext, nargs * sizeof(Oid));
- ppd->values = (Datum *)
- MemoryContextAlloc(stmt_mcontext, nargs * sizeof(Datum));
- ppd->nulls = (char *)
- MemoryContextAlloc(stmt_mcontext, nargs * sizeof(char));
+ nargs = list_length(params);
+ stmt_mcontext = get_stmt_mcontext(estate);
+ oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+ paramLI = makeParamList(nargs);
+ MemoryContextSwitchTo(oldcontext);
 
  i = 0;
  foreach(lc, params)
  {
  PLpgSQL_expr *param = (PLpgSQL_expr *) lfirst(lc);
- bool isnull;
+ ParamExternData *prm = &paramLI->params[i];
  int32 ppdtypmod;
- MemoryContext oldcontext;
 
- ppd->values[i] = exec_eval_expr(estate, param,
- &isnull,
- &ppd->types[i],
- &ppdtypmod);
- ppd->nulls[i] = isnull ? 'n' : ' ';
+ /*
+ * Always mark params as const, since we only use the result with
+ * one-shot plans.
+ */
+ prm->pflags = PARAM_FLAG_CONST;
+
+ prm->value = exec_eval_expr(estate, param,
+ &prm->isnull,
+ &prm->ptype,
+ &ppdtypmod);
 
  oldcontext = MemoryContextSwitchTo(stmt_mcontext);
 
- if (ppd->types[i] == UNKNOWNOID)
+ if (prm->ptype == UNKNOWNOID)
  {
  /*
  * Treat 'unknown' parameters as text, since that's what most
- * people would expect. SPI_execute_with_args can coerce unknown
- * constants in a more intelligent way, but not unknown Params.
- * This code also takes care of copying into the right context.
- * Note we assume 'unknown' has the representation of C-string.
+ * people would expect.  SPI_execute* can coerce unknown constants
+ * in a more intelligent way, but not unknown Params.  This code
+ * also takes care of copying into the right context.  Note we
+ * assume 'unknown' has the representation of C-string.
  */
- ppd->types[i] = TEXTOID;
- if (!isnull)
- ppd->values[i] = CStringGetTextDatum(DatumGetCString(ppd->values[i]));
+ prm->ptype = TEXTOID;
+ if (!prm->isnull)
+ prm->value = CStringGetTextDatum(DatumGetCString(prm->value));
  }
  /* pass-by-ref non null values must be copied into stmt_mcontext */
- else if (!isnull)
+ else if (!prm->isnull)
  {
  int16 typLen;
  bool typByVal;
 
- get_typlenbyval(ppd->types[i], &typLen, &typByVal);
+ get_typlenbyval(prm->ptype, &typLen, &typByVal);
  if (!typByVal)
- ppd->values[i] = datumCopy(ppd->values[i], typByVal, typLen);
+ prm->value = datumCopy(prm->value, typByVal, typLen);
  }
 
  MemoryContextSwitchTo(oldcontext);
@@ -8643,7 +8673,7 @@ exec_eval_using_params(PLpgSQL_execstate *estate, List *params)
  i++;
  }
 
- return ppd;
+ return paramLI;
 }
 
 /*
@@ -8689,30 +8719,15 @@ exec_dynquery_with_params(PLpgSQL_execstate *estate,
 
  /*
  * Open an implicit cursor for the query.  We use
- * SPI_cursor_open_with_args even when there are no params, because this
- * avoids making and freeing one copy of the plan.
+ * SPI_cursor_parse_open_with_paramlist even when there are no params,
+ * because this avoids making and freeing one copy of the plan.
  */
- if (params)
- {
- PreparedParamsData *ppd;
-
- ppd = exec_eval_using_params(estate, params);
- portal = SPI_cursor_open_with_args(portalname,
-   querystr,
-   ppd->nargs, ppd->types,
-   ppd->values, ppd->nulls,
-   estate->readonly_func,
-   cursorOptions);
- }
- else
- {
- portal = SPI_cursor_open_with_args(portalname,
-   querystr,
-   0, NULL,
-   NULL, NULL,
-   estate->readonly_func,
-   cursorOptions);
- }
+ portal = SPI_cursor_parse_open_with_paramlist(portalname,
+  querystr,
+  exec_eval_using_params(estate,
+ params),
+  estate->readonly_func,
+  cursorOptions);
 
  if (portal == NULL)
  elog(ERROR, "could not open implicit cursor for query \"%s\": %s",
@@ -8782,37 +8797,44 @@ format_expr_params(PLpgSQL_execstate *estate,
 }
 
 /*
- * Return a formatted string with information about PreparedParamsData, or NULL
- * if there are no parameters.
+ * Return a formatted string with information about the parameter values,
+ * or NULL if there are no parameters.
  * The result is in the eval_mcontext.
  */
 static char *
 format_preparedparamsdata(PLpgSQL_execstate *estate,
-  const PreparedParamsData *ppd)
+  ParamListInfo paramLI)
 {
  int paramno;
  StringInfoData paramstr;
  MemoryContext oldcontext;
 
- if (!ppd)
+ if (!paramLI)
  return NULL;
 
  oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
 
  initStringInfo(&paramstr);
- for (paramno = 0; paramno < ppd->nargs; paramno++)
+ for (paramno = 0; paramno < paramLI->numParams; paramno++)
  {
+ ParamExternData *prm = &paramLI->params[paramno];
+
+ /*
+ * Note: for now, this is only used on ParamListInfos produced by
+ * exec_eval_using_params(), so we don't worry about invoking the
+ * paramFetch hook or skipping unused parameters.
+ */
  appendStringInfo(&paramstr, "%s$%d = ",
  paramno > 0 ? ", " : "",
  paramno + 1);
 
- if (ppd->nulls[paramno] == 'n')
+ if (prm->isnull)
  appendStringInfoString(&paramstr, "NULL");
  else
  appendStringInfoStringQuoted(&paramstr,
  convert_value_to_string(estate,
- ppd->values[paramno],
- ppd->types[paramno]),
+ prm->value,
+ prm->ptype),
  -1);
  }
 
Reply | Threaded
Open this post in threaded view
|

Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Robert Haas
In reply to this post by Tom Lane-2
On Sat, Mar 21, 2020 at 11:23 PM Tom Lane <[hidden email]> wrote:
> I think that the latter restriction is probably sane, because we don't
> want to suspend execution of a parallel query while we've got worker
> processes waiting.

Right.

> And there might be some implementation restrictions
> lurking under it too --- that's not a part of the code I know in any
> detail.

There are. When you EnterParallelMode(), various normally-permissible
operations are restricted and will error out (e.g. updating your snapshot
or command ID). Parallel query is not safe unless you remain in
parallel mode from start to finish, but that means you can't let
control escape into code that might do arbitrary things. That, in a
nutshell, is why the cursor restriction is there: a cursor hands
control back to its caller between fetches.

This is a heck of a nice improvement. Thanks for working on it.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company