Hi,
The idea of this patch is to allow the leader and each worker to insert tuples in parallel if the SELECT part of the CTAS is parallelizable. Along with the parallel inserts, if the CTAS code path is also allowed to use table_multi_insert() [1], the gain we achieve is as follows: for a table with 2 integer columns and 100 million tuples (more testing results are at [2]), the exec time on HEAD is 120sec, whereas with the parallelism patch proposed here plus the multi insert patch [1], with 3 workers and leader participation, the exec time is 22sec (5.45X). With the current CTAS code, which does single tuple inserts (see intorel_receive()), the performance gain is limited to ~1.7X with parallelism, because the workers contend more for locks on buffer pages while extending the table. So, the maximum benefit we can get for CTAS is with both parallelism and multi tuple inserts.

The design: Let the planner know that the SELECT is from CTAS in createas.c so that it can set the number of tuples transferred from the workers to the Gather node to 0. With this change, there are chances that the planner may choose a parallel plan. After planning, check in createas.c whether the upper plan node is a Gather and, if so, mark a parallelism flag in the CTAS dest receiver. Pass the into clause, object id and command id from the leader to the workers, so that each worker can create its own CTAS dest receiver. The leader inserts its share of tuples if instructed to do so, and so do the workers. Each worker atomically writes its number of inserted tuples into a shared memory variable; the leader combines this with its own count and reports the total to the client.

The following things are still pending. Thoughts are most welcome:
1. How best can we lift the "cannot insert tuples in a parallel worker" restriction in heap_prepare_insert() for only CTAS cases, or for that matter parallel copy? How about having a variable in one of the worker global contexts and using that? Of course, we can remove this restriction entirely once we fully allow parallelism for INSERT INTO SELECT, CTAS, and COPY.
2. How to represent the parallel insert for CTAS in explain plans? EXPLAIN of a CTAS shows the plan for only the SELECT part. How about having some textual info along with the Gather node?
-----------------------------------------------------------------------------
 Gather  (cost=1000.00..108738.90 rows=0 width=8)
   Workers Planned: 2
   ->  Parallel Seq Scan on t_test  (cost=0.00..106748.00 rows=4954 width=8)
         Filter: (many < 10000)
-----------------------------------------------------------------------------
3. Need to restrict parallel inserts if CTAS tries to create temp/global tables, as the workers will not have access to those tables. Need to analyze whether to allow parallelism if CTAS has prepared statements or WITH NO DATA.
4. Need to stop unnecessary parallel shared state, such as the tuple queues, from being created and shared with the workers.
5. Addition of new test cases. Testing with more scenarios and different data sets, sizes, tablespaces, SELECT INTO. Analysis of the 2 mismatches in the write_parallel.sql regression test.

Thoughts?

Credits:
1. Thanks to Dilip Kumar for the main design idea and the discussions. Thanks to Vignesh for the discussions.
2. Patch development and testing are by me.
3. Thanks to the authors of table_multi_insert() in the CTAS patch [1].
[1] - For table_multi_insert() in CTAS, I used an in-progress patch available at https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com
[2] - Table with 2 integer columns, 100 million tuples, with leader participation, with the default postgresql.conf file. All readings are triplets of the form (workers, exec time in sec, improvement).
case 1: no multi inserts - (0,120,1X),(1,91,1.32X),(2,75,1.6X),(3,67,1.79X),(4,72,1.66X),(5,77,1.56X),(6,83,1.44X)
case 2: with multi inserts - (0,59,1X),(1,32,1.84X),(2,28,2.1X),(3,25,2.36X),(4,23,2.56X),(5,22,2.68X),(6,22,2.68X)
case 3: same table but unlogged, with multi inserts - (0,50,1X),(1,28,1.78X),(2,25,2X),(3,22,2.27X),(4,21,2.38X),(5,21,2.38X),(6,20,2.5X)

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
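[Editor's note] For illustration, here is a minimal sketch of the shared-memory tuple counter described in the design above. The struct and function names are invented for this sketch (the actual patch may organize this differently); it only shows the pg_atomic_* pattern the workers and the leader could use:

#include "port/atomics.h"

/* Illustrative only: a counter the leader places in the parallel DSM segment. */
typedef struct ParallelCTASCounter
{
    pg_atomic_uint64 processed;     /* tuples inserted by all workers */
} ParallelCTASCounter;

/* Leader, at DSM setup time. */
static void
ctas_counter_init(ParallelCTASCounter *counter)
{
    pg_atomic_init_u64(&counter->processed, 0);
}

/* Worker side: publish this worker's count once its inserts are done. */
static void
ctas_worker_publish(ParallelCTASCounter *counter, uint64 ntuples)
{
    pg_atomic_add_fetch_u64(&counter->processed, ntuples);
}

/* Leader side: combine with its own count before reporting to the client. */
static uint64
ctas_leader_total(ParallelCTASCounter *counter, uint64 leader_ntuples)
{
    return leader_ntuples + pg_atomic_read_u64(&counter->processed);
}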
>
> [1] - For table_multi_insert() in CTAS, I used an in-progress patch available at https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com
> [2] - Table with 2 integer columns, 100million tuples, with leader participation, with default postgresql.conf file. All readings are of triplet form - (workers, exec time in sec, improvement).
> case 1: no multi inserts - (0,120,1X),(1,91,1.32X),(2,75,1.6X),(3,67,1.79X),(4,72,1.66X),(5,77,1.56),(6,83,1.44X)
> case 2: with multi inserts - (0,59,1X),(1,32,1.84X),(2,28,2.1X),(3,25,2.36X),(4,23,2.56X),(5,22,2.68X),(6,22,2.68X)
> case 3: same table but unlogged with multi inserts - (0,50,1X),(1,28,1.78X),(2,25,2X),(3,22,2.27X),(4,21,2.38X),(5,21,2.38X),(6,20,2.5X)
>

I feel this enhancement could give good improvement, +1 for this.

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com
Hi,
On 2020-09-23 17:20:20 +0530, Bharath Rupireddy wrote:
> The idea of this patch is to allow the leader and each worker insert the tuples in parallel if the SELECT part of the CTAS is parallelizable.

Cool!

> The design:

I think it'd be good if you could explain a bit more why you think this is safe to do in the way you have done it. E.g. from a quick scroll through the patch, there's not even a comment explaining that the only reason there doesn't need to be code dealing with xid assignment is that we already did the catalog changes to create the table.

But how does that work for SELECT INTO? Are you prohibiting that? ...

> Pass the into clause, object id, command id from the leader to workers, so that each worker can create its own CTAS dest receiver. Leader inserts it's share of tuples if instructed to do, and so are workers. Each worker writes atomically it's number of inserted tuples into a shared memory variable, the leader combines this with it's own number of inserted tuples and shares to the client.
>
> Below things are still pending. Thoughts are most welcome:
> 1. How better we can lift the "cannot insert tuples in a parallel worker" from heap_prepare_insert() for only CTAS cases or for that matter parallel copy? How about having a variable in any of the worker global contexts and use that? Of course, we can remove this restriction entirely in case we fully allow parallelism for INSERT INTO SELECT, CTAS, and COPY.

I have mentioned before that I think it'd be good if we changed the insert APIs to have a more 'scan' like structure. I am thinking of something like

TableInsertScan* table_begin_insert(Relation);
table_tuple_insert(TableInsertScan *is, other, args);
table_multi_insert(TableInsertScan *is, other, args);
table_end_insert(TableInsertScan *);

that'd then replace the BulkInsertStateData logic we have right now. But more importantly, it'd allow an AM to optimize operations across multiple inserts, which is important for column stores.

And for the purpose of your question, we could then have a
table_insert_allow_parallel(TableInsertScan *);
or an additional arg to table_begin_insert().

> 3. Need to restrict parallel inserts, if CTAS tries to create temp/global tables as the workers will not have access to those tables. Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data.

In which case does CTAS not create a table? You definitely need to ensure that the table is created before your workers are started, and it needs to happen in a different CommandId.

Greetings,

Andres Freund
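[Editor's note] To make the proposed 'scan'-like insert API more concrete, here is a rough sketch of how the declarations might look. The struct contents and argument lists are guesses, not an agreed design, and table_tuple_insert()/table_multi_insert() here would replace the existing Relation-taking variants rather than coexist with them:

/* Hypothetical state carried across all inserts into one relation. */
typedef struct TableInsertScan
{
    Relation        rel;            /* target relation */
    CommandId       cid;            /* command id stamped on inserted tuples */
    int             options;        /* TABLE_INSERT_* flags */
    BulkInsertState bistate;        /* would subsume today's BulkInsertStateData usage */
    bool            allow_parallel; /* workers may insert through this scan */
} TableInsertScan;

extern TableInsertScan *table_begin_insert(Relation rel, CommandId cid, int options);
extern void table_tuple_insert(TableInsertScan *is, TupleTableSlot *slot);
extern void table_multi_insert(TableInsertScan *is, TupleTableSlot **slots, int nslots);
extern void table_insert_allow_parallel(TableInsertScan *is);
extern void table_end_insert(TableInsertScan *is);

An AM could then batch, reorder, or coordinate parallel work inside the scan object across many calls instead of per call, which is the point made above about column stores.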
Thanks Andres for the comments.
On Thu, Sep 24, 2020 at 8:11 AM Andres Freund <[hidden email]> wrote:
>
> > The design:
>
> I think it'd be good if you could explain a bit more why you think this safe to do in the way you have done it.
>
> E.g. from a quick scroll through the patch, there's not even a comment explaining that the only reason there doesn't need to be code dealing with xid assignment because we already did the catalog changes to create the table.
>

Yes, we do a bunch of catalog changes related to the newly created table. We will have both the txn id and command id assigned when the catalogue changes are being made. But, right after the table is created in the leader, the command id is incremented (CommandCounterIncrement() is called from create_ctas_internal()) whereas the txn id remains the same. The new command id is marked with GetCurrentCommandId(true); in intorel_startup, then the parallel mode is entered. The txn id and command id are serialized into the parallel DSM; they are then available to all parallel workers. This is discussed in [1].

A few changes I have to make in the parallel worker code: set currentCommandIdUsed = true;, maybe via a common API SetCurrentCommandIdUsedForWorker() proposed in [1], and remove the extra command id sharing from the leader to workers.

I will add a few comments in the upcoming patch related to the above info.

> But how does that work for SELECT INTO? Are you prohibiting that? ...
>

In case of SELECT INTO, a new table gets created and I'm not prohibiting the parallel inserts, and I think we don't need to. Thoughts?

> > Below things are still pending. Thoughts are most welcome:
> > 1. How better we can lift the "cannot insert tuples in a parallel worker" from heap_prepare_insert() for only CTAS cases or for that matter parallel copy? How about having a variable in any of the worker global contexts and use that? Of course, we can remove this restriction entirely in case we fully allow parallelism for INSERT INTO SELECT, CTAS, and COPY.
>
> And for the purpose of your question, we could then have a
> table_insert_allow_parallel(TableInsertScan *);
> or an additional arg to table_begin_insert().
>

Removing the "cannot insert tuples in a parallel worker" restriction from heap_prepare_insert() is a common problem for parallel inserts in general, i.e. parallel inserts in CTAS, parallel INSERT INTO SELECT [1] and parallel copy [2]. It will be good if a common solution is agreed on.

> > 3. Need to restrict parallel inserts, if CTAS tries to create temp/global tables as the workers will not have access to those tables. Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data.
>
> In which case does CTAS not create a table?

AFAICS, the table gets created in all the cases, but the insertion of the data gets skipped if the user specifies the "with no data" option, in which case the select part is not even planned, and so parallelism will also not be picked.

> You definitely need to ensure that the table is created before your workers are started, and there needs to be in a different CommandId.
>

Yeah, this is already being done. The table gets created in the leader (intorel_startup, which gets called via dest->rStartup(dest) in standard_ExecutorRun()) before entering the parallel mode.

[1] https://www.postgresql.org/message-id/CAJcOf-fn1nhEtaU91NvRuA3EbvbJGACMd4_c%2BUu3XU5VMv37Aw%40mail.gmail.com
[2] https://www.postgresql.org/message-id/CAA4eK1%2BkpddvvLxWm4BuG_AhVvYz8mKAEa7osxp_X0d4ZEiV%3Dg%40mail.gmail.com

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
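[Editor's note] For readers following the currentCommandIdUsed discussion, a bare-bones sketch of the helper mentioned above could look like this. The function name comes from the thread at [1]; the body and the Assert are only a guess at what such a helper would do, living in xact.c next to the existing static currentCommandIdUsed flag:

/*
 * SetCurrentCommandIdUsedForWorker
 *      Mark the command id restored from the parallel DSM as used, so that
 *      tuples a parallel worker inserts are treated the same way as tuples
 *      the leader would have inserted under that command id.
 */
void
SetCurrentCommandIdUsedForWorker(void)
{
    Assert(IsParallelWorker());

    currentCommandIdUsed = true;
}

With such a helper, the extra command id sharing from the leader to the workers mentioned above could indeed be dropped, since the parallel infrastructure already serializes and restores the transaction state (including the command id) for each worker.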
On Mon, Sep 28, 2020 at 3:58 PM Bharath Rupireddy
<[hidden email]> wrote: > > Thanks Andres for the comments. > > On Thu, Sep 24, 2020 at 8:11 AM Andres Freund <[hidden email]> wrote: > > > > > The design: > > > > I think it'd be good if you could explain a bit more why you think this > > safe to do in the way you have done it. > > > > E.g. from a quick scroll through the patch, there's not even a comment > > explaining that the only reason there doesn't need to be code dealing > > with xid assignment because we already did the catalog changes to create > > the table. > > > > Yes we do a bunch of catalog changes related to the created new table. > We will have both the txn id and command id assigned when catalogue > changes are being made. But, right after the table is created in the > leader, the command id is incremented (CommandCounterIncrement() is > called from create_ctas_internal()) whereas the txn id remains the > same. The new command id is marked as GetCurrentCommandId(true); in > intorel_startup, then the parallel mode is entered. The txn id and > command id are serialized into parallel DSM, they are then available > to all parallel workers. This is discussed in [1]. > > Few changes I have to make in the parallel worker code: set > currentCommandIdUsed = true;, may be via a common API > SetCurrentCommandIdUsedForWorker() proposed in [1] and remove the > extra command id sharing from the leader to workers. > > I will add a few comments in the upcoming patch related to the above info. > Yes, that would be good. > > > > But how does that work for SELECT INTO? Are you prohibiting > > that? ... > > > > In case of SELECT INTO, a new table gets created and I'm not > prohibiting the parallel inserts and I think we don't need to. > So, in this case, also do we ensure that table is created before we launch the workers. If so, I think you can explain in comments about it and what you need to do that to ensure the same. While skimming through the patch, a small thing I noticed: + /* + * SELECT part of the CTAS is parallelizable, so we can make + * each parallel worker insert the tuples that are resulted + * in it's execution into the target table. + */ + if (!is_matview && + IsA(plan->planTree, Gather)) + ((DR_intorel *) dest)->is_parallel = true; + I am not sure at this stage if this is the best way to make CTAS as parallel but if so, then probably you can expand the comments a bit to say why you consider only Gather node (and that too when it is the top-most node) and why not another parallel node like GatherMerge? > Thoughts? > > > > > > Below things are still pending. Thoughts are most welcome: > > > 1. How better we can lift the "cannot insert tuples in a parallel worker" > > > from heap_prepare_insert() for only CTAS cases or for that matter parallel > > > copy? How about having a variable in any of the worker global contexts and > > > use that? Of course, we can remove this restriction entirely in case we > > > fully allow parallelism for INSERT INTO SELECT, CTAS, and COPY. > > > > And for the purpose of your question, we could then have a > > table_insert_allow_parallel(TableInsertScan *); > > or an additional arg to table_begin_insert(). > > > > Removing "cannot insert tuples in a parallel worker" restriction from > heap_prepare_insert() is a common problem for parallel inserts in > general, i.e. parallel inserts in CTAS, parallel INSERT INTO > SELECTs[1] and parallel copy[2]. It will be good if a common solution > is agreed. > Right, for now, I think you can simply remove that check from the code instead of just commenting it. 
We will see if there is a better check/Assert we can add there.

--
With Regards,
Amit Kapila.
On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila <[hidden email]> wrote:
>> Yes we do a bunch of catalog changes related to the created new table. We will have both the txn id and command id assigned when catalogue changes are being made. But, right after the table is created in the leader, the command id is incremented (CommandCounterIncrement() is called from create_ctas_internal()) whereas the txn id remains the same. The new command id is marked as GetCurrentCommandId(true); in intorel_startup, then the parallel mode is entered. The txn id and command id are serialized into parallel DSM, they are then available to all parallel workers. This is discussed in [1].
>>
>> Few changes I have to make in the parallel worker code: set currentCommandIdUsed = true;, may be via a common API SetCurrentCommandIdUsedForWorker() proposed in [1] and remove the extra command id sharing from the leader to workers.
>>
>> I will add a few comments in the upcoming patch related to the above info.
>
> Yes, that would be good.

Added comments.

>>> But how does that work for SELECT INTO? Are you prohibiting that? ...
>>
>> In case of SELECT INTO, a new table gets created and I'm not prohibiting the parallel inserts and I think we don't need to.
>
> So, in this case, also do we ensure that table is created before we launch the workers. If so, I think you can explain in comments about it and what you need to do that to ensure the same.

For SELECT INTO, the table gets created by the leader in create_ctas_internal(), then ExecInitParallelPlan() gets called which launches the workers and then the leader (if asked to do so) and the workers insert the rows. So, we don't need to do any extra work to ensure the table gets created before the workers start inserting tuples.

> While skimming through the patch, a small thing I noticed:
> + /*
> +  * SELECT part of the CTAS is parallelizable, so we can make
> +  * each parallel worker insert the tuples that are resulted
> +  * in it's execution into the target table.
> +  */
> + if (!is_matview &&
> +     IsA(plan->planTree, Gather))
> +     ((DR_intorel *) dest)->is_parallel = true;
> +
>
> I am not sure at this stage if this is the best way to make CTAS as parallel but if so, then probably you can expand the comments a bit to say why you consider only Gather node (and that too when it is the top-most node) and why not another parallel node like GatherMerge?

If somebody expects to preserve the order of the tuples that are coming from the GatherMerge node of the select part in CTAS or SELECT INTO while inserting, now if parallelism is allowed, that may not be the case i.e. the order of insertion of tuples may vary. I'm not quite sure if someone wants to use ORDER BY in the select parts of CTAS or SELECT INTO in a real world use case. Thoughts?

> Right, for now, I think you can simply remove that check from the code instead of just commenting it. We will see if there is a better check/Assert we can add there.

Done.

I also worked on some of the open points I listed earlier in my mail.

> 3. Need to restrict parallel inserts, if CTAS tries to create temp/global tables as the workers will not have access to those tables.

Done.

> Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data.

For prepared statements, the parallelism will not be picked and so is parallel insertion. For CTAS with the no data option, the select part is not even planned, and so the parallelism will also not be picked.

> 4. Need to stop unnecessary parallel shared state such as tuple queue being created and shared to workers.

Done.

I'm listing the things that are still pending.

1. How to represent the parallel insert for CTAS in explain plans? The explain CTAS shows the plan for only the SELECT part. How about having some textual info along with the Gather node? I'm not quite sure on this point, any suggestions are welcome.
2. Addition of new test cases. Testing with more scenarios and different data sets, sizes, tablespaces, select into. Analysis on the 2 mismatches in write_parallel.sql regression test.

Attaching v2 patch, thoughts and comments are welcome.

With Regards,
Bharath Rupireddy.
EnterpriseDB: http://www.enterprisedb.com
On Wed, Oct 14, 2020 at 2:46 PM Bharath Rupireddy
<[hidden email]> wrote: > > On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila <[hidden email]> wrote: > > > > > > While skimming through the patch, a small thing I noticed: > > + /* > > + * SELECT part of the CTAS is parallelizable, so we can make > > + * each parallel worker insert the tuples that are resulted > > + * in it's execution into the target table. > > + */ > > + if (!is_matview && > > + IsA(plan->planTree, Gather)) > > + ((DR_intorel *) dest)->is_parallel = true; > > + > > > > I am not sure at this stage if this is the best way to make CTAS as > > parallel but if so, then probably you can expand the comments a bit to > > say why you consider only Gather node (and that too when it is the > > top-most node) and why not another parallel node like GatherMerge? > > > > If somebody expects to preserve the order of the tuples that are > coming from GatherMerge node of the select part in CTAS or SELECT INTO > while inserting, now if parallelism is allowed, that may not be the > case i.e. the order of insertion of tuples may vary. I'm not quite > sure, if someone wants to use order by in the select parts of CTAS or > SELECT INTO in a real world use case. Thoughts? > I think there is no reason why one can't use ORDER BY in the statements we are talking about here. But, I think we can't enable parallelism for GatherMerge is because for that node we always need to fetch the data in the leader backend to perform the final merge phase. So, I was expecting a small comment saying something on those lines. > > > > > Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data. > > > > For prepared statements, the parallelism will not be picked and so is > parallel insertion. > Hmm, I am not sure what makes you say this statement. The parallelism is enabled for prepared statements since commit 57a6a72b6b. > > I'm listing the things that are still pending. > > 1. How to represent the parallel insert for CTAS in explain plans? The > explain CTAS shows the plan for only the SELECT part. How about having > some textual info along with the Gather node? I'm not quite sure on > this point, any suggestions are welcome. > I am also not sure about this point because we don't display anything for the DDL part in explain. Can you propose by showing some example of what you have in mind? -- With Regards, Amit Kapila. |
On Wed, Oct 14, 2020 at 6:16 PM Amit Kapila <[hidden email]> wrote:
>
> > If somebody expects to preserve the order of the tuples that are coming from GatherMerge node of the select part in CTAS or SELECT INTO while inserting, now if parallelism is allowed, that may not be the case i.e. the order of insertion of tuples may vary. I'm not quite sure, if someone wants to use order by in the select parts of CTAS or SELECT INTO in a real world use case. Thoughts?
>
> I think there is no reason why one can't use ORDER BY in the statements we are talking about here. But, I think we can't enable parallelism for GatherMerge is because for that node we always need to fetch the data in the leader backend to perform the final merge phase. So, I was expecting a small comment saying something on those lines.
>

Sure, I will add comments in the upcoming patch.

> > For prepared statements, the parallelism will not be picked and so is parallel insertion.
>
> Hmm, I am not sure what makes you say this statement. The parallelism is enabled for prepared statements since commit 57a6a72b6b.
>

Thanks for letting me know this. I misunderstood the parallelism for prepared statements. Now, I verified with a proper use case (see below), where I had a prepared statement and a CTAS having EXECUTE; in this case too parallelism is picked and parallel insertion happened with the patch proposed in this thread. Do we have any problems if we allow parallel insertion for these cases?

PREPARE myselect AS SELECT * FROM t1;
EXPLAIN ANALYZE CREATE TABLE t1_test AS EXECUTE myselect;

I think the commit 57a6a72b6b has not added any test cases; isn't it good to add one in prepare.sql or select_parallel.sql?

> > 1. How to represent the parallel insert for CTAS in explain plans? The explain CTAS shows the plan for only the SELECT part. How about having some textual info along with the Gather node? I'm not quite sure on this point, any suggestions are welcome.
>
> I am also not sure about this point because we don't display anything for the DDL part in explain. Can you propose by showing some example of what you have in mind?
>

I thought we could have something like this.
-----------------------------------------------------------------------------
 Gather  (cost=1000.00..108738.90 rows=0 width=8)
   Workers Planned: 2   Parallel Insert on t_test1
   ->  Parallel Seq Scan on t_test  (cost=0.00..106748.00 rows=4954 width=8)
         Filter: (many < 10000)
-----------------------------------------------------------------------------
On Thu, Oct 15, 2020 at 9:14 AM Bharath Rupireddy
<[hidden email]> wrote: > > On Wed, Oct 14, 2020 at 6:16 PM Amit Kapila <[hidden email]> wrote: > > > > > For prepared statements, the parallelism will not be picked and so is > > > parallel insertion. > > > > Hmm, I am not sure what makes you say this statement. The parallelism > > is enabled for prepared statements since commit 57a6a72b6b. > > > > Thanks for letting me know this. I misunderstood the parallelism for prepared statements. Now, I verified with a proper use case(see below), where I had a prepared statement, CTAS having EXECUTE, in this case too parallelism is picked and parallel insertion happened with the patch proposed in this thread. Do we have any problems if we allow parallel insertion for these cases? > > PREPARE myselect AS SELECT * FROM t1; > EXPLAIN ANALYZE CREATE TABLE t1_test AS EXECUTE myselect; > > I think the commit 57a6a72b6b has not added any test cases, isn't it good to add one in prepare.sql or select_parallel.sql? > I am not sure if it is worth as this is not functionality which is too complex or there are many chances of getting it broken. > > > > > 1. How to represent the parallel insert for CTAS in explain plans? The > > > explain CTAS shows the plan for only the SELECT part. How about having > > > some textual info along with the Gather node? I'm not quite sure on > > > this point, any suggestions are welcome. > > > > I am also not sure about this point because we don't display anything > > for the DDL part in explain. Can you propose by showing some example > > of what you have in mind? > > > > I thought we could have something like this. > ----------------------------------------------------------------------------- > Gather (cost=1000.00..108738.90 rows=0 width=8) > Workers Planned: 2 Parallel Insert on t_test1 > -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8) > Filter: (many < 10000) > ----------------------------------------------------------------------------- > maybe something like below: Gather (cost=1000.00..108738.90 rows=0 width=8) -> Create t_test1 -> Parallel Seq Scan on t_test I don't know what is the best thing to do here. I think for the temporary purpose you can keep something like above then once the patch is matured then we can take a separate opinion for this. -- With Regards, Amit Kapila. |
On 14.10.20 11:16, Bharath Rupireddy wrote:
> On Tue, Oct 6, 2020 at 10:58 AM Amit Kapila <[hidden email]> wrote: >> >>> Yes we do a bunch of catalog changes related to the created new table. >>> We will have both the txn id and command id assigned when catalogue >>> changes are being made. But, right after the table is created in the >>> leader, the command id is incremented (CommandCounterIncrement() is >>> called from create_ctas_internal()) whereas the txn id remains the >>> same. The new command id is marked as GetCurrentCommandId(true); in >>> intorel_startup, then the parallel mode is entered. The txn id and >>> command id are serialized into parallel DSM, they are then available >>> to all parallel workers. This is discussed in [1]. >>> >>> Few changes I have to make in the parallel worker code: set >>> currentCommandIdUsed = true;, may be via a common API >>> SetCurrentCommandIdUsedForWorker() proposed in [1] and remove the >>> extra command id sharing from the leader to workers. >>> >>> I will add a few comments in the upcoming patch related to the above info. >>> >> >> Yes, that would be good. >> > > Added comments. > >> >>>> But how does that work for SELECT INTO? Are you prohibiting >>>> that? ... >>>> >>> >>> In case of SELECT INTO, a new table gets created and I'm not >>> prohibiting the parallel inserts and I think we don't need to. >>> >> >> So, in this case, also do we ensure that table is created before we >> launch the workers. If so, I think you can explain in comments about >> it and what you need to do that to ensure the same. >> > > For SELECT INTO, the table gets created by the leader in > create_ctas_internal(), then ExecInitParallelPlan() gets called which > launches the workers and then the leader(if asked to do so) and the > workers insert the rows. So, we don't need to do any extra work to > ensure the table gets created before the workers start inserting > tuples. > >> >> While skimming through the patch, a small thing I noticed: >> + /* >> + * SELECT part of the CTAS is parallelizable, so we can make >> + * each parallel worker insert the tuples that are resulted >> + * in it's execution into the target table. >> + */ >> + if (!is_matview && >> + IsA(plan->planTree, Gather)) >> + ((DR_intorel *) dest)->is_parallel = true; >> + >> >> I am not sure at this stage if this is the best way to make CTAS as >> parallel but if so, then probably you can expand the comments a bit to >> say why you consider only Gather node (and that too when it is the >> top-most node) and why not another parallel node like GatherMerge? >> > > If somebody expects to preserve the order of the tuples that are > coming from GatherMerge node of the select part in CTAS or SELECT INTO > while inserting, now if parallelism is allowed, that may not be the > case i.e. the order of insertion of tuples may vary. I'm not quite > sure, if someone wants to use order by in the select parts of CTAS or > SELECT INTO in a real world use case. Thoughts? > >> >> Right, for now, I think you can simply remove that check from the code >> instead of just commenting it. We will see if there is a better >> check/Assert we can add there. >> > > Done. > > I also worked on some of the open points I listed earlier in my mail. > >> >> 3. Need to restrict parallel inserts, if CTAS tries to create temp/global tables as the workers will not have access to those tables. >> > > Done. > >> >> Need to analyze whether to allow parallelism if CTAS has prepared statements or with no data. 
>> > > For prepared statements, the parallelism will not be picked and so is > parallel insertion. > For CTAS with no data option case the select part is not even planned, > and so the parallelism will also not be picked. > >> >> 4. Need to stop unnecessary parallel shared state such as tuple queue being created and shared to workers. >> > > Done. > > I'm listing the things that are still pending. > > 1. How to represent the parallel insert for CTAS in explain plans? The > explain CTAS shows the plan for only the SELECT part. How about having > some textual info along with the Gather node? I'm not quite sure on > this point, any suggestions are welcome. > 2. Addition of new test cases. Testing with more scenarios and > different data sets, sizes, tablespaces, select into. Analysis on the > 2 mismatches in write_parallel.sql regression test. > > Attaching v2 patch, thoughts and comments are welcome. > > With Regards, > Bharath Rupireddy. > EnterpriseDB: http://www.enterprisedb.com > Hi, Really looking forward to this ending up in postgres as I think it's a very nice improvement. Whilst reviewing your patch I was wondering: is there a reason you did not introduce a batch insert in the destreceiver for the CTAS? For me this makes a huge difference in ingest speed as otherwise the inserts do not really scale so well as lock contention start to be a big problem. If you like I can make a patch to introduce this on top? Kind regards, Luc Swarm64 |
On Fri, Oct 16, 2020 at 11:33 AM Luc Vlaming <[hidden email]> wrote:
> > Really looking forward to this ending up in postgres as I think it's a > very nice improvement. > > Whilst reviewing your patch I was wondering: is there a reason you did > not introduce a batch insert in the destreceiver for the CTAS? For me > this makes a huge difference in ingest speed as otherwise the inserts do > not really scale so well as lock contention start to be a big problem. > If you like I can make a patch to introduce this on top? > Thanks for your interest. You are right, we can get maximum improvement if we have multi inserts in destreceiver for the CTAS on the similar lines to COPY FROM command. I specified this point in my first mail [1]. You may want to take a look at an already existing patch [2] for multi inserts, I think there are some review comments to be addressed in that patch. I would love to see the multi insert patch getting revived. [1] - https://www.postgresql.org/message-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf%2B0-Jg%2BKYT7ZO-Ug%40mail.gmail.com [2] - https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com |
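[Editor's note] For readers wondering what "multi inserts in the destreceiver" could look like, below is a much-simplified, hypothetical sketch of a buffering receive callback, loosely modelled on COPY FROM. The batch size and the buffered_slots/nbuffered fields are invented for illustration (the other DR_intorel fields are the existing ones in createas.c) and differ from the actual patch at [2]; a real implementation also needs to flush the remaining buffered tuples in intorel_shutdown():

#define CTAS_MULTI_INSERT_BATCH 1000    /* illustrative batch size */

static bool
intorel_receive_batched(TupleTableSlot *slot, DestReceiver *self)
{
    DR_intorel *myState = (DR_intorel *) self;

    /* Copy into our own slot; the caller's slot is only valid for this call. */
    ExecCopySlot(myState->buffered_slots[myState->nbuffered], slot);
    myState->nbuffered++;

    /* Flush a full batch with one table_multi_insert() call. */
    if (myState->nbuffered >= CTAS_MULTI_INSERT_BATCH)
    {
        table_multi_insert(myState->rel,
                           myState->buffered_slots,
                           myState->nbuffered,
                           myState->output_cid,
                           myState->ti_options,
                           myState->bistate);
        myState->nbuffered = 0;
    }

    return true;
}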
On 16.10.20 08:23, Bharath Rupireddy wrote:
> On Fri, Oct 16, 2020 at 11:33 AM Luc Vlaming <[hidden email]> wrote: >> >> Really looking forward to this ending up in postgres as I think it's a >> very nice improvement. >> >> Whilst reviewing your patch I was wondering: is there a reason you did >> not introduce a batch insert in the destreceiver for the CTAS? For me >> this makes a huge difference in ingest speed as otherwise the inserts do >> not really scale so well as lock contention start to be a big problem. >> If you like I can make a patch to introduce this on top? >> > > Thanks for your interest. You are right, we can get maximum > improvement if we have multi inserts in destreceiver for the CTAS on > the similar lines to COPY FROM command. I specified this point in my > first mail [1]. You may want to take a look at an already existing > patch [2] for multi inserts, I think there are some review comments to > be addressed in that patch. I would love to see the multi insert patch > getting revived. > > [1] - https://www.postgresql.org/message-id/CALj2ACWFq6Z4_jd9RPByURB8-Y8wccQWzLf%2B0-Jg%2BKYT7ZO-Ug%40mail.gmail.com > [2] - https://www.postgresql.org/message-id/CAEET0ZG31mD5SWjTYsAt0JTLReOejPvusJorZ3kGZ1%3DN1AC-Fw%40mail.gmail.com > > With Regards, > Bharath Rupireddy. > EnterpriseDB: http://www.enterprisedb.com > Sorry had not seen that pointer in your first email. I'll first finish some other patches I'm working on and then I'll try to revive that patch. Thanks for the pointers. Kind regards, Luc Swarm64 |
On Thu, Oct 15, 2020 at 3:18 PM Amit Kapila <[hidden email]> wrote: > > > > > 1. How to represent the parallel insert for CTAS in explain plans? The > > > > explain CTAS shows the plan for only the SELECT part. How about having > > > > some textual info along with the Gather node? I'm not quite sure on > > > > this point, any suggestions are welcome. > > > > > > I am also not sure about this point because we don't display anything > > > for the DDL part in explain. Can you propose by showing some example > > > of what you have in mind? > > > > I thought we could have something like this. > > ----------------------------------------------------------------------------- > > Gather (cost=1000.00..108738.90 rows=0 width=8) > > Workers Planned: 2 Parallel Insert on t_test1 > > -> Parallel Seq Scan on t_test (cost=0.00..106748.00 rows=4954 width=8) > > Filter: (many < 10000) > > ----------------------------------------------------------------------------- > > maybe something like below: > Gather (cost=1000.00..108738.90 rows=0 width=8) > -> Create t_test1 > -> Parallel Seq Scan on t_test > > I don't know what is the best thing to do here. I think for the > temporary purpose you can keep something like above then once the > patch is matured then we can take a separate opinion for this. > Agreed. Here's a snapshot of explain with the change suggested. postgres=# EXPLAIN (ANALYZE, COSTS OFF) CREATE TABLE t1_test AS SELECT * FROM t1; QUERY PLAN --------------------------------------------------------------------------------- Gather (actual time=970.524..972.913 rows=0 loops=1) -> Create t1_test Workers Planned: 2 Workers Launched: 2 -> Parallel Seq Scan on t1 (actual time=0.028..86.623 rows=333333 loops=3) Planning Time: 0.049 ms Execution Time: 973.733 ms > > I think there is no reason why one can't use ORDER BY in the > statements we are talking about here. But, I think we can't enable > parallelism for GatherMerge is because for that node we always need to > fetch the data in the leader backend to perform the final merge phase. > So, I was expecting a small comment saying something on those lines. > Added comments. > > 2. Addition of new test cases. > Added new test cases. > > Analysis on the 2 mismatches in write_parallel.sql regression test. > Done. It needed a small code change in costsize.c. Now, both make check and make check-world passes. Apart from above, a couple of other things I have finished with the v3 patch. 2. I enabled parallel inserts in case of materialized views. Hope that's fine. Attaching v3 patch herewith. I'm done with all the open points in my list. Please review the v3 patch and provide comments. |
On Mon, Oct 19, 2020 at 10:47 PM Bharath Rupireddy
<[hidden email]> wrote: > > Attaching v3 patch herewith. > > I'm done with all the open points in my list. Please review the v3 patch and provide comments. > Attaching v4 patch, rebased on the latest master 68b1a4877e. Also, added this feature to commitfest - https://commitfest.postgresql.org/31/2841/ With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com |
Hi,
I'm very interested in this feature, and I'm looking at the patch. Here are some comments.

1.
+ if (!TupIsNull(outerTupleSlot))
+ {
+     (void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
+     node->ps.state->es_processed++;
+ }
+
+ if(TupIsNull(outerTupleSlot))
+     break;
+ }

How about the following style, which looks cleaner:

if (TupIsNull(outerTupleSlot))
    break;

(void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest);
node->ps.state->es_processed++;

2.
+ if (into != NULL &&
+     IsA(into, IntoClause))
+ {

The check can be replaced by ISCTAS(into).

3.
+ /*
+  * For parallelizing inserts in CTAS i.e. making each
+  * parallel worker inerst it's tuples, we must send
+  * information such as intoclause(for each worker

'inerst' looks like a typo (insert).

4.
+ /* Estimate space for into clause for CTAS. */
+ if (ISCTAS(planstate->intoclause))
+ {
+     intoclausestr = nodeToString(planstate->intoclause);
+     shm_toc_estimate_chunk(&pcxt->estimator, strlen(intoclausestr) + 1);
+     shm_toc_estimate_keys(&pcxt->estimator, 1);
+ }
...
+ if (intoclausestr != NULL)
+ {
+     char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+                                             strlen(intoclausestr) + 1);
+     strcpy(shmptr, intoclausestr);
+     shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+ }

The code here calls strlen(intoclausestr) two times. The existing code in ExecInitParallelPlan stores the strlen in a variable, so how about the following style:

intoclause_len = strlen(intoclausestr);
...
/* Store serialized intoclause. */
intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1);
memcpy(intoclause_space, intoclausestr, intoclause_len + 1);
shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);

5.
+ if (intoclausestr != NULL)
+ {
+     char *shmptr = (char *)shm_toc_allocate(pcxt->toc,
+                                             strlen(intoclausestr) + 1);
+     strcpy(shmptr, intoclausestr);
+     shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, shmptr);
+ }
+
  /* Set up the tuple queues that the workers will write into. */
- pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
+ if (intoclausestr == NULL)
+     pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);

The two checks on intoclausestr could be combined like:

if (intoclausestr != NULL)
{
    ...
}
else
{
    ...
}

Best regards,
houzj
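[Editor's note] Tying review points 4 and 5 together, the serialization block in ExecInitParallelPlan() might end up looking roughly like the following. This is only a sketch of the suggested style (intoclause_len and intoclause_space follow the naming above; ISCTAS and PARALLEL_KEY_INTO_CLAUSE come from the patch under review), not the actual patch:

/* Estimate space for the into clause for CTAS. */
if (ISCTAS(planstate->intoclause))
{
    intoclausestr = nodeToString(planstate->intoclause);
    intoclause_len = strlen(intoclausestr);
    shm_toc_estimate_chunk(&pcxt->estimator, intoclause_len + 1);
    shm_toc_estimate_keys(&pcxt->estimator, 1);
}

/* ... later, once pcxt->toc has been created ... */

if (intoclausestr != NULL)
{
    /* Ship the serialized into clause; workers re-create the dest receiver. */
    intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1);
    memcpy(intoclause_space, intoclausestr, intoclause_len + 1);
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space);
}
else
{
    /* Tuple queues are only needed when workers must ship tuples back. */
    pei->tqueue = ExecParallelSetupTupleQueues(pcxt, false);
}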
On Tue, Nov 24, 2020 at 4:43 PM Hou, Zhijie <[hidden email]> wrote:
> > I'm very interested in this feature, > and I'm looking at the patch, here are some comments. > Thanks for the review. > > How about the following style: > > if(TupIsNull(outerTupleSlot)) > Break; > > (void) node->ps.dest->receiveSlot(outerTupleSlot, node->ps.dest); > node->ps.state->es_processed++; > > Which looks cleaner. > > > The check can be replaced by ISCTAS(into). > Done. > > 'inerst' looks like a typo (insert). > Corrected. > > The code here call strlen(intoclausestr) for two times, > After checking the existing code in ExecInitParallelPlan, > It used to store the strlen in a variable. > > So how about the following style: > > intoclause_len = strlen(intoclausestr); > ... > /* Store serialized intoclause. */ > intoclause_space = shm_toc_allocate(pcxt->toc, intoclause_len + 1); > memcpy(shmptr, intoclausestr, intoclause_len + 1); > shm_toc_insert(pcxt->toc, PARALLEL_KEY_INTO_CLAUSE, intoclause_space); > > > The two check about intoclausestr seems can be combined like: > > if (intoclausestr != NULL) > { > ... > } > else > { > ... > } > Attaching v5 patch. Please consider it for further review. With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com |
Hi,
I have an issue about the following code:

      econtext = node->ps.ps_ExprContext;
      ResetExprContext(econtext);

+     if (ISCTAS(node->ps.intoclause))
+     {
+         ExecParallelInsertInCTAS(node);
+         return NULL;
+     }

      /* If no projection is required, we're done. */
      if (node->ps.ps_ProjInfo == NULL)
          return slot;

      /*
       * Form the result tuple using ExecProject(), and return it.
       */
      econtext->ecxt_outertuple = slot;
      return ExecProject(node->ps.ps_ProjInfo);

It seems the projection will be skipped. Is this because projection is not required in this case? (I'm not very familiar with where the projection will be.)

If projection is not required here, shall we add some comments here?

Best regards,
houzj
On Thu, Nov 26, 2020 at 7:47 AM Hou, Zhijie <[hidden email]> wrote:
> > Hi, > > I have an issue about the following code: > > econtext = node->ps.ps_ExprContext; > ResetExprContext(econtext); > > + if (ISCTAS(node->ps.intoclause)) > + { > + ExecParallelInsertInCTAS(node); > + return NULL; > + } > > /* If no projection is required, we're done. */ > if (node->ps.ps_ProjInfo == NULL) > return slot; > > /* > * Form the result tuple using ExecProject(), and return it. > */ > econtext->ecxt_outertuple = slot; > return ExecProject(node->ps.ps_ProjInfo); > > It seems the projection will be skipped. > Is this because projection is not required in this case ? > (I'm not very familiar with where the projection will be.) > For parallel inserts in CTAS, I don't think we need to project the tuples being returned from the underlying plan nodes, and also we have nothing to project from the Gather node further up. The required projection will happen while the tuples are being returned from the underlying nodes and the projected tuples are being directly fed to CTAS's dest receiver intorel_receive(), from there into the created table. We don't need ExecProject again in ExecParallelInsertInCTAS(). For instance, projection will always be done when the tuple is being returned from an underlying sequential scan node(see ExecScan() --> ExecProject() and this is true for both leader and workers. In both leader and workers, we are just calling CTAS's dest receiver intorel_receive(). Thoughts? > > If projection is not required here, shall we add some comments here? > If the above point looks okay, I can add a comment. With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com |
Hi,

> On Thu, Nov 26, 2020 at 7:47 AM Hou, Zhijie <[hidden email]> wrote:
> >
> > Hi,
> >
> > I have an issue about the following code:
> >
> >     econtext = node->ps.ps_ExprContext;
> >     ResetExprContext(econtext);
> >
> > +   if (ISCTAS(node->ps.intoclause))
> > +   {
> > +       ExecParallelInsertInCTAS(node);
> > +       return NULL;
> > +   }
> >
> >     /* If no projection is required, we're done. */
> >     if (node->ps.ps_ProjInfo == NULL)
> >         return slot;
> >
> >     /*
> >      * Form the result tuple using ExecProject(), and return it.
> >      */
> >     econtext->ecxt_outertuple = slot;
> >     return ExecProject(node->ps.ps_ProjInfo);
> >
> > It seems the projection will be skipped. Is this because projection is not required in this case? (I'm not very familiar with where the projection will be.)
>
> For parallel inserts in CTAS, I don't think we need to project the tuples being returned from the underlying plan nodes, and also we have nothing to project from the Gather node further up. The required projection will happen while the tuples are being returned from the underlying nodes and the projected tuples are being directly fed to CTAS's dest receiver intorel_receive(), from there into the created table. We don't need ExecProject again in ExecParallelInsertInCTAS().
>
> For instance, projection will always be done when the tuple is being returned from an underlying sequential scan node (see ExecScan() --> ExecProject()), and this is true for both leader and workers. In both leader and workers, we are just calling CTAS's dest receiver intorel_receive().
>
> Thoughts?

I took a deep look at the projection logic. In most cases, you are right that the Gather node does not need projection. In some rare cases, such as with a SubPlan (or InitPlan, I guess), the projection will happen in the Gather node.

The example:

Create table test(i int);
Create table test2(a int, b int);
insert into test values(generate_series(1,10000000,1));
insert into test2 values(generate_series(1,1000,1), generate_series(1,1000,1));

postgres=# explain(verbose, costs off) select test.i,(select i from (select * from test2) as tt limit 1) from test where test.i < 2000;
                QUERY PLAN
----------------------------------------
 Gather
   Output: test.i, (SubPlan 1)
   Workers Planned: 2
   ->  Parallel Seq Scan on public.test
         Output: test.i
         Filter: (test.i < 2000)
   SubPlan 1
     ->  Limit
           Output: (test.i)
           ->  Seq Scan on public.test2
                 Output: test.i

In this case, projection is necessary, because the subplan will be executed in the projection. If skipped, the created table will lose some data.

Best regards,
houzj
On Thu, Nov 26, 2020 at 12:15 PM Hou, Zhijie <[hidden email]> wrote:
> > I took a deep look at the projection logic. > In most cases, you are right that Gather node does not need projection. > > In some rare cases, such as Subplan (or initplan I guess). > The projection will happen in Gather node. > > The example: > > Create table test(i int); > Create table test2(a int, b int); > insert into test values(generate_series(1,10000000,1)); > insert into test2 values(generate_series(1,1000,1), generate_series(1,1000,1)); > > postgres=# explain(verbose, costs off) select test.i,(select i from (select * from test2) as tt limit 1) from test where test.i < 2000; > QUERY PLAN > ---------------------------------------- > Gather > Output: test.i, (SubPlan 1) > Workers Planned: 2 > -> Parallel Seq Scan on public.test > Output: test.i > Filter: (test.i < 2000) > SubPlan 1 > -> Limit > Output: (test.i) > -> Seq Scan on public.test2 > Output: test.i > > In this case, projection is necessary, > because the subplan will be executed in projection. > > If skipped, the table created will loss some data. > Thanks a lot for the use case. Yes with the current patch table will lose data related to the subplan. On analyzing further, I think we can not allow parallel inserts in the cases when the Gather node has some projections to do. Because the workers can not perform that projection. So, having ps_ProjInfo in the Gather node is an indication for us to disable parallel inserts and only the leader can do the insertions after the Gather node does the required projections. Thoughts? With Regards, Bharath Rupireddy. EnterpriseDB: http://www.enterprisedb.com |
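[Editor's note] If the ps_ProjInfo route suggested above is taken, the executor-side guard could be as small as the following sketch. It reuses the ISCTAS/ExecParallelInsertInCTAS names from the patch hunk quoted earlier; whether the check is better placed here or at plan time when choosing parallel insert is still an open question:

/*
 * Let the workers insert only when the Gather node itself has nothing
 * to project; otherwise fall through so that the leader performs the
 * projection (running any SubPlans) and inserts the projected tuples
 * through the normal dest receiver path.
 */
if (ISCTAS(node->ps.intoclause) &&
    node->ps.ps_ProjInfo == NULL)
{
    ExecParallelInsertInCTAS(node);
    return NULL;
}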