Parallel copy


Re: Parallel copy

Dilip Kumar-2
On Wed, Feb 26, 2020 at 8:47 PM Ants Aasma <[hidden email]> wrote:

>
> On Tue, 25 Feb 2020 at 18:00, Tomas Vondra <[hidden email]> wrote:
> > Perhaps. I guess it'll depend on the CSV file (number of fields, ...),
> > so I still think we need to do some measurements first. I'm willing to
> > do that, but (a) I doubt I'll have time for that until after 2020-03,
> > and (b) it'd be good to agree on some set of typical CSV files.
>
> I agree that getting a nice varied dataset would be nice. Including
> things like narrow integer only tables, strings with newlines and
> escapes in them, extremely wide rows.
>
> I tried to capture a quick profile just to see what it looks like.
> Grabbed a random open data set from the web, about 800MB of narrow
> rows CSV [1].
>
> Script:
> CREATE TABLE census (year int,age int,ethnic int,sex int,area text,count text);
> COPY census FROM '.../Data8277.csv' WITH (FORMAT 'csv', HEADER true);
>
> Profile:
> # Samples: 59K of event 'cycles:u'
> # Event count (approx.): 57644269486
> #
> # Overhead  Command   Shared Object       Symbol
> # ........  ........  ..................  .......................................
> #
>     18.24%  postgres  postgres            [.] CopyReadLine
>      9.23%  postgres  postgres            [.] NextCopyFrom
>      8.87%  postgres  postgres            [.] NextCopyFromRawFields
>      5.82%  postgres  postgres            [.] pg_verify_mbstr_len
>      5.45%  postgres  postgres            [.] pg_strtoint32
>      4.16%  postgres  postgres            [.] heap_fill_tuple
>      4.03%  postgres  postgres            [.] heap_compute_data_size
>      3.83%  postgres  postgres            [.] CopyFrom
>      3.78%  postgres  postgres            [.] AllocSetAlloc
>      3.53%  postgres  postgres            [.] heap_form_tuple
>      2.96%  postgres  postgres            [.] InputFunctionCall
>      2.89%  postgres  libc-2.30.so        [.] __memmove_avx_unaligned_erms
>      1.82%  postgres  libc-2.30.so        [.] __strlen_avx2
>      1.72%  postgres  postgres            [.] AllocSetReset
>      1.72%  postgres  postgres            [.] RelationPutHeapTuple
>      1.47%  postgres  postgres            [.] heap_prepare_insert
>      1.31%  postgres  postgres            [.] heap_multi_insert
>      1.25%  postgres  postgres            [.] textin
>      1.24%  postgres  postgres            [.] int4in
>      1.05%  postgres  postgres            [.] tts_buffer_heap_clear
>      0.85%  postgres  postgres            [.] pg_any_to_server
>      0.80%  postgres  postgres            [.] pg_comp_crc32c_sse42
>      0.77%  postgres  postgres            [.] cstring_to_text_with_len
>      0.69%  postgres  postgres            [.] AllocSetFree
>      0.60%  postgres  postgres            [.] appendBinaryStringInfo
>      0.55%  postgres  postgres            [.] tts_buffer_heap_materialize.part.0
>      0.54%  postgres  postgres            [.] palloc
>      0.54%  postgres  libc-2.30.so        [.] __memmove_avx_unaligned
>      0.51%  postgres  postgres            [.] palloc0
>      0.51%  postgres  postgres            [.] pg_encoding_max_length
>      0.48%  postgres  postgres            [.] enlargeStringInfo
>      0.47%  postgres  postgres            [.] ExecStoreVirtualTuple
>      0.45%  postgres  postgres            [.] PageAddItemExtended
>
> So that confirms that the parsing is a huge chunk of overhead with
> current splitting into lines being the largest portion. Amdahl's law
> says that splitting into tuples needs to be made fast before
> parallelizing makes any sense.
>

I have run a very simple case on a table with 2 indexes, and I can see
that a lot of time is spent in index insertion.  I agree that a good
amount of time is spent in tokenizing, but it is not very large compared
to the index insertion.

I have expanded the time spent in the CopyFrom function from my perf
report and pasted it here.  We can see that a lot of time is spent in
ExecInsertIndexTuples (77%).  I agree that we need to further evaluate
how much of that is I/O vs. CPU operations.  But the point I
want to make is that it is not true for all cases that parsing takes
the maximum amount of time.

   - 99.50% CopyFrom
      - 82.90% CopyMultiInsertInfoFlush
         - 82.85% CopyMultiInsertBufferFlush
            + 77.68% ExecInsertIndexTuples
            + 3.74% table_multi_insert
            + 0.89% ExecClearTuple
      - 12.54% NextCopyFrom
         - 7.70% NextCopyFromRawFields
            - 5.72% CopyReadLine
                 3.96% CopyReadLineText
               + 1.49% pg_any_to_server
              1.86% CopyReadAttributesCSV
         + 3.68% InputFunctionCall
      + 2.11% ExecMaterializeSlot
      + 0.94% MemoryContextReset

My test:
-- Prepare:
CREATE TABLE t (a int, b int, c varchar);
insert into t select i,i, 'aaaaaaaaaaaaaaaaaaaaaaaa' from
generate_series(1,10000000) as i;
copy t to '/home/dilipkumar/a.csv'  WITH (FORMAT 'csv', HEADER true);
truncate table t;
create index idx on t(a);
create index idx1 on t(c);

-- Test CopyFrom and measure with perf:
copy t from '/home/dilipkumar/a.csv'  WITH (FORMAT 'csv', HEADER true);

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

vignesh C
In reply to this post by akapila
On Wed, Feb 26, 2020 at 4:24 PM Amit Kapila <[hidden email]> wrote:

>
> On Tue, Feb 25, 2020 at 9:30 PM Tomas Vondra
> <[hidden email]> wrote:
> >
> > On Sun, Feb 23, 2020 at 05:09:51PM -0800, Andres Freund wrote:
> > >Hi,
> > >
> > >> The one piece of information I'm missing here is at least a very rough
> > >> quantification of the individual steps of CSV processing - for example
> > >> if parsing takes only 10% of the time, it's pretty pointless to start by
> > >> parallelising this part and we should focus on the rest. If it's 50% it
> > >> might be a different story. Has anyone done any measurements?
> > >
> > >Not recently, but I'm pretty sure that I've observed CSV parsing to be
> > >way more than 10%.
> > >
> >
> > Perhaps. I guess it'll depend on the CSV file (number of fields, ...),
> > so I still think we need to do some measurements first.
> >
>
> Agreed.
>
> > I'm willing to
> > do that, but (a) I doubt I'll have time for that until after 2020-03,
> > and (b) it'd be good to agree on some set of typical CSV files.
> >
>
> Right, I don't know what is the best way to define that.  I can think
> of the below tests.
>
> 1. A table with 10 columns (with datatypes as integers, date, text).
> It has one index (unique/primary). Load with 1 million rows (basically
> the data should be probably 5-10 GB).
> 2. A table with 10 columns (with datatypes as integers, date, text).
> It has three indexes, one index can be (unique/primary). Load with 1
> million rows (basically the data should be probably 5-10 GB).
> 3. A table with 10 columns (with datatypes as integers, date, text).
> It has three indexes, one index can be (unique/primary). It has before
> and after triggers. Load with 1 million rows (basically the data
> should be probably 5-10 GB).
> 4. A table with 10 columns (with datatypes as integers, date, text).
> It has five or six indexes, one index can be (unique/primary). Load
> with 1 million rows (basically the data should be probably 5-10 GB).
>

I have tried to capture the execution time taken for 3 scenarios which I felt could give a fair idea:
Test1 (Table with 3 indexes and 1 trigger)
Test2 (Table with 2 indexes)
Test3 (Table without indexes/triggers)

I have captured the following details:
File Read Time - time taken to read the file, from the CopyGetData function.
Read Line Time - time taken to read a line, from the NextCopyFrom function (read time & tokenize time excluded).
Tokenize Time - time taken to tokenize the contents, from the NextCopyFromRawFields function.
Data Execution Time - the remaining execution time out of the total time.

The execution breakdown for the tests is given below:
Test (time in seconds)   Total Time   File Read Time   Read Line/Buffer Read Time   Tokenize Time   Data Execution Time
Test1                      1693.369            0.256                       34.173           5.578              1653.362
Test2                       736.096            0.288                       39.762           6.525               689.521
Test3                       112.060            0.266                       39.189           6.433                66.172

Steps for the scenarios:
Test1(Table with 3 indexes and 1 trigger):
CREATE TABLE census2 (year int,age int,ethnic int,sex int,area text,count text);
CREATE TABLE census3(year int,age int,ethnic int,sex int,area text,count text);

CREATE INDEX idx1_census2 on census2(year);
CREATE INDEX idx2_census2 on census2(age);
CREATE INDEX idx3_census2 on census2(ethnic);

CREATE or REPLACE FUNCTION census2_afterinsert()
RETURNS TRIGGER
AS $$
BEGIN
  INSERT INTO census3  SELECT * FROM census2 limit 1;
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER census2_trigger AFTER INSERT  ON census2 FOR EACH ROW EXECUTE PROCEDURE census2_afterinsert();
COPY census2 FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

Test2 (Table with 2 indexes):
CREATE TABLE census1 (year int,age int,ethnic int,sex int,area text,count text);
CREATE INDEX idx1_census1 on census1(year);
CREATE INDEX idx2_census1 on census1(age);
COPY census1 FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

Test3 (Table without indexes/triggers):
CREATE TABLE census (year int,age int,ethnic int,sex int,area text,count text);
COPY census FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

Note: The Data8277.csv used was the same data that Ants Aasma had used.

From the above results we can infer that Read Line will have to be done sequentially. Read Line Time takes about 2.01%, 5.40% and 34.97% of the total time in the three tests. I felt we will be able to parallelise the remaining phases of the copy. The performance improvement will vary based on the scenario (indexes/triggers); it will be proportionate to the number of indexes and triggers. Read Line can also be parallelised in text format (non-CSV). I feel parallelising copy could give a significant improvement in quite a few scenarios.

Further, I'm planning to see how the execution behaves for a toast table. I'm also planning to run the tests on a RAM disk, where I will place the data on the RAM disk so that we can further eliminate the I/O cost.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com

Re: Parallel copy

vignesh C
In reply to this post by Ants Aasma-2
On Wed, Feb 26, 2020 at 8:47 PM Ants Aasma <[hidden email]> wrote:

>
> [... quoted test script and perf profile snipped; the same as earlier in the thread ...]
>
> So that confirms that the parsing is a huge chunk of overhead with
> current splitting into lines being the largest portion. Amdahl's law
> says that splitting into tuples needs to be made fast before
> parallelizing makes any sense.
>

I captured a perf report with the same test data that you had used, and I was getting the following results:
.....
+   99.61%     0.00%  postgres  postgres            [.] PortalRun
+   99.61%     0.00%  postgres  postgres            [.] PortalRunMulti
+   99.61%     0.00%  postgres  postgres            [.] PortalRunUtility
+   99.61%     0.00%  postgres  postgres            [.] ProcessUtility
+   99.61%     0.00%  postgres  postgres            [.] standard_ProcessUtility
+   99.61%     0.00%  postgres  postgres            [.] DoCopy
+   99.30%     0.94%  postgres  postgres            [.] CopyFrom
+   51.61%     7.76%  postgres  postgres            [.] NextCopyFrom
+   23.66%     0.01%  postgres  postgres            [.] CopyMultiInsertInfoFlush
+   23.61%     0.28%  postgres  postgres            [.] CopyMultiInsertBufferFlush
+   21.99%     1.02%  postgres  postgres            [.] NextCopyFromRawFields
+   19.79%     0.01%  postgres  postgres            [.] table_multi_insert
+   19.32%     3.00%  postgres  postgres            [.] heap_multi_insert
+   18.27%     2.44%  postgres  postgres            [.] InputFunctionCall
+   15.19%     0.89%  postgres  postgres            [.] CopyReadLine
+   13.05%     0.18%  postgres  postgres            [.] ExecMaterializeSlot
+   13.00%     0.55%  postgres  postgres            [.] tts_buffer_heap_materialize
+   12.31%     1.77%  postgres  postgres            [.] heap_form_tuple
+   10.43%     0.45%  postgres  postgres            [.] int4in
+   10.18%     8.92%  postgres  postgres            [.] CopyReadLineText 
......

In my results I observed that table_multi_insert execution was nearly 20%. Also, I felt that once we have made a few tuples from CopyReadLine, the parallel workers should be able to start consuming and processing the data. We need not wait for the complete tokenisation to be finished: once a few tuples are tokenised, parallel workers should start consuming the data while tokenisation continues in parallel. This way, once the copy is done in parallel, the total execution time should be CopyReadLine time + delta processing time.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com

Re: Parallel copy

vignesh C
I have got the execution breakdown for a few scenarios with a normal disk and a RAM disk.

Execution breakdown on normal disk:

Test (time in seconds)          Total Time   File Read Time   CopyReadLine Time   Remaining Execution Time   Read Line %
Test1 (3 indexes + 1 trigger)     2099.017            0.311              10.256                   2088.450          0.49
Test2 (2 indexes)                  657.994            0.303              10.171                    647.520          1.55
Test3 (no index, no trigger)       112.410            0.296              10.996                    101.118          9.78
Test4 (toast)                      360.028            1.430              46.556                    312.042         12.93

Execution breakdown on RAM disk:

Test (time in seconds)          Total Time   File Read Time   CopyReadLine Time   Remaining Execution Time   Read Line %
Test1 (3 indexes + 1 trigger)     1571.558            0.259               6.986                   1564.313          0.44
Test2 (2 indexes)                  369.942            0.263               6.848                    362.831          1.85
Test3 (no index, no trigger)        54.077            0.239               6.805                     47.033         12.58
Test4 (toast)                       96.323            0.918              26.603                     68.802         27.62

Steps for the scenarios:
Test1(Table with 3 indexes and 1 trigger):
CREATE TABLE census2 (year int,age int,ethnic int,sex int,area text,count text);
CREATE TABLE census3(year int,age int,ethnic int,sex int,area text,count text);

CREATE INDEX idx1_census2 on census2(year);
CREATE INDEX idx2_census2 on census2(age);
CREATE INDEX idx3_census2 on census2(ethnic);

CREATE or REPLACE FUNCTION census2_afterinsert()
RETURNS TRIGGER
AS $$
BEGIN
  INSERT INTO census3  SELECT * FROM census2 limit 1;
  RETURN NEW;
END;
$$
LANGUAGE plpgsql;

CREATE TRIGGER census2_trigger AFTER INSERT  ON census2 FOR EACH ROW EXECUTE PROCEDURE census2_afterinsert();
COPY census2 FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

Test2 (Table with 2 indexes):
CREATE TABLE census1 (year int,age int,ethnic int,sex int,area text,count text);
CREATE INDEX idx1_census1 on census1(year);
CREATE INDEX idx2_census1 on census1(age);
COPY census1 FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

Test3 (Table without indexes/triggers):
CREATE TABLE census (year int,age int,ethnic int,sex int,area text,count text);
COPY census FROM 'Data8277.csv' WITH (FORMAT 'csv', HEADER true);

The random open data set from the web, about 800MB of narrow-row CSV [1], was used in the above tests; it is the same data that Ants Aasma had used.

Test4 (Toast table):
CREATE TABLE indtoasttest(descr text, cnt int DEFAULT 0, f1 text, f2 text);
alter table indtoasttest alter column f1 set storage external;
alter table indtoasttest alter column f2 set storage external;
inserted 262144 records
copy indtoasttest to '/mnt/magnetic/vignesh.c/postgres/toast_data3.csv'  WITH (FORMAT 'csv', HEADER true);

CREATE TABLE indtoasttest1(descr text, cnt int DEFAULT 0, f1 text, f2 text);
alter table indtoasttest1 alter column f1 set storage external;
alter table indtoasttest1 alter column f2 set storage external;
copy indtoasttest1 from '/mnt/magnetic/vignesh.c/postgres/toast_data3.csv'  WITH (FORMAT 'csv', HEADER true);

We can infer that Read Line Time cannot be parallelized; this is mainly because, if the data has a quote present, we will not be able to tell whether it is part of the previous record or of the current record. The rest of the execution time can be parallelized. Read Line Time takes about 0.5%, 1.5%, 9.8% & 12.9% of the total time. We could parallelize the remaining phases of the copy. The performance improvement will vary based on the scenario (indexes/triggers); it will be proportionate to the number of indexes and triggers. Read Line can also be parallelized in text format (non-CSV). We feel parallelizing copy could give a significant improvement in many scenarios.

Attached patch for reference which was used to capture the execution time breakup.

Thoughts?

Regards,
Vignesh

On Tue, Mar 3, 2020 at 11:44 AM vignesh C <[hidden email]> wrote:
[... quoted message snipped; it repeats vignesh C's post of Tue, Mar 3, 2020 (above in this thread) verbatim ...]

Attachment: copy_execution_time_v2.patch (4K)

Re: Parallel copy

vignesh C
On Thu, Mar 12, 2020 at 6:39 PM vignesh C <[hidden email]> wrote:
>

Existing copy code flow: COPY supports the copy operation from csv, txt
& bin format files. For processing the csv & text formats, the server
reads a 64KB chunk, or less if the input file has less content
remaining. It then reads one tuple of data from the chunk and processes
that tuple. If the tuple consumed less than the 64KB of data read, the
server tries to generate another tuple for processing from the
remaining unprocessed data; if it cannot generate a complete tuple from
the unprocessed data, it reads a further 64KB, or whatever lesser size
remains in the file, and sends the tuple for processing. This is
repeated till the complete file is processed.

For processing a bin format file the flow is slightly different: the
server reads the number of columns present, then reads each column's
size followed by its actual contents, repeating this for all the
columns, and then processes the generated tuple. This is repeated for
all the remaining tuples in the bin file. The tuple-processing flow is
the same in all the formats. Currently all these operations happen
sequentially; this project will help in parallelizing the copy
operation. A rough sketch of the serial flow is below.
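
To make the serial flow concrete, here is a minimal sketch. The names
and buffer handling are illustrative only, not the actual copy.c
symbols, and growing the line buffer for a row longer than one 64KB
chunk is elided:

/* Minimal sketch of the serial text/CSV COPY flow described above.
 * Illustrative only; a row longer than RAW_BUF_SIZE would need the
 * buffer to grow, which is elided here. */
#include <stdio.h>
#include <string.h>

#define RAW_BUF_SIZE (64 * 1024)        /* the 64KB chunk described above */

static void
process_tuple(const char *line, size_t len)
{
    /* tokenize attributes, call input functions, insert -- elided */
    (void) line;
    (void) len;
}

int
main(void)
{
    static char buf[RAW_BUF_SIZE];
    size_t      used = 0;
    FILE       *fp = stdin;             /* stands in for the COPY source */

    for (;;)
    {
        size_t      got = fread(buf + used, 1, RAW_BUF_SIZE - used, fp);
        char       *p = buf;
        char       *nl;

        used += got;

        /* generate and process as many whole tuples as are buffered */
        while ((nl = memchr(p, '\n', used - (p - buf))) != NULL)
        {
            process_tuple(p, nl - p);
            p = nl + 1;
        }

        /* keep the unprocessed tail; refill it on the next iteration */
        used -= (p - buf);
        memmove(buf, p, used);

        if (got == 0)
        {
            if (used > 0)
                process_tuple(buf, used);   /* final line, no newline */
            break;
        }
    }
    return 0;
}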

I'm planning to do the POC of parallel copy with the below design.

Proposed syntax:
COPY table_name FROM 'copy_file' WITH (FORMAT 'format', PARALLEL 'workers');

Users can specify the number of workers that must be used for copying
the data in parallel. Here 'workers' is the number of workers to be
used for the parallel copy operation, apart from the leader. The leader
is responsible for reading the data from the input file and generating
work for the workers. The leader will start a transaction and share it
with the workers; all workers will use the same transaction to insert
the records.

The leader will create a circular queue and share it across the
workers. The circular queue will be present in DSM. The leader will use
a fixed-size queue to share the contents between the leader and the
workers; currently we will have 100 elements in the queue. The queue
will be created before the workers are started and shared with them.
The data structures required by the parallel workers will be
initialized by the leader, the size required in DSM will be calculated,
and the necessary keys will be loaded into the DSM. The specified
number of workers will then be launched.

The leader will read the table data from the file and copy the contents
into the queue element by element. Each element in the queue will have
a 64KB DSA, which will be used to store tuple contents from the file.
The leader will try to copy as much content as possible into one 64KB
DSA queue element; we intend to store at least one tuple in each queue
element. There are some cases where the 64KB space may not be enough to
store a single tuple, mostly when the table has toast data present and
a single tuple can be more than 64KB in size. In these scenarios we
will extend the DSA space accordingly. We cannot change the size of the
DSM once the workers are launched, whereas in the case of DSA we can
free the DSA pointer and reallocate it based on the memory size
required. This is the very reason for choosing DSA over DSM for storing
the data that must be inserted into the relation.

The leader will keep loading data into the queue till the queue becomes
full, and will transform its role into a worker either when the queue
is full or when the complete file has been processed. Once the queue is
full, the leader switches its role to become a worker and continues
acting as a worker till 25% of the elements in the queue have been
consumed by the workers. Once there is at least 25% space available in
the queue, the leader, who was working as a worker, switches its role
back to leader again. This process of filling the queue is continued by
the leader until the whole file is processed. The leader will wait
until the respective workers finish processing the queue elements.

The copy-from functionality is also used during initdb operations,
where the copy is intended to be performed in single mode, and the user
can also still run in non-parallel mode. In the non-parallel case,
memory allocation will happen using palloc instead of DSM/DSA, and most
of the flow will be the same in both the parallel and non-parallel
cases.
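
To make the shared-memory layout concrete, here is a rough C sketch of
the structures described above. All names, types and sizes are invented
for illustration (dsa_pointer is stubbed with an integer); this is not
the actual patch:

/* Sketch of the leader/worker shared state for the proposed design. */
#include <stdint.h>

#define PARALLEL_COPY_QUEUE_LEN  100           /* elements, as proposed */
#define QUEUE_ELEM_DEFAULT_SIZE  (64 * 1024)   /* per-element DSA chunk */

typedef uint64_t dsa_pointer_stub;      /* stand-in for dsa_pointer */

typedef struct ParallelCopyQueueElem
{
    dsa_pointer_stub data;      /* DSA chunk holding raw tuple text; can
                                 * be freed and reallocated larger for an
                                 * oversized (e.g. toasted) tuple */
    uint32_t    size;           /* currently allocated chunk size */
    uint32_t    used;           /* bytes of tuple data stored in it */
} ParallelCopyQueueElem;

typedef struct ParallelCopyShared
{
    /* fixed-size ring in DSM: the leader fills, workers consume */
    uint32_t    head;           /* next element the leader fills */
    uint32_t    tail;           /* next element a worker takes */
    uint32_t    nworkers;       /* workers launched, excluding leader */
    /* shared transaction state handed to the workers -- elided */
    ParallelCopyQueueElem elems[PARALLEL_COPY_QUEUE_LEN];
} ParallelCopyShared;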

We had a couple of options for the way in which queue elements can be stored.

Option 1: Each element (DSA chunk) will contain tuples such that each
tuple is preceded by its length. So the tuples will be arranged like
(length of tuple-1, tuple-1), (length of tuple-2, tuple-2), ...

Option 2: Each element (DSA chunk) will contain only the tuples
(tuple-1), (tuple-2), ... and we will have a second ring buffer which
contains the start offset or length of each tuple.

The old design used to generate one tuple of data and process tuple by
tuple. In the new design, the server will generate multiple tuples of
data per queue element, and the worker will then process the data tuple
by tuple. As we are processing the data tuple by tuple, I felt both of
the options are almost the same. However, Option 1 was chosen over
Option 2 as we can save some space that would otherwise be required by
another variable in each element of the queue. A sketch of the two
layouts is below.
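
A small hedged sketch of the two layouts, with invented helper names
and bounds checks elided:

/* Option 1: (length, tuple) pairs packed into the chunk. */
#include <stdint.h>
#include <string.h>

static size_t
opt1_append(char *chunk, size_t used, const char *tup, uint32_t len)
{
    memcpy(chunk + used, &len, sizeof(len));    /* length prefix */
    memcpy(chunk + used + sizeof(len), tup, len);
    return used + sizeof(len) + len;
}

/* Option 2: raw tuple bytes only; boundaries kept in a second ring. */
typedef struct TupleOffsetRing
{
    uint32_t    start[1024];    /* start offset of each tuple in chunk */
    uint32_t    ntuples;
} TupleOffsetRing;

static size_t
opt2_append(char *chunk, size_t used, TupleOffsetRing *ring,
            const char *tup, uint32_t len)
{
    ring->start[ring->ntuples++] = (uint32_t) used;
    memcpy(chunk + used, tup, len);     /* with option 2 this memcpy can
                                         * become the file read itself */
    return used + len;
}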

The parallel workers will read the tuples from the queue and perform
all of the following operations: a) WHERE clause handling, b)
converting the tuple to columns, c) adding default/null values for the
missing columns that are not present in that record, d) finding the
partition if it is a partitioned table, e) before-row insert triggers
and constraints, f) insertion of the data. The rest of the flow is the
same as the existing code.

Enhancements after the POC is done:
- Initially we plan to use the number of workers the user has
specified; later we will do some experiments and think of an approach
to choose the worker count automatically after processing sample
contents from the file.
- Initially we plan to use 100 elements in the queue; later we will
experiment to find the right queue size once the basic patch is ready.
- Initially we plan to start the transaction from the leader and share
it across the workers; later we will change this in such a way that the
first process that performs an insert operation starts the transaction
and shares it with the rest.

Thoughts?

Regards,
Vignesh
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Ants Aasma-2
On Tue, 7 Apr 2020 at 08:24, vignesh C <[hidden email]> wrote:

> Leader will create a circular queue
> and share it across the workers. The circular queue will be present in
> DSM. Leader will be using a fixed size queue to share the contents
> between the leader and the workers. Currently we will have 100
> elements present in the queue. This will be created before the workers
> are started and shared with the workers. The data structures that are
> required by the parallel workers will be initialized by the leader,
> the size required in dsm will be calculated and the necessary keys
> will be loaded in the DSM. The specified number of workers will then
> be launched. Leader will read the table data from the file and copy
> the contents to the queue element by element. Each element in the
> queue will have 64K size DSA. This DSA will be used to store tuple
> contents from the file. The leader will try to copy as much content as
> possible within one 64K DSA queue element. We intend to store at least
> one tuple in each queue element. There are some cases where the 64K
> space may not be enough to store a single tuple. Mostly in cases where
> the table has toast data present and the single tuple can be more than
> 64K size. In these scenarios we will extend the DSA space accordingly.
> We cannot change the size of the dsm once the workers are launched.
> Whereas in case of DSA we can free the dsa pointer and reallocate the
> dsa pointer based on the memory size required. This is the very reason
> for choosing DSA over DSM for storing the data that must be inserted
> into the relation.

I think the element based approach and requirement that all tuples fit
into the queue makes things unnecessarily complex. The approach I
detailed earlier allows for tuples to be bigger than the buffer. In
that case a worker will claim the long tuple from the ring queue of
tuple start positions, and starts copying it into its local line_buf.
This can wrap around the buffer multiple times until the next start
position shows up. At that point this worker can proceed with
inserting the tuple and the next worker will claim the next tuple.

This way nothing needs to be resized, there is no risk of a file with
huge tuples running the system out of memory because each element will
be reallocated to be huge and the number of elements is not something
that has to be tuned.
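
To illustrate, here is a rough sketch of the wrap-around copy under
this scheme, assuming a power-of-two ring addressed by absolute stream
offsets. All names are invented, and producer/consumer coordination for
tuples larger than the ring is elided:

#include <stdint.h>
#include <string.h>

#define RING_SIZE (1024 * 1024)      /* shared ring of raw input bytes */

/*
 * Copy one claimed tuple, occupying absolute stream offsets
 * [start, end), out of the ring into the worker's local buffer.
 * Assumes end - start <= RING_SIZE and line_buf is large enough; for
 * tuples larger than the ring the worker would repeat this as the
 * producer refills.
 */
static void
copy_tuple_from_ring(const char *ring, uint64_t start, uint64_t end,
                     char *line_buf)
{
    uint64_t    len = end - start;
    uint64_t    off = start % RING_SIZE;

    if (off + len <= RING_SIZE)
        memcpy(line_buf, ring + off, len);      /* contiguous case */
    else
    {
        /* the tuple wraps past the end of the ring: copy two pieces */
        uint64_t    first = RING_SIZE - off;

        memcpy(line_buf, ring + off, first);
        memcpy(line_buf + first, ring, len - first);
    }
}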

> We had a couple of options for the way in which queue elements can be stored.
> Option 1:  Each element (DSA chunk) will contain tuples such that each
> tuple will be preceded by the length of the tuple.  So the tuples will
> be arranged like (Length of tuple-1, tuple-1), (Length of tuple-2,
> tuple-2), .... Or Option 2: Each element (DSA chunk) will contain only
> tuples (tuple-1), (tuple-2), .....  And we will have a second
> ring-buffer which contains a start-offset or length of each tuple. The
> old design used to generate one tuple of data and process tuple by
> tuple. In the new design, the server will generate multiple tuples of
> data per queue element. The worker will then process data tuple by
> tuple. As we are processing the data tuple by tuple, I felt both of
> the options are almost the same. However Design1 was chosen over
> Design 2 as we can save up on some space that was required by another
> variable in each element of the queue.

With option 1 it's not possible to read input data into shared memory
and there needs to be an extra memcpy in the time critical sequential
flow of the leader. With option 2 data could be read directly into the
shared memory buffer. With future async io support, reading and
looking for tuple boundaries could be performed concurrently.


Regards,
Ants Aasma
Cybertec



Re: Parallel copy

akapila
On Tue, Apr 7, 2020 at 7:08 PM Ants Aasma <[hidden email]> wrote:

>
> On Tue, 7 Apr 2020 at 08:24, vignesh C <[hidden email]> wrote:
> > Leader will create a circular queue
> > and share it across the workers. The circular queue will be present in
> > DSM. Leader will be using a fixed size queue to share the contents
> > between the leader and the workers. Currently we will have 100
> > elements present in the queue. This will be created before the workers
> > are started and shared with the workers. The data structures that are
> > required by the parallel workers will be initialized by the leader,
> > the size required in dsm will be calculated and the necessary keys
> > will be loaded in the DSM. The specified number of workers will then
> > be launched. Leader will read the table data from the file and copy
> > the contents to the queue element by element. Each element in the
> > queue will have 64K size DSA. This DSA will be used to store tuple
> > contents from the file. The leader will try to copy as much content as
> > possible within one 64K DSA queue element. We intend to store at least
> > one tuple in each queue element. There are some cases where the 64K
> > space may not be enough to store a single tuple. Mostly in cases where
> > the table has toast data present and the single tuple can be more than
> > 64K size. In these scenarios we will extend the DSA space accordingly.
> > We cannot change the size of the dsm once the workers are launched.
> > Whereas in case of DSA we can free the dsa pointer and reallocate the
> > dsa pointer based on the memory size required. This is the very reason
> > for choosing DSA over DSM for storing the data that must be inserted
> > into the relation.
>
> I think the element based approach and requirement that all tuples fit
> into the queue makes things unnecessarily complex. The approach I
> detailed earlier allows for tuples to be bigger than the buffer. In
> that case a worker will claim the long tuple from the ring queue of
> tuple start positions, and starts copying it into its local line_buf.
> This can wrap around the buffer multiple times until the next start
> position shows up. At that point this worker can proceed with
> inserting the tuple and the next worker will claim the next tuple.
>

IIUC, with the fixed-size buffer the parallelism might suffer a bit,
because till the worker copies the data from the shared buffer to the
local buffer, the reader process won't be able to continue.  I think
somewhat more leader-worker coordination will be required with a fixed
buffer size. However, as you pointed out, we can't allow every element
to grow to the maximum possible tuple size, as that might require a lot
of memory.  One idea could be that we allow it for the first such
tuple, and then if any other element/chunk in the queue requires more
memory than the default 64KB, we always fall back to using the memory
we have allocated for the first chunk.  This will allow us to not use
more memory except for one tuple, and it won't hurt parallelism much,
as in many cases not all tuples will be so large.

I think in the proposed approach a queue element is nothing but a way
to divide the work among workers based on size rather than based on the
number of tuples.  Say we try to divide the work among workers based on
start offsets; that can be more tricky, because it could lead either to
a lot of contention, if we choose say one offset per worker (basically
copy the data for one tuple, process it, and then pick the next tuple),
or probably to an unequal division of work, because some tuples can be
smaller and others can be bigger.  I guess division based on size would
be a better idea. OTOH, I see the advantage of your approach as well,
and I will think more on it.

>
> > We had a couple of options for the way in which queue elements can be stored.
> > Option 1:  Each element (DSA chunk) will contain tuples such that each
> > tuple will be preceded by the length of the tuple.  So the tuples will
> > be arranged like (Length of tuple-1, tuple-1), (Length of tuple-2,
> > tuple-2), .... Or Option 2: Each element (DSA chunk) will contain only
> > tuples (tuple-1), (tuple-2), .....  And we will have a second
> > ring-buffer which contains a start-offset or length of each tuple. The
> > old design used to generate one tuple of data and process tuple by
> > tuple. In the new design, the server will generate multiple tuples of
> > data per queue element. The worker will then process data tuple by
> > tuple. As we are processing the data tuple by tuple, I felt both of
> > the options are almost the same. However Design1 was chosen over
> > Design 2 as we can save up on some space that was required by another
> > variable in each element of the queue.
>
> With option 1 it's not possible to read input data into shared memory
> and there needs to be an extra memcpy in the time critical sequential
> flow of the leader. With option 2 data could be read directly into the
> shared memory buffer. With future async io support, reading and
> looking for tuple boundaries could be performed concurrently.
>

Yeah, option-2 sounds better.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
In reply to this post by Ants Aasma-2
On Tue, Apr 7, 2020 at 9:38 AM Ants Aasma <[hidden email]> wrote:

> I think the element based approach and requirement that all tuples fit
> into the queue makes things unnecessarily complex. The approach I
> detailed earlier allows for tuples to be bigger than the buffer. In
> that case a worker will claim the long tuple from the ring queue of
> tuple start positions, and starts copying it into its local line_buf.
> This can wrap around the buffer multiple times until the next start
> position shows up. At that point this worker can proceed with
> inserting the tuple and the next worker will claim the next tuple.
>
> This way nothing needs to be resized, there is no risk of a file with
> huge tuples running the system out of memory because each element will
> be reallocated to be huge and the number of elements is not something
> that has to be tuned.

+1. This seems like the right way to do it.

> > We had a couple of options for the way in which queue elements can be stored.
> > Option 1:  Each element (DSA chunk) will contain tuples such that each
> > tuple will be preceded by the length of the tuple.  So the tuples will
> > be arranged like (Length of tuple-1, tuple-1), (Length of tuple-2,
> > tuple-2), .... Or Option 2: Each element (DSA chunk) will contain only
> > tuples (tuple-1), (tuple-2), .....  And we will have a second
> > ring-buffer which contains a start-offset or length of each tuple. The
> > old design used to generate one tuple of data and process tuple by
> > tuple. In the new design, the server will generate multiple tuples of
> > data per queue element. The worker will then process data tuple by
> > tuple. As we are processing the data tuple by tuple, I felt both of
> > the options are almost the same. However Design1 was chosen over
> > Design 2 as we can save up on some space that was required by another
> > variable in each element of the queue.
>
> With option 1 it's not possible to read input data into shared memory
> and there needs to be an extra memcpy in the time critical sequential
> flow of the leader. With option 2 data could be read directly into the
> shared memory buffer. With future async io support, reading and
> looking for tuple boundaries could be performed concurrently.

But option 2 still seems significantly worse than your proposal above, right?

I really think we don't want a single worker in charge of finding
tuple boundaries for everybody. That adds a lot of unnecessary
inter-process communication and synchronization. Each process should
just get the next tuple starting after where the last one ended, and
then advance the end pointer so that the next process can do the same
thing. Vignesh's proposal involves having a leader process that has to
switch roles - he picks an arbitrary 25% threshold - and if it doesn't
switch roles at the right time, performance will be impacted. If the
leader doesn't get scheduled in time to refill the queue before it
runs completely empty, workers will have to wait. Ants's scheme avoids
that risk: whoever needs the next tuple reads the next line. There's
no need to ever wait for the leader because there is no leader.
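
A minimal sketch of that hand-off, using C11 atomics and invented names
(a real implementation would presumably use PostgreSQL's own atomics
and wait primitives rather than spinning):

#include <stdatomic.h>
#include <stdint.h>

#define MAX_BOUNDARIES 8192

typedef struct TupleBoundaryQueue
{
    _Atomic uint64_t next_tuple;    /* next unclaimed entry */
    _Atomic uint64_t produced;      /* entries published by the scan */
    uint64_t    start_offset[MAX_BOUNDARIES];   /* tuple start offsets */
} TupleBoundaryQueue;

/* Claim one boundary entry; returns its index, or UINT64_MAX if none
 * is available yet (the caller would wait or finish). */
static uint64_t
claim_next_tuple(TupleBoundaryQueue *q)
{
    uint64_t    mine = atomic_load(&q->next_tuple);

    for (;;)
    {
        if (mine >= atomic_load(&q->produced))
            return UINT64_MAX;
        /* advance the shared pointer; on failure 'mine' is refreshed */
        if (atomic_compare_exchange_weak(&q->next_tuple, &mine, mine + 1))
            return mine;            /* this process owns entry 'mine' */
    }
}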

I think it's worth enumerating some of the other ways that a project
in this area can fail to achieve good speedups, so that we can try to
avoid those that are avoidable and be aware of the others:

- If we're unable to supply data to the COPY process as fast as the
workers could load it, then speed will be limited at that point. We
know reading the file from disk is pretty fast compared to what a
single process can do. I'm not sure we've tested what happens with a
network socket. It will depend on the network speed some, but it might
be useful to know how many MB/s we can pump through over a UNIX
socket.

- The portion of the time that is used to split the lines is not
easily parallelizable. That seems to be a fairly small percentage for
a reasonably wide table, but it looks significant (13-18%) for a
narrow table. Such cases will gain less performance and be limited to
a smaller number of workers. I think we also need to be careful about
files whose lines are longer than the size of the buffer. If we're not
careful, we could get a significant performance drop-off in such
cases. We should make sure to pick an algorithm that seems like it
will handle such cases without serious regressions and check that a
file composed entirely of such long lines is handled reasonably
efficiently.

- There could be index contention. Let's suppose that we can read data
super fast and break it up into lines super fast. Maybe the file we're
reading is fully RAM-cached and the lines are long. Now all of the
backends are inserting into the indexes at the same time, and they
might be trying to insert into the same pages. If so, lock contention
could become a factor that hinders performance.

- There could also be similar contention on the heap. Say the tuples
are narrow, and many backends are trying to insert tuples into the
same heap page at the same time. This would lead to many lock/unlock
cycles. This could be avoided if the backends avoid targeting the same
heap pages, but I'm not sure there's any reason to expect that they
would do so unless we make some special provision for it.

- These problems could also arise with respect to TOAST table
insertions, either on the TOAST table itself or on its index. This
would only happen if the table contains a lot of toastable values, but
that could be the case: imagine a table with a bunch of columns each
of which contains a long string that isn't very compressible.

- What else? I bet the above list is not comprehensive.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Ants Aasma-2
On Wed, 8 Apr 2020 at 22:30, Robert Haas <[hidden email]> wrote:
> - If we're unable to supply data to the COPY process as fast as the
> workers could load it, then speed will be limited at that point. We
> know reading the file from disk is pretty fast compared to what a
> single process can do. I'm not sure we've tested what happens with a
> network socket. It will depend on the network speed some, but it might
> be useful to know how many MB/s we can pump through over a UNIX
> socket.

This raises a good point. If at some point we want to minimize the
number of memory copies, then we might want to allow RDMA to write
incoming network traffic directly into a distributing ring buffer,
which would include the protocol-level headers. But at this point we
are so far off from network reception becoming a bottleneck that I
don't think it's worth holding anything up over not allowing zero-copy
transfers.

> - The portion of the time that is used to split the lines is not
> easily parallelizable. That seems to be a fairly small percentage for
> a reasonably wide table, but it looks significant (13-18%) for a
> narrow table. Such cases will gain less performance and be limited to
> a smaller number of workers. I think we also need to be careful about
> files whose lines are longer than the size of the buffer. If we're not
> careful, we could get a significant performance drop-off in such
> cases. We should make sure to pick an algorithm that seems like it
> will handle such cases without serious regressions and check that a
> file composed entirely of such long lines is handled reasonably
> efficiently.

I don't have a proof, but my gut feel tells me that it's fundamentally
impossible to ingest csv without a serial line-ending/comment
tokenization pass. The current line splitting algorithm is terrible.
I'm currently working with some scientific data where on ingestion
CopyReadLineText() is about 25% on profiles. I prototyped a
replacement that can do ~8GB/s on narrow rows, more on wider ones.
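
For reference, here is a minimal sketch of such a serial boundary pass,
assuming the default '"' quote with doubled-quote escaping and no
encoding awareness. It is deliberately scalar; a fast implementation
like the prototype mentioned above would presumably vectorize this
scan:

#include <stddef.h>

static size_t
scan_csv_boundaries(const char *buf, size_t len,
                    size_t *line_start, size_t max_lines)
{
    size_t      nlines = 0;
    int         in_quotes = 0;

    for (size_t i = 0; i < len; i++)
    {
        char        c = buf[i];

        if (c == '"')
            in_quotes = !in_quotes;   /* a doubled quote inside a quoted
                                       * field toggles twice, leaving the
                                       * state unchanged, so this works */
        else if (c == '\n' && !in_quotes)
        {
            if (nlines < max_lines)
                line_start[nlines] = i + 1;   /* next line begins here */
            nlines++;
        }
    }
    return nlines;      /* number of newline-terminated lines found */
}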

For rows that are consistently wider than the input buffer I think
parallelism will still give a win - the serial phase is just memcpy
through a ringbuffer, after which a worker goes away to perform the
actual insert, letting the next worker read the data. The memcpy is
already happening today, CopyReadLineText() copies the input buffer
into a StringInfo, so the only extra work is synchronization between
leader and worker.

> - There could be index contention. Let's suppose that we can read data
> super fast and break it up into lines super fast. Maybe the file we're
> reading is fully RAM-cached and the lines are long. Now all of the
> backends are inserting into the indexes at the same time, and they
> might be trying to insert into the same pages. If so, lock contention
> could become a factor that hinders performance.

Different data distribution strategies can have an effect on that.
Dealing out input data in larger or smaller chunks will have a
considerable effect on contention, btree page splits and all kinds of
things. I think the common theme would be a push to increase chunk
size to reduce contention.

> - There could also be similar contention on the heap. Say the tuples
> are narrow, and many backends are trying to insert tuples into the
> same heap page at the same time. This would lead to many lock/unlock
> cycles. This could be avoided if the backends avoid targeting the same
> heap pages, but I'm not sure there's any reason to expect that they
> would do so unless we make some special provision for it.

I thought there already was a provision for that. Am I mis-remembering?

> - What else? I bet the above list is not comprehensive.

I think parallel copy patch needs to concentrate on splitting input
data to workers. After that any performance issues would be basically
the same as a normal parallel insert workload. There may well be
bottlenecks there, but those could be tackled independently.

Regards,
Ants Aasma
Cybertec



Re: Parallel copy

akapila
In reply to this post by Robert Haas
On Thu, Apr 9, 2020 at 1:00 AM Robert Haas <[hidden email]> wrote:

>
> On Tue, Apr 7, 2020 at 9:38 AM Ants Aasma <[hidden email]> wrote:
> >
> > With option 1 it's not possible to read input data into shared memory
> > and there needs to be an extra memcpy in the time critical sequential
> > flow of the leader. With option 2 data could be read directly into the
> > shared memory buffer. With future async io support, reading and
> > looking for tuple boundaries could be performed concurrently.
>
> But option 2 still seems significantly worse than your proposal above, right?
>
> I really think we don't want a single worker in charge of finding
> tuple boundaries for everybody. That adds a lot of unnecessary
> inter-process communication and synchronization. Each process should
> just get the next tuple starting after where the last one ended, and
> then advance the end pointer so that the next process can do the same
> thing. Vignesh's proposal involves having a leader process that has to
> switch roles - he picks an arbitrary 25% threshold - and if it doesn't
> switch roles at the right time, performance will be impacted. If the
> leader doesn't get scheduled in time to refill the queue before it
> runs completely empty, workers will have to wait. Ants's scheme avoids
> that risk: whoever needs the next tuple reads the next line. There's
> no need to ever wait for the leader because there is no leader.
>

Hmm, I think in his scheme also there is a single reader process.  See
the email above [1] where he described how it should work.  I think the
difference is in the division of work.  AFAIU, in Ants's scheme the
worker needs to pick the work from the tuple-offset queue, whereas in
Vignesh's scheme it will be based on size (each worker will probably
get 64KB of work).  I think in his scheme the main thing to find out is
how many tuple offsets should be assigned to each worker in one go, so
that we don't unnecessarily add contention for finding the work unit.
I think we need to find the right balance between size and number of
tuples.  I am trying to consider size here because larger tuples will
probably require more time, as we need to allocate more space for them
and they probably also require more processing time.  One way to
achieve that could be that each worker tries to claim 500 tuples (or
some other threshold number), but if their combined size is greater
than 64KB (or some other threshold size) then the worker tries with a
smaller number of tuples, such that the size of the chunk of tuples is
less than the threshold size.  A rough sketch of that heuristic is
below.
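
A small sketch of that heuristic with the thresholds floated above
(names invented; a real version would also need the claim itself to be
atomic against other workers):

#include <stddef.h>
#include <stdint.h>

#define CLAIM_MAX_TUPLES 500
#define CLAIM_MAX_BYTES  (64 * 1024)

/* tuple_size[i] gives the size of queued tuple i; navail is how many
 * are ready.  Returns how many tuples this worker should claim. */
static size_t
choose_claim_count(const uint32_t *tuple_size, size_t navail)
{
    size_t      n = 0;
    uint64_t    bytes = 0;

    while (n < navail && n < CLAIM_MAX_TUPLES)
    {
        bytes += tuple_size[n];
        if (bytes > CLAIM_MAX_BYTES && n > 0)
            break;              /* size cap hit; keep at least one */
        n++;
    }
    return n;
}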

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

akapila
On Thu, Apr 9, 2020 at 4:20 PM Amit Kapila <[hidden email]> wrote:

>
> On Thu, Apr 9, 2020 at 1:00 AM Robert Haas <[hidden email]> wrote:
> >
> > On Tue, Apr 7, 2020 at 9:38 AM Ants Aasma <[hidden email]> wrote:
> > >
> > > With option 1 it's not possible to read input data into shared memory
> > > and there needs to be an extra memcpy in the time critical sequential
> > > flow of the leader. With option 2 data could be read directly into the
> > > shared memory buffer. With future async io support, reading and
> > > looking for tuple boundaries could be performed concurrently.
> >
> > But option 2 still seems significantly worse than your proposal above, right?
> >
> > I really think we don't want a single worker in charge of finding
> > tuple boundaries for everybody. That adds a lot of unnecessary
> > inter-process communication and synchronization. Each process should
> > just get the next tuple starting after where the last one ended, and
> > then advance the end pointer so that the next process can do the same
> > thing. Vignesh's proposal involves having a leader process that has to
> > switch roles - he picks an arbitrary 25% threshold - and if it doesn't
> > switch roles at the right time, performance will be impacted. If the
> > leader doesn't get scheduled in time to refill the queue before it
> > runs completely empty, workers will have to wait. Ants's scheme avoids
> > that risk: whoever needs the next tuple reads the next line. There's
> > no need to ever wait for the leader because there is no leader.
> >
>
> Hmm, I think in his scheme also there is a single reader process.  See
> the email above [1] where he described how it should work.
>

oops, I forgot to specify the link to the email.  See
https://www.postgresql.org/message-id/CANwKhkO87A8gApobOz_o6c9P5auuEG1W2iCz0D5CfOeGgAnk3g%40mail.gmail.com


--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

akapila
In reply to this post by Ants Aasma-2
On Thu, Apr 9, 2020 at 3:55 AM Ants Aasma <[hidden email]> wrote:

>
> On Wed, 8 Apr 2020 at 22:30, Robert Haas <[hidden email]> wrote:
>
> > - The portion of the time that is used to split the lines is not
> > easily parallelizable. That seems to be a fairly small percentage for
> > a reasonably wide table, but it looks significant (13-18%) for a
> > narrow table. Such cases will gain less performance and be limited to
> > a smaller number of workers. I think we also need to be careful about
> > files whose lines are longer than the size of the buffer. If we're not
> > careful, we could get a significant performance drop-off in such
> > cases. We should make sure to pick an algorithm that seems like it
> > will handle such cases without serious regressions and check that a
> > file composed entirely of such long lines is handled reasonably
> > efficiently.
>
> I don't have a proof, but my gut feel tells me that it's fundamentally
> impossible to ingest csv without a serial line-ending/comment
> tokenization pass.
>

I think even if we try to do it via multiple workers it might not be
better.  In such a scheme, every worker needs to update the end
boundaries, and the next worker has to keep checking whether the
previous one has updated the end pointer.  I think this can add
significant synchronization effort for cases where tuples are around
100 bytes, which will be a common case.

> The current line splitting algorithm is terrible.
> I'm currently working with some scientific data where on ingestion
> CopyReadLineText() is about 25% on profiles. I prototyped a
> replacement that can do ~8GB/s on narrow rows, more on wider ones.
>

Good to hear.  I think that will be a good project on its own, and it
might also give parallel copy a boost, since it would further reduce
the non-parallelizable unit of work.
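
For readers curious what such a scanner can look like: the prototype
itself isn't posted here, but the usual shape of a fast line splitter
is a vectorized byte comparison.  A minimal sketch assuming SSE2 and a
GCC/Clang builtin, with CSV quote/escape handling deliberately omitted
(so this is far simpler than what COPY actually needs):

#include <emmintrin.h>   /* SSE2 intrinsics */
#include <stddef.h>

/*
 * Return a pointer to the first '\n' in buf[0..len), or NULL if none.
 * Compares 16 bytes per iteration; __builtin_ctz is GCC/Clang-specific.
 */
static const char *
find_newline_sse2(const char *buf, size_t len)
{
    const __m128i nl = _mm_set1_epi8('\n');
    size_t i = 0;

    for (; i + 16 <= len; i += 16)
    {
        __m128i chunk = _mm_loadu_si128((const __m128i *) (buf + i));
        int mask = _mm_movemask_epi8(_mm_cmpeq_epi8(chunk, nl));

        if (mask != 0)
            return buf + i + __builtin_ctz(mask);
    }
    for (; i < len; i++)          /* scalar tail */
    {
        if (buf[i] == '\n')
            return buf + i;
    }
    return NULL;
}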

> For rows that are consistently wider than the input buffer I think
> parallelism will still give a win - the serial phase is just memcpy
> through a ringbuffer, after which a worker goes away to perform the
> actual insert, letting the next worker read the data. The memcpy is
> already happening today, CopyReadLineText() copies the input buffer
> into a StringInfo, so the only extra work is synchronization between
> leader and worker.
>
>
> > - There could also be similar contention on the heap. Say the tuples
> > are narrow, and many backends are trying to insert tuples into the
> > same heap page at the same time. This would lead to many lock/unlock
> > cycles. This could be avoided if the backends avoid targeting the same
> > heap pages, but I'm not sure there's any reason to expect that they
> > would do so unless we make some special provision for it.
>
> I thought there already was a provision for that. Am I mis-remembering?
>

COPY uses heap_multi_insert to insert a batch of tuples, and I think
each batch should ideally use a different page; mostly it will be a
new page.  So I am not sure whether this will be a problem, or a
problem severe enough to need special handling.  But if this turns out
to be a problem, we definitely need some better way to deal with it.

> > - What else? I bet the above list is not comprehensive.
>
> I think the parallel copy patch needs to concentrate on splitting input
> data among workers. After that, any performance issues would be basically
> the same as for a normal parallel insert workload. There may well be
> bottlenecks there, but those could be tackled independently.
>

I agree.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Dilip Kumar-2
In reply to this post by Robert Haas
On Thu, Apr 9, 2020 at 1:00 AM Robert Haas <[hidden email]> wrote:

>
> On Tue, Apr 7, 2020 at 9:38 AM Ants Aasma <[hidden email]> wrote:
> > I think the element based approach and requirement that all tuples fit
> > into the queue makes things unnecessarily complex. The approach I
> > detailed earlier allows for tuples to be bigger than the buffer. In
> > that case a worker will claim the long tuple from the ring queue of
> > tuple start positions, and starts copying it into its local line_buf.
> > This can wrap around the buffer multiple times until the next start
> > position shows up. At that point this worker can proceed with
> > inserting the tuple and the next worker will claim the next tuple.
> >
> > This way nothing needs to be resized, there is no risk of a file with
> > huge tuples running the system out of memory because each element will
> > be reallocated to be huge and the number of elements is not something
> > that has to be tuned.
>
> +1. This seems like the right way to do it.
>
> > > We had a couple of options for the way in which queue elements can be stored.
> > > Option 1:  Each element (DSA chunk) will contain tuples such that each
> > > tuple will be preceded by the length of the tuple.  So the tuples will
> > > be arranged like (Length of tuple-1, tuple-1), (Length of tuple-2,
> > > tuple-2), .... Or Option 2: Each element (DSA chunk) will contain only
> > > tuples (tuple-1), (tuple-2), .....  And we will have a second
> > > ring-buffer which contains a start-offset or length of each tuple. The
> > > old design used to generate one tuple of data and process tuple by
> > > tuple. In the new design, the server will generate multiple tuples of
> > > data per queue element. The worker will then process data tuple by
> > > tuple. As we are processing the data tuple by tuple, I felt both of
> > > the options are almost the same. However, Option 1 was chosen over
> > > Option 2, as we can save some space that was required by another
> > > variable in each element of the queue.
> >
> > With option 1 it's not possible to read input data into shared memory
> > and there needs to be an extra memcpy in the time critical sequential
> > flow of the leader. With option 2 data could be read directly into the
> > shared memory buffer. With future async io support, reading and
> > looking for tuple boundaries could be performed concurrently.
>
> But option 2 still seems significantly worse than your proposal above, right?
>
> I really think we don't want a single worker in charge of finding
> tuple boundaries for everybody. That adds a lot of unnecessary
> inter-process communication and synchronization. Each process should
> just get the next tuple starting after where the last one ended, and
> then advance the end pointer so that the next process can do the same
> thing. Vignesh's proposal involves having a leader process that has to
> switch roles - he picks an arbitrary 25% threshold - and if it doesn't
> switch roles at the right time, performance will be impacted. If the
> leader doesn't get scheduled in time to refill the queue before it
> runs completely empty, workers will have to wait. Ants's scheme avoids
> that risk: whoever needs the next tuple reads the next line. There's
> no need to ever wait for the leader because there is no leader.

I agree that if the leader switches roles, then it is possible that
sometimes the leader might not produce work before the queue is
empty.  OTOH, the problem with the approach you are suggesting is that
the work will be generated on demand, i.e. there is no specific
process generating the data while the workers are busy inserting it.
So IMHO, if we have a dedicated leader process, there will always be
work available for all the workers.  I agree that we need to find the
correct point at which the leader should work as a worker.  One idea
could be that when the queue is full and there is no space to push
more work onto it, the leader itself processes that work.
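
A minimal sketch of that fallback (all names hypothetical; queue
synchronization elided):

#include <stdbool.h>
#include <stddef.h>

typedef struct WorkQueue WorkQueue;
typedef struct Chunk Chunk;

extern bool   queue_full(WorkQueue *q);
extern void   queue_push(WorkQueue *q, Chunk *c);
extern Chunk *read_next_chunk(void);      /* NULL at end of input */
extern void   process_chunk(Chunk *c);    /* parse and insert tuples */

/*
 * Hypothetical leader loop: keep the queue filled, and when it is
 * full, process one chunk directly instead of waiting for workers.
 */
static void
leader_loop(WorkQueue *q)
{
    Chunk *c;

    while ((c = read_next_chunk()) != NULL)
    {
        if (queue_full(q))
            process_chunk(c);   /* no room: leader acts as a worker */
        else
            queue_push(q, c);   /* normal case: hand off to workers */
    }
}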

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
On Thu, Apr 9, 2020 at 7:49 AM Dilip Kumar <[hidden email]> wrote:
> I agree that if the leader switches the role, then it is possible that
> sometimes the leader might not produce the work before the queue is
> empty.  OTOH, the problem with the approach you are suggesting is that
> the work will be generated on-demand, i.e. there is no specific
> process who is generating the data while workers are busy inserting
> the data.

I think you have a point. The way things could go wrong without a
leader is if everyone tends to want new work at the same time. In that
case, everyone will wait at once, whereas if there is a designated
process that aggressively queues up work, we could perhaps avoid that.
Note that you really have to have the case where everyone wants new
work at the exact same moment, because otherwise they just all take
turns finding work for themselves, and everything is fine: nobody is
waiting for anybody else to do any work, so everyone is always making
forward progress.

Now on the other hand, if we do have a leader, and for some reason
it's slow in responding, everyone will have to wait. That could happen
either because the leader also has other responsibilities, like
reading data or helping with the main work when the queue is full, or
just because the system is really busy and the leader doesn't get
scheduled on-CPU for a while. I am inclined to think that's likely to
be a more serious problem.

The thing is, the problem of everyone needing new work at the same
time can't really keep on repeating. Say that everyone finishes
processing their first chunk at the same time. Now everyone needs a
second chunk, and in a leaderless system, they must take turns getting
it. So they will go in some order. The ones who go later will
presumably also finish later, so the end times for the second and
following chunks will be scattered. You shouldn't get repeated
pile-ups with everyone finishing at the same time, because each time
it happens, it will force a little bit of waiting that will spread
things out. If they clump up again, that will happen again, but it
shouldn't happen every time.

But in the case where there is a leader, I don't think there's any
similar protection. Suppose we go with the design Vignesh proposes
where the leader switches to processing chunks when the queue is more
than 75% full. If the leader has a "hiccup" where it gets swapped out
or is busy with processing a chunk for a longer-than-normal time, all
of the other processes have to wait for it. Now we can probably tune
this to some degree by adjusting the queue size and fullness
thresholds, but the optimal values for those parameters might be quite
different on different systems, depending on load, I/O performance,
CPU architecture, etc. If there's a system or configuration where the
leader tends not to respond fast enough, it will probably just keep
happening, because nothing in the algorithm will tend to shake it out
of that bad pattern.

I'm not 100% certain that my analysis here is right, so it will be
interesting to hear from other people. However, as a general rule, I
think we want to minimize the amount of work that can only be done by
one process (the leader) and maximize the amount that can be done by
any process with whichever one is available taking on the job. In the
case of COPY FROM STDIN, the reads from the network socket can only be
done by the one process connected to it. In the case of COPY from a
file, even that could be rotated around, if all processes open the
file individually and seek to the appropriate offset.
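
A sketch of that rotation for the file case (illustrative only; the
start offsets would have to be chunk boundaries agreed on through
shared memory, which is not shown):

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

/*
 * Each worker opens the file itself and seeks to its assigned offset,
 * so no single process owns the read position.  Returns the fd, or -1
 * on failure.
 */
static int
open_copy_segment(const char *path, off_t start_offset)
{
    int fd = open(path, O_RDONLY);

    if (fd < 0)
        return -1;
    if (lseek(fd, start_offset, SEEK_SET) < 0)
    {
        close(fd);
        return -1;
    }
    return fd;
}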

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Andres Freund
In reply to this post by akapila
Hi,

On April 9, 2020 4:01:43 AM PDT, Amit Kapila <[hidden email]> wrote:

>On Thu, Apr 9, 2020 at 3:55 AM Ants Aasma <[hidden email]> wrote:
>>
>> On Wed, 8 Apr 2020 at 22:30, Robert Haas <[hidden email]>
>wrote:
>>
>> > - The portion of the time that is used to split the lines is not
>> > easily parallelizable. That seems to be a fairly small percentage
>for
>> > a reasonably wide table, but it looks significant (13-18%) for a
>> > narrow table. Such cases will gain less performance and be limited
>to
>> > a smaller number of workers. I think we also need to be careful
>about
>> > files whose lines are longer than the size of the buffer. If we're
>not
>> > careful, we could get a significant performance drop-off in such
>> > cases. We should make sure to pick an algorithm that seems like it
>> > will handle such cases without serious regressions and check that a
>> > file composed entirely of such long lines is handled reasonably
>> > efficiently.
>>
>> I don't have a proof, but my gut feel tells me that it's
>fundamentally
>> impossible to ingest csv without a serial line-ending/comment
>> tokenization pass.

I can't quite see a way either. But even if it were possible, I have a hard time seeing parallelizing that path as the right thing.


>I think even if we try to do it via multiple workers it might not be
>better.  In such a scheme,  every worker needs to update the end
>boundaries and the next worker to keep a check if the previous has
>updated the end pointer.  I think this can add a significant
>synchronization effort for cases where tuples are of 100 or so bytes
>which will be a common case.

It seems like it'd also have terrible caching and instruction-level parallelism behavior. By constantly switching the process that analyzes boundaries, the current data will have to be brought into L1/registers repeatedly, rather than staying there.

I'm fairly certain that we do *not* want to distribute input data between processes on a single tuple basis. Probably not even below a few hundred kb. If there's any sort of natural clustering in the loaded data - extremely common, think timestamps - splitting on a granular basis will make indexing much more expensive. And have a lot more contention.


>> The current line splitting algorithm is terrible.
>> I'm currently working with some scientific data where on ingestion
>> CopyReadLineText() is about 25% on profiles. I prototyped a
>> replacement that can do ~8GB/s on narrow rows, more on wider ones.

We should really replace the entire copy parsing code. It's terrible.

Andres
--
Sent from my Android device with K-9 Mail. Please excuse my brevity.



Re: Parallel copy

Robert Haas
On Thu, Apr 9, 2020 at 2:55 PM Andres Freund <[hidden email]> wrote:
> I'm fairly certain that we do *not* want to distribute input data between processes on a single tuple basis. Probably not even below a few hundred kb. If there's any sort of natural clustering in the loaded data - extremely common, think timestamps - splitting on a granular basis will make indexing much more expensive. And have a lot more contention.

That's a fair point. I think the solution ought to be that once any
process starts finding line endings, it continues until it's grabbed
at least a certain amount of data for itself. Then it stops and lets
some other process grab a chunk of data.
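
In sketch form (names invented; scan_pos would live in shared memory
and the locking around it is elided):

#include <stddef.h>

#define MIN_GRAB_BYTES (256 * 1024)   /* illustrative threshold */

/*
 * Assumed helper: returns the offset just past the next '\n' at or
 * after 'from', or 'len' if there is none.
 */
extern size_t find_next_line_end(const char *buf, size_t from, size_t len);

/*
 * Whoever takes the scan position keeps finding line endings until it
 * has claimed at least MIN_GRAB_BYTES for itself, then hands the scan
 * position to the next process.
 */
static void
grab_chunk(const char *buf, size_t len, size_t *scan_pos,
           size_t *my_start, size_t *my_end)
{
    *my_start = *scan_pos;
    *my_end = *my_start;

    while (*my_end < len && *my_end - *my_start < MIN_GRAB_BYTES)
        *my_end = find_next_line_end(buf, *my_end, len);

    *scan_pos = *my_end;   /* the next process starts scanning here */
}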

Or are you are arguing that there should be only one process that's
allowed to find line endings for the entire duration of the load?

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Andres Freund
Hi,

On April 9, 2020 12:29:09 PM PDT, Robert Haas <[hidden email]> wrote:

>On Thu, Apr 9, 2020 at 2:55 PM Andres Freund <[hidden email]>
>wrote:
>> I'm fairly certain that we do *not* want to distribute input data
>between processes on a single tuple basis. Probably not even below a
>few hundred kb. If there's any sort of natural clustering in the loaded
>data - extremely common, think timestamps - splitting on a granular
>basis will make indexing much more expensive. And have a lot more
>contention.
>
>That's a fair point. I think the solution ought to be that once any
>process starts finding line endings, it continues until it's grabbed
>at least a certain amount of data for itself. Then it stops and lets
>some other process grab a chunk of data.
>
>Or are you are arguing that there should be only one process that's
>allowed to find line endings for the entire duration of the load?

I've not yet read the whole thread. So I'm probably restating ideas.

Imo, yes, there should be only one process doing the chunking. For ilp, cache efficiency, but also because the leader is the only process with access to the network socket. It should load input data into one large buffer that's shared across processes. There should be a separate ringbuffer with tuple/partial tuple (for huge tuples) offsets. Worker processes should grab large chunks of offsets from the offset ringbuffer. If the ringbuffer is not full, the worker chunks should be reduced in size.  
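
One way that layout might look in shared memory (names and sizes
invented for illustration; locks, atomics, and wraparound handling are
all elided):

#include <stddef.h>

#define DATA_RING_SIZE   (64 * 1024 * 1024)  /* raw input bytes */
#define OFFSET_RING_SIZE (128 * 1024)        /* tuple start offsets */

typedef struct ParallelCopyShared
{
    /* large ring of raw input data, written only by the leader */
    char   data[DATA_RING_SIZE];
    size_t data_head;             /* leader's write position */

    /* separate ring of tuple (or partial-tuple) start offsets */
    size_t offsets[OFFSET_RING_SIZE];
    size_t off_head;              /* leader's write position */
    size_t off_tail;              /* next offset a worker may claim */
} ParallelCopyShared;

Workers would grab a batch of entries from offsets[], scaling the
batch size down when off_head - off_tail is small, as described above.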

Given that everything stalls if the leader doesn't accept further input data, as well as when there are no split chunks available, it doesn't seem like a good idea to have the leader do other work.


I don't think optimizing/targeting copy from local files, where multiple processes could read, is useful. COPY FROM STDIN is the only thing that practically matters.

Andres


--
Sent from my Android device with K-9 Mail. Please excuse my brevity.



Re: Parallel copy

Robert Haas
On Thu, Apr 9, 2020 at 4:00 PM Andres Freund <[hidden email]> wrote:
> I've not yet read the whole thread. So I'm probably restating ideas.

Yeah, but that's OK.

> Imo, yes, there should be only one process doing the chunking. For ilp, cache efficiency, but also because the leader is the only process with access to the network socket. It should load input data into one large buffer that's shared across processes. There should be a separate ringbuffer with tuple/partial tuple (for huge tuples) offsets. Worker processes should grab large chunks of offsets from the offset ringbuffer. If the ringbuffer is not full, the worker chunks should be reduced in size.

My concern here is that it's going to be hard to avoid processes going
idle. If the leader does nothing at all once the ring buffer is full,
it's wasting time that it could spend processing a chunk. But if it
picks up a chunk, then it might not get around to refilling the buffer
before other processes are idle with no work to do.

Still, it might be the case that having the process that is reading
the data also find the line endings is so fast that it makes no sense
to split those two tasks. After all, whoever just read the data must
have it in cache, and that helps a lot.

> Given that everything stalls if the leader doesn't accept further input data, as well as when there are no split chunks available, it doesn't seem like a good idea to have the leader do other work.
>
> I don't think optimizing/targeting copy from local files, where multiple processes could read, is useful. COPY FROM STDIN is the only thing that practically matters.

Yeah, I think Amit has been thinking primarily in terms of COPY from
files, and I've been encouraging him to at least consider the STDIN
case. But I think you're right, and COPY FROM STDIN should be the
design center for this feature.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Andres Freund
Hi,

On 2020-04-10 07:40:06 -0400, Robert Haas wrote:
> On Thu, Apr 9, 2020 at 4:00 PM Andres Freund <[hidden email]> wrote:
> > Imo, yes, there should be only one process doing the chunking. For ilp, cache efficiency, but also because the leader is the only process with access to the network socket. It should load input data into one large buffer that's shared across processes. There should be a separate ringbuffer with tuple/partial tuple (for huge tuples) offsets. Worker processes should grab large chunks of offsets from the offset ringbuffer. If the ringbuffer is not full, the worker chunks should be reduced in size.
>
> My concern here is that it's going to be hard to avoid processes going
> idle. If the leader does nothing at all once the ring buffer is full,
> it's wasting time that it could spend processing a chunk. But if it
> picks up a chunk, then it might not get around to refilling the buffer
> before other processes are idle with no work to do.

An idle process doesn't cost much. Processes that use CPU inefficiently
however...


> Still, it might be the case that having the process that is reading
> the data also find the line endings is so fast that it makes no sense
> to split those two tasks. After all, whoever just read the data must
> have it in cache, and that helps a lot.

Yea. And if it's not fast enough to split lines, then we have a problem
regardless of which process does the splitting.

Greetings,

Andres Freund



Re: Parallel copy

Robert Haas
On Fri, Apr 10, 2020 at 2:26 PM Andres Freund <[hidden email]> wrote:
> > Still, it might be the case that having the process that is reading
> > the data also find the line endings is so fast that it makes no sense
> > to split those two tasks. After all, whoever just read the data must
> > have it in cache, and that helps a lot.
>
> Yea. And if it's not fast enough to split lines, then we have a problem
> regardless of which process does the splitting.

Still, if the reader does the splitting, then you don't need as much
IPC, right? The shared memory data structure is just a ring of bytes,
and whoever reads from it is responsible for the rest.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

