BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

PG Bug reporting form
The following bug has been logged on the website:

Bug reference:      16040
Logged by:          Jeremy Smith
Email address:      [hidden email]
PostgreSQL version: 12.0
Operating system:   Official Docker Image, CentOS7
Description:        

I have also tried this with 11.3, 11.4, and 11.5, so this is not new in
12.0.  Here's a really basic way to reproduce this:

postgres=# BEGIN;
BEGIN
postgres=#
postgres=# -- Create a test table and some data
postgres=# CREATE TABLE test (a int);
CREATE TABLE
postgres=# INSERT INTO test SELECT generate_series(1,10);
INSERT 0 10
postgres=# alter table test set (parallel_workers = 4);
ALTER TABLE
postgres=# -- Use auto_explain to show plan of query in the function
postgres=# LOAD 'auto_explain';
LOAD
postgres=# SET auto_explain.log_analyze = on;
SET
postgres=# SET client_min_messages = log;
SET
postgres=# SET auto_explain.log_nested_statements = on;
SET
postgres=# SET auto_explain.log_min_duration = 0;
SET
postgres=# -- Set parallel costs artificially low, for demonstration purposes
postgres=# set parallel_tuple_cost = 0;
SET
postgres=# set parallel_setup_cost = 0;
SET
postgres=# set max_parallel_workers_per_gather = 4;
SET
postgres=# -- Normal query will use 4 workers
postgres=# SELECT test.a, count(*) FROM test GROUP BY test.a;
LOG:  duration: 19.280 ms  plan:
Query Text: SELECT test.a, count(*) FROM test GROUP BY test.a;
Finalize HashAggregate  (cost=25.56..27.56 rows=200 width=12) (actual time=16.649..16.795 rows=10 loops=1)
  Group Key: a
  ->  Gather  (cost=19.56..21.56 rows=800 width=12) (actual time=2.853..18.744 rows=10 loops=1)
        Workers Planned: 4
        Workers Launched: 4
        ->  Partial HashAggregate  (cost=19.56..21.56 rows=200 width=12) (actual time=0.493..0.519 rows=2 loops=5)
              Group Key: a
              ->  Parallel Seq Scan on test  (cost=0.00..16.38 rows=638 width=4) (actual time=0.009..0.083 rows=2 loops=5)
 a  | count
----+-------
  9 |     1
  3 |     1
  5 |     1
  4 |     1
 10 |     1
  6 |     1
  2 |     1
  7 |     1
  1 |     1
  8 |     1
(10 rows)

postgres=#
postgres=# CREATE OR REPLACE FUNCTION test_count()
postgres-#   RETURNS TABLE (a int, n bigint) AS
postgres-#   $$
postgres$#     BEGIN
postgres$#       RETURN QUERY SELECT test.a, count(*) FROM test GROUP BY test.a;
postgres$#     END;
postgres$#   $$
postgres-# LANGUAGE PLPGSQL;
CREATE FUNCTION
postgres=#
postgres=# -- This query will not use parallel workers
postgres=# SELECT * FROM test_count();
LOG:  duration: 0.437 ms  plan:
Query Text: SELECT test.a, count(*) FROM test GROUP BY test.a
HashAggregate  (cost=48.25..50.25 rows=200 width=12) (actual time=0.193..0.276 rows=10 loops=1)
  Group Key: a
  ->  Seq Scan on test  (cost=0.00..35.50 rows=2550 width=4) (actual time=0.010..0.096 rows=10 loops=1)
LOG:  duration: 1.069 ms  plan:
Query Text: SELECT * FROM test_count();
Function Scan on test_count  (cost=0.25..10.25 rows=1000 width=12) (actual time=0.895..0.968 rows=10 loops=1)
 a  | n
----+---
  9 | 1
  3 | 1
  5 | 1
  4 | 1
 10 | 1
  6 | 1
  2 | 1
  7 | 1
  1 | 1
  8 | 1
(10 rows)

postgres=# -- A workaround for long-running queries, using CREATE TABLE,
postgres=# -- which will run in parallel
postgres=# CREATE OR REPLACE FUNCTION test_count2()
postgres-#   RETURNS TABLE (a int, n bigint) AS
postgres-#   $$
postgres$#     BEGIN
postgres$#       CREATE TEMPORARY TABLE test_count2_temp_table AS
postgres$#         SELECT test.a, count(*) FROM test GROUP BY test.a;
postgres$#       RETURN QUERY select * from test_count2_temp_table;
postgres$#     END;
postgres$#  $$
postgres-# LANGUAGE PLPGSQL;
CREATE FUNCTION
postgres=#
postgres=# -- The CREATE TABLE AS query will use parallel workers, but the
postgres=# -- RETURN QUERY statement will not
postgres=# SELECT * FROM test_count2();
LOG:  duration: 24.139 ms  plan:
Query Text: CREATE TEMPORARY TABLE test_count2_temp_table AS
        SELECT test.a, count(*) FROM test GROUP BY test.a
Finalize HashAggregate  (cost=25.56..27.56 rows=200 width=12) (actual time=21.819..21.896 rows=10 loops=1)
  Group Key: a
  ->  Gather  (cost=19.56..21.56 rows=800 width=12) (actual time=0.755..22.966 rows=10 loops=1)
        Workers Planned: 4
        Workers Launched: 4
        ->  Partial HashAggregate  (cost=19.56..21.56 rows=200 width=12) (actual time=0.105..0.148 rows=2 loops=5)
              Group Key: a
              ->  Parallel Seq Scan on test  (cost=0.00..16.38 rows=638 width=4) (actual time=0.009..0.056 rows=2 loops=5)
LOG:  duration: 0.420 ms  plan:
Query Text: select * from test_count2_temp_table
Seq Scan on test_count2_temp_table  (cost=0.00..30.40 rows=2040 width=12) (actual time=0.014..0.305 rows=10 loops=1)
LOG:  duration: 26.118 ms  plan:
Query Text: SELECT * FROM test_count2();
Function Scan on test_count2  (cost=0.25..10.25 rows=1000 width=12) (actual time=25.845..25.994 rows=10 loops=1)
 a  | n
----+---
  9 | 1
  3 | 1
  5 | 1
  4 | 1
 10 | 1
  6 | 1
  2 | 1
  7 | 1
  1 | 1
  8 | 1
(10 rows)



It's not obvious from the documentation
(https://www.postgresql.org/docs/12/when-can-parallel-query-be-used.html)
that this should be the case.  Unlike a cursor or a FOR loop, RETURN QUERY
cannot be suspended partway through execution, so the documented reason for
withholding a parallel plan does not obviously apply to it.


Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Tom Lane
PG Bug reporting form <[hidden email]> writes:
> [ $SUBJECT ]

I got around to looking at this today, and what I find is that the
problem is that exec_stmt_return_query() uses a portal (i.e. a cursor)
to read the results of the query.  That seemed like a good idea, back
in the late bronze age, because it allowed plpgsql to fetch the query
results a few rows at a time and not risk blowing out memory with a huge
SPI result.  However, the parallel-query infrastructure refuses to
parallelize when the query is being read via a cursor.
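
(For reference, the gate in question is in standard_planner(): unless the
caller passes CURSOR_OPT_PARALLEL_OK, the planner never even considers a
parallel plan.  Paraphrased from the v12 sources, so details may vary a
bit by version, it looks roughly like this:)

    if ((cursorOptions & CURSOR_OPT_PARALLEL_OK) != 0 &&
        IsUnderPostmaster &&
        parse->commandType == CMD_SELECT &&
        !parse->hasModifyingCTE &&
        max_parallel_workers_per_gather > 0 &&
        !IsParallelWorker())
    {
        /* all the cheap tests pass, so scan the query tree */
        glob->maxParallelHazard = max_parallel_hazard(parse);
        glob->parallelModeOK = (glob->maxParallelHazard != PROPARALLEL_UNSAFE);
    }
    else
    {
        /* skip the expensive scan; just assume the query is unsafe */
        glob->maxParallelHazard = PROPARALLEL_UNSAFE;
        glob->parallelModeOK = false;
    }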

I think that the latter restriction is probably sane, because we don't
want to suspend execution of a parallel query while we've got worker
processes waiting.  And there might be some implementation restrictions
lurking under it too --- that's not a part of the code I know in any
detail.

However, there's no fundamental reason why exec_stmt_return_query has
to use a cursor.  It's going to run the query to completion immediately
anyway, and shove all the result rows into a tuplestore.  What we lack
is a way to get the SPI query to pass its results directly to a
tuplestore, without the SPITupleTable intermediary.  (Note that the
tuplestore can spill a large result to disk, whereas SPITupleTable
can't do that.)
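
(Concretely, the tuplestore API involved: the third argument of
tuplestore_begin_heap() caps how much is kept in memory before the store
spills to a temp file.  A one-line sketch, not taken from the patch:)

    /* keep at most work_mem kB in memory; overflow goes to a temp file */
    Tuplestorestate *tstore = tuplestore_begin_heap(true,   /* randomAccess */
                                                    false,  /* interXact */
                                                    work_mem);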

So, attached is a draft patch to enable that.  By getting rid of the
intermediate SPITupleTable, this should improve the performance of
RETURN QUERY somewhat even without considering the possibility of
parallelizing the source query.  I've not tried to measure that though.
I've also not looked for other places that could use this new
infrastructure, but there may well be some.
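
(To make the new entry points concrete, here is a hypothetical caller, not
part of the patch.  The function name is invented; it assumes SPI_connect()
has already been done and that tstore/tstore_cxt are the caller's
tuplestore and the memory context it lives in:)

    #include "postgres.h"

    #include "executor/spi.h"
    #include "executor/tstoreReceiver.h"
    #include "tcop/dest.h"
    #include "utils/tuplestore.h"

    /*
     * Run a query and funnel its result rows directly into a caller-owned
     * tuplestore, with no SPITupleTable in between.
     */
    static void
    fetch_into_tuplestore(const char *query,
                          Tuplestorestate *tstore,
                          MemoryContext tstore_cxt)
    {
        DestReceiver *receiver = CreateDestReceiver(DestTuplestore);
        int         rc;

        /* false = no forced detoast; the NULLs = no rowtype conversion */
        SetTuplestoreDestReceiverParams(receiver, tstore, tstore_cxt,
                                        false, NULL, NULL);

        rc = SPI_execute_with_receiver(query,
                                       0, NULL, NULL, NULL, /* no params */
                                       false,   /* read_only */
                                       0,       /* no row-count limit */
                                       receiver);
        if (rc != SPI_OK_SELECT)
            elog(ERROR, "query \"%s\" failed: %s",
                 query, SPI_result_code_string(rc));

        /* per the patch, a caller-supplied receiver is ours to destroy */
        receiver->rDestroy(receiver);
    }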

One thing I'm not totally pleased about is that this adds another
SPI interface routine using the old parameter-values API (that is,
null flags as char ' '/'n').  That was the path of least resistance
given the other moving parts in pl_exec.c and spi.c, but maybe we
should try to modernize that before we set it in stone.
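
(For anyone who hasn't bumped into that API lately: the null flags are a
char array with one entry per parameter, ' ' for not-null and 'n' for
null.  A tiny sketch against the existing SPI_execute_with_args, with
made-up values:)

    Oid     argtypes[2] = {INT4OID, TEXTOID};
    Datum   values[2];
    char    nulls[2] = {' ', 'n'};      /* pass $2 as NULL */

    values[0] = Int32GetDatum(42);
    values[1] = (Datum) 0;              /* ignored, since nulls[1] == 'n' */

    SPI_execute_with_args("SELECT $1, $2", 2, argtypes,
                          values, nulls, true /* read_only */, 0);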

Another thing standing between this patch and committability is suitable
additions to the SPI documentation.  But I saw no value in writing that
before the previous point is settled.

I will go add this to the next commitfest (for v14), but I wonder
if we should try to squeeze it into v13?  This isn't the only
complaint we've gotten about non-parallelizability of RETURN QUERY.

                        regards, tom lane


diff --git a/src/backend/commands/portalcmds.c b/src/backend/commands/portalcmds.c
index 40be506..a23add6 100644
--- a/src/backend/commands/portalcmds.c
+++ b/src/backend/commands/portalcmds.c
@@ -383,7 +383,9 @@ PersistHoldablePortal(Portal portal)
  SetTuplestoreDestReceiverParams(queryDesc->dest,
  portal->holdStore,
  portal->holdContext,
- true);
+ true,
+ NULL,
+ NULL);
 
  /* Fetch the result set into the tuplestore */
  ExecutorRun(queryDesc, ForwardScanDirection, 0L, false);
diff --git a/src/backend/executor/spi.c b/src/backend/executor/spi.c
index b108168..f2b698f 100644
--- a/src/backend/executor/spi.c
+++ b/src/backend/executor/spi.c
@@ -60,7 +60,8 @@ static void _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan);
 
 static int _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount);
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest);
 
 static ParamListInfo _SPI_convert_params(int nargs, Oid *argtypes,
  Datum *Values, const char *Nulls);
@@ -513,7 +514,7 @@ SPI_execute(const char *src, bool read_only, long tcount)
 
  res = _SPI_execute_plan(&plan, NULL,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -547,7 +548,7 @@ SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -576,7 +577,34 @@ SPI_execute_plan_with_paramlist(SPIPlanPtr plan, ParamListInfo params,
 
  res = _SPI_execute_plan(plan, params,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * Execute a previously prepared plan, sending result tuples to the
+ * caller-supplied DestReceiver rather than the usual SPI output arrangements.
+ */
+int
+SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest)
+{
+ int res;
+
+ if (plan == NULL || plan->magic != _SPI_PLAN_MAGIC || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ res = _SPI_execute_plan(plan, params,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -617,7 +645,7 @@ SPI_execute_snapshot(SPIPlanPtr plan,
  _SPI_convert_params(plan->nargs, plan->argtypes,
  Values, Nulls),
  snapshot, crosscheck_snapshot,
- read_only, fire_triggers, tcount);
+ read_only, fire_triggers, tcount, NULL);
 
  _SPI_end_call(true);
  return res;
@@ -664,7 +692,56 @@ SPI_execute_with_args(const char *src,
 
  res = _SPI_execute_plan(&plan, paramLI,
  InvalidSnapshot, InvalidSnapshot,
- read_only, true, tcount);
+ read_only, true, tcount, NULL);
+
+ _SPI_end_call(true);
+ return res;
+}
+
+/*
+ * SPI_execute_with_receiver -- plan and execute a query with arguments
+ *
+ * This is the same as SPI_execute_with_args except that we send result tuples
+ * to the caller-supplied DestReceiver rather than the usual SPI output
+ * arrangements.
+ */
+int
+SPI_execute_with_receiver(const char *src,
+  int nargs, Oid *argtypes,
+  Datum *Values, const char *Nulls,
+  bool read_only, long tcount,
+  DestReceiver *dest)
+{
+ int res;
+ _SPI_plan plan;
+ ParamListInfo paramLI;
+
+ if (src == NULL || nargs < 0 || tcount < 0)
+ return SPI_ERROR_ARGUMENT;
+
+ if (nargs > 0 && (argtypes == NULL || Values == NULL))
+ return SPI_ERROR_PARAM;
+
+ res = _SPI_begin_call(true);
+ if (res < 0)
+ return res;
+
+ memset(&plan, 0, sizeof(_SPI_plan));
+ plan.magic = _SPI_PLAN_MAGIC;
+ plan.cursor_options = CURSOR_OPT_PARALLEL_OK;
+ plan.nargs = nargs;
+ plan.argtypes = argtypes;
+ plan.parserSetup = NULL;
+ plan.parserSetupArg = NULL;
+
+ paramLI = _SPI_convert_params(nargs, argtypes,
+  Values, Nulls);
+
+ _SPI_prepare_oneshot_plan(src, &plan);
+
+ res = _SPI_execute_plan(&plan, paramLI,
+ InvalidSnapshot, InvalidSnapshot,
+ read_only, true, tcount, dest);
 
  _SPI_end_call(true);
  return res;
@@ -2090,11 +2167,13 @@ _SPI_prepare_oneshot_plan(const char *src, SPIPlanPtr plan)
  * fire_triggers: true to fire AFTER triggers at end of query (normal case);
  * false means any AFTER triggers are postponed to end of outer query
  * tcount: execution tuple-count limit, or 0 for none
+ * caller_dest: DestReceiver to receive output, or NULL for normal SPI output
  */
 static int
 _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
   Snapshot snapshot, Snapshot crosscheck_snapshot,
-  bool read_only, bool fire_triggers, uint64 tcount)
+  bool read_only, bool fire_triggers, uint64 tcount,
+  DestReceiver *caller_dest)
 {
  int my_res = 0;
  uint64 my_processed = 0;
@@ -2228,6 +2307,12 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  bool canSetTag = stmt->canSetTag;
  DestReceiver *dest;
 
+ /*
+ * Reset output state.  (Note that if a non-SPI receiver is used,
+ * _SPI_current->processed will stay zero, and that's what we'll
+ * report to the caller.  It's the receiver's job to count tuples
+ * in that case.)
+ */
  _SPI_current->processed = 0;
  _SPI_current->tuptable = NULL;
 
@@ -2267,7 +2352,16 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  UpdateActiveSnapshotCommandId();
  }
 
- dest = CreateDestReceiver(canSetTag ? DestSPI : DestNone);
+ /*
+ * Select appropriate tuple receiver.  Output from non-canSetTag
+ * subqueries always goes to the bit bucket.
+ */
+ if (!canSetTag)
+ dest = CreateDestReceiver(DestNone);
+ else if (caller_dest)
+ dest = caller_dest;
+ else
+ dest = CreateDestReceiver(DestSPI);
 
  if (stmt->utilityStmt == NULL)
  {
@@ -2373,7 +2467,13 @@ _SPI_execute_plan(SPIPlanPtr plan, ParamListInfo paramLI,
  SPI_freetuptable(_SPI_current->tuptable);
  _SPI_current->tuptable = NULL;
  }
- /* we know that the receiver doesn't need a destroy call */
+
+ /*
+ * We don't issue a destroy call to the receiver.  The SPI and
+ * None receivers would ignore it anyway, while if the caller
+ * supplied a receiver, it's not our job to destroy it.
+ */
+
  if (res < 0)
  {
  my_res = res;
@@ -2465,7 +2565,7 @@ _SPI_pquery(QueryDesc *queryDesc, bool fire_triggers, uint64 tcount)
  switch (operation)
  {
  case CMD_SELECT:
- if (queryDesc->dest->mydest != DestSPI)
+ if (queryDesc->dest->mydest == DestNone)
  {
  /* Don't return SPI_OK_SELECT if we're discarding result */
  res = SPI_OK_UTILITY;
diff --git a/src/backend/executor/tstoreReceiver.c b/src/backend/executor/tstoreReceiver.c
index 6c2dfbc..e8172be 100644
--- a/src/backend/executor/tstoreReceiver.c
+++ b/src/backend/executor/tstoreReceiver.c
@@ -8,6 +8,8 @@
  * toasted values.  This is to support cursors WITH HOLD, which must retain
  * data even if the underlying table is dropped.
  *
+ * Also optionally, we can apply a tuple conversion map before storing.
+ *
  *
  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
@@ -21,6 +23,7 @@
 #include "postgres.h"
 
 #include "access/detoast.h"
+#include "access/tupconvert.h"
 #include "executor/tstoreReceiver.h"
 
 
@@ -31,14 +34,19 @@ typedef struct
  Tuplestorestate *tstore; /* where to put the data */
  MemoryContext cxt; /* context containing tstore */
  bool detoast; /* were we told to detoast? */
+ TupleDesc target_tupdesc; /* target tupdesc, or NULL if none */
+ const char *map_failure_msg; /* tupdesc mapping failure message */
  /* workspace: */
  Datum   *outvalues; /* values array for result tuple */
  Datum   *tofree; /* temp values to be pfree'd */
+ TupleConversionMap *tupmap; /* conversion map, if needed */
+ TupleTableSlot *mapslot; /* slot for mapped tuples */
 } TStoreState;
 
 
 static bool tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self);
 static bool tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self);
+static bool tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self);
 
 
 /*
@@ -69,27 +77,46 @@ tstoreStartupReceiver(DestReceiver *self, int operation, TupleDesc typeinfo)
  }
  }
 
+ /* Check if tuple conversion is needed */
+ if (myState->target_tupdesc)
+ myState->tupmap = convert_tuples_by_position(typeinfo,
+ myState->target_tupdesc,
+ myState->map_failure_msg);
+ else
+ myState->tupmap = NULL;
+
  /* Set up appropriate callback */
  if (needtoast)
  {
+ Assert(!myState->tupmap);
  myState->pub.receiveSlot = tstoreReceiveSlot_detoast;
  /* Create workspace */
  myState->outvalues = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
  myState->tofree = (Datum *)
  MemoryContextAlloc(myState->cxt, natts * sizeof(Datum));
+ myState->mapslot = NULL;
+ }
+ else if (myState->tupmap)
+ {
+ myState->pub.receiveSlot = tstoreReceiveSlot_tupmap;
+ myState->outvalues = NULL;
+ myState->tofree = NULL;
+ myState->mapslot = MakeSingleTupleTableSlot(myState->target_tupdesc,
+ &TTSOpsVirtual);
  }
  else
  {
  myState->pub.receiveSlot = tstoreReceiveSlot_notoast;
  myState->outvalues = NULL;
  myState->tofree = NULL;
+ myState->mapslot = NULL;
  }
 }
 
 /*
  * Receive a tuple from the executor and store it in the tuplestore.
- * This is for the easy case where we don't have to detoast.
+ * This is for the easy case where we don't have to detoast nor map anything.
  */
 static bool
 tstoreReceiveSlot_notoast(TupleTableSlot *slot, DestReceiver *self)
@@ -158,6 +185,21 @@ tstoreReceiveSlot_detoast(TupleTableSlot *slot, DestReceiver *self)
 }
 
 /*
+ * Receive a tuple from the executor and store it in the tuplestore.
+ * This is for the case where we must apply a tuple conversion map.
+ */
+static bool
+tstoreReceiveSlot_tupmap(TupleTableSlot *slot, DestReceiver *self)
+{
+ TStoreState *myState = (TStoreState *) self;
+
+ execute_attr_map_slot(myState->tupmap->attrMap, slot, myState->mapslot);
+ tuplestore_puttupleslot(myState->tstore, myState->mapslot);
+
+ return true;
+}
+
+/*
  * Clean up at end of an executor run
  */
 static void
@@ -172,6 +214,12 @@ tstoreShutdownReceiver(DestReceiver *self)
  if (myState->tofree)
  pfree(myState->tofree);
  myState->tofree = NULL;
+ if (myState->tupmap)
+ free_conversion_map(myState->tupmap);
+ myState->tupmap = NULL;
+ if (myState->mapslot)
+ ExecDropSingleTupleTableSlot(myState->mapslot);
+ myState->mapslot = NULL;
 }
 
 /*
@@ -204,17 +252,32 @@ CreateTuplestoreDestReceiver(void)
 
 /*
  * Set parameters for a TuplestoreDestReceiver
+ *
+ * tStore: where to store the tuples
+ * tContext: memory context containing tStore
+ * detoast: forcibly detoast contained data?
+ * target_tupdesc: if not NULL, forcibly convert tuples to this rowtype
+ * map_failure_msg: error message to use if mapping to target_tupdesc fails
+ *
+ * We don't currently support both detoast and target_tupdesc at the same
+ * time, just because no existing caller needs that combination.
  */
 void
 SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast)
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg)
 {
  TStoreState *myState = (TStoreState *) self;
 
+ Assert(!(detoast && target_tupdesc));
+
  Assert(myState->pub.mydest == DestTuplestore);
  myState->tstore = tStore;
  myState->cxt = tContext;
  myState->detoast = detoast;
+ myState->target_tupdesc = target_tupdesc;
+ myState->map_failure_msg = map_failure_msg;
 }
diff --git a/src/backend/tcop/pquery.c b/src/backend/tcop/pquery.c
index 5781fb2..96ea74f 100644
--- a/src/backend/tcop/pquery.c
+++ b/src/backend/tcop/pquery.c
@@ -996,7 +996,9 @@ FillPortalStore(Portal portal, bool isTopLevel)
  SetTuplestoreDestReceiverParams(treceiver,
  portal->holdStore,
  portal->holdContext,
- false);
+ false,
+ NULL,
+ NULL);
 
  switch (portal->strategy)
  {
diff --git a/src/include/executor/spi.h b/src/include/executor/spi.h
index 06de20a..9be0c68 100644
--- a/src/include/executor/spi.h
+++ b/src/include/executor/spi.h
@@ -90,6 +90,10 @@ extern int SPI_execute_plan(SPIPlanPtr plan, Datum *Values, const char *Nulls,
 extern int SPI_execute_plan_with_paramlist(SPIPlanPtr plan,
  ParamListInfo params,
  bool read_only, long tcount);
+extern int SPI_execute_plan_with_receiver(SPIPlanPtr plan,
+   ParamListInfo params,
+   bool read_only, long tcount,
+   DestReceiver *dest);
 extern int SPI_exec(const char *src, long tcount);
 extern int SPI_execp(SPIPlanPtr plan, Datum *Values, const char *Nulls,
   long tcount);
@@ -102,6 +106,11 @@ extern int SPI_execute_with_args(const char *src,
   int nargs, Oid *argtypes,
   Datum *Values, const char *Nulls,
   bool read_only, long tcount);
+extern int SPI_execute_with_receiver(const char *src,
+  int nargs, Oid *argtypes,
+  Datum *Values, const char *Nulls,
+  bool read_only, long tcount,
+  DestReceiver *dest);
 extern SPIPlanPtr SPI_prepare(const char *src, int nargs, Oid *argtypes);
 extern SPIPlanPtr SPI_prepare_cursor(const char *src, int nargs, Oid *argtypes,
  int cursorOptions);
diff --git a/src/include/executor/tstoreReceiver.h b/src/include/executor/tstoreReceiver.h
index b2390c4..e9461cf 100644
--- a/src/include/executor/tstoreReceiver.h
+++ b/src/include/executor/tstoreReceiver.h
@@ -24,6 +24,8 @@ extern DestReceiver *CreateTuplestoreDestReceiver(void);
 extern void SetTuplestoreDestReceiverParams(DestReceiver *self,
  Tuplestorestate *tStore,
  MemoryContext tContext,
- bool detoast);
+ bool detoast,
+ TupleDesc target_tupdesc,
+ const char *map_failure_msg);
 
 #endif /* TSTORE_RECEIVER_H */
diff --git a/src/pl/plpgsql/src/pl_exec.c b/src/pl/plpgsql/src/pl_exec.c
index d3ad4fa..b5563ee 100644
--- a/src/pl/plpgsql/src/pl_exec.c
+++ b/src/pl/plpgsql/src/pl_exec.c
@@ -27,6 +27,7 @@
 #include "executor/execExpr.h"
 #include "executor/spi.h"
 #include "executor/spi_priv.h"
+#include "executor/tstoreReceiver.h"
 #include "funcapi.h"
 #include "mb/stringinfo_mb.h"
 #include "miscadmin.h"
@@ -3491,9 +3492,11 @@ static int
 exec_stmt_return_query(PLpgSQL_execstate *estate,
    PLpgSQL_stmt_return_query *stmt)
 {
- Portal portal;
- uint64 processed = 0;
- TupleConversionMap *tupmap;
+ int64 tcount;
+ DestReceiver *treceiver;
+ int rc;
+ uint64 processed;
+ MemoryContext stmt_mcontext = get_stmt_mcontext(estate);
  MemoryContext oldcontext;
 
  if (!estate->retisset)
@@ -3503,60 +3506,115 @@ exec_stmt_return_query(PLpgSQL_execstate *estate,
 
  if (estate->tuple_store == NULL)
  exec_init_tuple_store(estate);
+ /* There might be some tuples in the tuplestore already */
+ tcount = tuplestore_tuple_count(estate->tuple_store);
+
+ /*
+ * Set up DestReceiver to transfer results directly to tuplestore,
+ * converting rowtype if necessary.  DestReceiver lives in mcontext.
+ */
+ oldcontext = MemoryContextSwitchTo(stmt_mcontext);
+ treceiver = CreateDestReceiver(DestTuplestore);
+ SetTuplestoreDestReceiverParams(treceiver,
+ estate->tuple_store,
+ estate->tuple_store_cxt,
+ false,
+ estate->tuple_store_desc,
+ gettext_noop("structure of query does not match function result type"));
+ MemoryContextSwitchTo(oldcontext);
 
  if (stmt->query != NULL)
  {
  /* static query */
- exec_run_select(estate, stmt->query, 0, &portal);
+ PLpgSQL_expr *expr = stmt->query;
+ ParamListInfo paramLI;
+
+ /*
+ * On the first call for this expression generate the plan.
+ */
+ if (expr->plan == NULL)
+ exec_prepare_plan(estate, expr, CURSOR_OPT_PARALLEL_OK, true);
+
+ /*
+ * Set up ParamListInfo to pass to executor
+ */
+ paramLI = setup_param_list(estate, expr);
+
+ /*
+ * Execute the query
+ */
+ rc = SPI_execute_plan_with_receiver(expr->plan, paramLI,
+ estate->readonly_func, 0,
+ treceiver);
+ if (rc != SPI_OK_SELECT)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("query \"%s\" is not a SELECT", expr->query)));
  }
  else
  {
  /* RETURN QUERY EXECUTE */
- Assert(stmt->dynquery != NULL);
- portal = exec_dynquery_with_params(estate, stmt->dynquery,
-   stmt->params, NULL,
-   0);
- }
+ Datum query;
+ bool isnull;
+ Oid restype;
+ int32 restypmod;
+ char   *querystr;
 
- /* Use eval_mcontext for tuple conversion work */
- oldcontext = MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /*
+ * Evaluate the string expression after the EXECUTE keyword. Its
+ * result is the querystring we have to execute.
+ */
+ Assert(stmt->dynquery != NULL);
+ query = exec_eval_expr(estate, stmt->dynquery,
+   &isnull, &restype, &restypmod);
+ if (isnull)
+ ereport(ERROR,
+ (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+ errmsg("query string argument of EXECUTE is null")));
 
- tupmap = convert_tuples_by_position(portal->tupDesc,
- estate->tuple_store_desc,
- gettext_noop("structure of query does not match function result type"));
+ /* Get the C-String representation */
+ querystr = convert_value_to_string(estate, query, restype);
 
- while (true)
- {
- uint64 i;
+ /* copy it into the stmt_mcontext before we clean up */
+ querystr = MemoryContextStrdup(stmt_mcontext, querystr);
 
- SPI_cursor_fetch(portal, true, 50);
+ exec_eval_cleanup(estate);
 
- /* SPI will have changed CurrentMemoryContext */
- MemoryContextSwitchTo(get_eval_mcontext(estate));
+ /* Execute query, passing params if necessary */
+ if (stmt->params)
+ {
+ PreparedParamsData *ppd;
 
- if (SPI_processed == 0)
- break;
+ ppd = exec_eval_using_params(estate, stmt->params);
 
- for (i = 0; i < SPI_processed; i++)
+ rc = SPI_execute_with_receiver(querystr,
+   ppd->nargs, ppd->types,
+   ppd->values, ppd->nulls,
+   estate->readonly_func,
+   0,
+   treceiver);
+ }
+ else
  {
- HeapTuple tuple = SPI_tuptable->vals[i];
-
- if (tupmap)
- tuple = execute_attr_map_tuple(tuple, tupmap);
- tuplestore_puttuple(estate->tuple_store, tuple);
- if (tupmap)
- heap_freetuple(tuple);
- processed++;
+ rc = SPI_execute_with_receiver(querystr,
+   0, NULL, NULL, NULL,
+   estate->readonly_func,
+   0,
+   treceiver);
  }
 
- SPI_freetuptable(SPI_tuptable);
+ if (rc < 0)
+ elog(ERROR, "SPI_execute_with_receiver failed executing query \"%s\": %s",
+ querystr, SPI_result_code_string(rc));
  }
 
- SPI_freetuptable(SPI_tuptable);
- SPI_cursor_close(portal);
-
- MemoryContextSwitchTo(oldcontext);
+ /* Clean up */
+ treceiver->rDestroy(treceiver);
  exec_eval_cleanup(estate);
+ MemoryContextReset(stmt_mcontext);
+
+ /* Count how many tuples we got */
+ processed = tuplestore_tuple_count(estate->tuple_store) - tcount;
 
  estate->eval_processed = processed;
  exec_set_found(estate, processed != 0);

Re: BUG #16040: PL/PGSQL RETURN QUERY statement never uses a parallel plan

Pavel Stehule


On Sun, Mar 22, 2020 at 4:23 AM Tom Lane <[hidden email]> wrote:
> I will go add this to the next commitfest (for v14), but I wonder
> if we should try to squeeze it into v13?  This isn't the only
> complaint we've gotten about non-parallelizability of RETURN QUERY.

+1

Pavel

