Parallel copy


Re: Parallel copy

Andres Freund
Hi,

On 2020-04-13 14:13:46 -0400, Robert Haas wrote:

> On Fri, Apr 10, 2020 at 2:26 PM Andres Freund <[hidden email]> wrote:
> > > Still, it might be the case that having the process that is reading
> > > the data also find the line endings is so fast that it makes no sense
> > > to split those two tasks. After all, whoever just read the data must
> > > have it in cache, and that helps a lot.
> >
> > Yea. And if it's not fast enough to split lines, then we have a problem
> > regardless of which process does the splitting.
>
> Still, if the reader does the splitting, then you don't need as much
> IPC, right? The shared memory data structure is just a ring of bytes,
> and whoever reads from it is responsible for the rest.

I don't think so. If only one process does the splitting, the
exclusively locked section is just popping a bunch of offsets off the
ring. And that could fairly easily be done with atomic ops (since what
we need is basically a single-producer multiple-consumer queue, which
can be done lock-free fairly easily). Whereas in the case of each
process doing the splitting, the exclusively locked part is splitting
along lines - which takes considerably longer than just popping off a
few offsets.
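
To make that concrete, here is a minimal single-producer/multiple-
consumer sketch using plain C11 atomics; PostgreSQL itself would use
the pg_atomic_* primitives from port/atomics.h, and the names and ring
size here are made up for illustration:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define RINGSIZE 1024           /* illustrative; power of two */

typedef struct LineOffsetRing
{
    _Atomic uint64_t write_pos; /* advanced only by the splitting process */
    _Atomic uint64_t read_pos;  /* claimed by consumers via CAS */
    uint64_t offsets[RINGSIZE]; /* byte offsets of line starts */
} LineOffsetRing;

/* Producer: called only by the single process doing the splitting. */
static void
ring_push(LineOffsetRing *r, uint64_t offset)
{
    uint64_t w = atomic_load_explicit(&r->write_pos, memory_order_relaxed);

    /* ring full: a real implementation would wait, not spin */
    while (w - atomic_load_explicit(&r->read_pos, memory_order_acquire) >= RINGSIZE)
        ;

    r->offsets[w % RINGSIZE] = offset;
    /* release: slot contents become visible before the new write_pos */
    atomic_store_explicit(&r->write_pos, w + 1, memory_order_release);
}

/* Consumer: any worker; returns false if no entry is available yet. */
static bool
ring_pop(LineOffsetRing *r, uint64_t *offset)
{
    uint64_t pos = atomic_load_explicit(&r->read_pos, memory_order_relaxed);

    for (;;)
    {
        if (pos >= atomic_load_explicit(&r->write_pos, memory_order_acquire))
            return false;       /* empty for now */

        *offset = r->offsets[pos % RINGSIZE];
        /* Claim the entry; on failure 'pos' is refreshed and we retry.
         * If the CAS succeeds, read_pos was still 'pos', so the
         * producer cannot yet have overwritten the slot we copied. */
        if (atomic_compare_exchange_weak_explicit(&r->read_pos, &pos,
                                                  pos + 1,
                                                  memory_order_acq_rel,
                                                  memory_order_relaxed))
            return true;
    }
}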

Greetings,

Andres Freund



Re: Parallel copy

Robert Haas
On Mon, Apr 13, 2020 at 4:16 PM Andres Freund <[hidden email]> wrote:
> I don't think so. If only one process does the splitting, the
> exclusively locked section is just popping a bunch of offsets off the
> ring. And that could fairly easily be done with atomic ops (since what
> we need is basically a single-producer multiple-consumer queue, which
> can be done lock-free fairly easily). Whereas in the case of each
> process doing the splitting, the exclusively locked part is splitting
> along lines - which takes considerably longer than just popping off a
> few offsets.

Hmm, that does seem believable.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Kuntal Ghosh
Hello,

I was going through some literature on parsing CSV files in a fully
parallelized way and found (via [1]) an interesting approach
implemented in the open-source project ParaText [2]. The algorithm
follows a two-phase approach: the first pass identifies the adjusted
chunks in parallel by exploiting the simplicity of the CSV format, and
the second phase processes the complete records within each adjusted
chunk using the available workers. Here is the sketch:

1. Each worker scans a distinct fixed-size chunk of the CSV file and
collects the following three stats from the chunk:
a) number of quotes
b) position of the first newline after an even number of quotes
c) position of the first newline after an odd number of quotes
2. Once the stats from all the chunks are collected, the leader
identifies the adjusted chunk boundaries by iterating over the stats
linearly:
- For the k-th chunk, the leader sums the number of quotes in the
first k-1 chunks.
- If that number is even, then the k-th chunk does not start in the
middle of a quoted field, and the first newline after an even number
of quotes (the second collected stat) is the first record delimiter
in this chunk.
- Otherwise, if the number is odd, the first newline after an odd
number of quotes (the third collected stat) is the first record
delimiter.
- The end position of the adjusted chunk is obtained from the
starting position of the next adjusted chunk.
3. Once the boundaries of the chunks are determined (forming adjusted
chunks), each worker can take one adjusted chunk and process its
tuples independently (a sketch of steps 1 and 2 follows below).
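
A rough sketch of steps 1 and 2 in C (names are hypothetical; note
that CSV's quote-doubling escape "" preserves quote parity, so plain
counting copes with it, though other escaping schemes complicate
matters, as discussed downthread):

#include <stdint.h>
#include <stddef.h>

typedef struct ChunkStats
{
    uint64_t nquotes;        /* a) total quote chars in the chunk */
    int64_t  first_even_nl;  /* b) first newline after an even count, -1 if none */
    int64_t  first_odd_nl;   /* c) first newline after an odd count, -1 if none */
} ChunkStats;

/* Step 1: each worker scans its fixed-size chunk. */
static void
scan_chunk(const char *buf, size_t len, ChunkStats *st)
{
    uint64_t q = 0;

    st->first_even_nl = st->first_odd_nl = -1;
    for (size_t i = 0; i < len; i++)
    {
        if (buf[i] == '"')
            q++;
        else if (buf[i] == '\n')
        {
            if (q % 2 == 0 && st->first_even_nl < 0)
                st->first_even_nl = (int64_t) i;
            else if (q % 2 == 1 && st->first_odd_nl < 0)
                st->first_odd_nl = (int64_t) i;
        }
    }
    st->nquotes = q;
}

/* Step 2: the leader resolves each chunk's first record delimiter. */
static void
resolve_boundaries(const ChunkStats *stats, int64_t *first_delim, int nchunks)
{
    uint64_t quotes_before = 0;  /* quotes in chunks 0 .. k-1 */

    for (int k = 0; k < nchunks; k++)
    {
        first_delim[k] = (quotes_before % 2 == 0)
            ? stats[k].first_even_nl
            : stats[k].first_odd_nl;
        quotes_before += stats[k].nquotes;
    }
}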

Although this approach parses the CSV in parallel, it requires two
scans of the CSV file. So, on a system with a spinning hard disk and
little RAM, as per my understanding, the algorithm will perform very
poorly. But if we use this algorithm to parse a CSV file on a
multi-core system with large RAM, the performance might improve
significantly [1].

Hence, I was trying to think about whether we can leverage this idea
for implementing parallel COPY in PG. We can design an algorithm
similar to parallel hash join, where the workers pass through
different phases.
1. Phase 1 - Read fixed-size chunks in parallel, storing the chunks
and the small per-chunk stats in shared memory. If the shared memory
is full, go to phase 2.
2. Phase 2 - Allow a single worker to process the stats and decide the
actual chunk boundaries so that no tuple spans two different chunks.
Go to phase 3.
3. Phase 3 - Each worker picks one adjusted chunk and parses and
processes tuples from it. Once done with one chunk, it picks the next
one, and so on.
4. If there is still some unread content, go back to phase 1.

We can probably use separate workers for phase 1 and phase 3 so that
they can work concurrently. A sketch of a per-worker control loop for
this design follows.
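
A hypothetical per-worker control loop for these phases, using
PostgreSQL's Barrier API (BarrierArriveAndWait returns true in exactly
one elected participant); fill_shared_chunks, resolve_chunk_boundaries,
claim_next_adjusted_chunk, and parse_and_insert are made-up helpers:

/* Sketch: every participating worker runs this loop. */
static void
parallel_copy_worker(ParallelCopyState *state)
{
    ParallelCopyChunk chunk;

    for (;;)
    {
        /* Phase 1: all workers read fixed-size chunks until shared
         * memory is full or input is exhausted (the EOF flag would
         * live in shared state in reality). */
        bool more_input = fill_shared_chunks(state);

        /* Phase 2: exactly one elected worker fixes the boundaries. */
        if (BarrierArriveAndWait(&state->barrier, 0))
            resolve_chunk_boundaries(state);
        BarrierArriveAndWait(&state->barrier, 0);  /* boundaries ready */

        /* Phase 3: pick adjusted chunks until none remain. */
        while (claim_next_adjusted_chunk(state, &chunk))
            parse_and_insert(state, &chunk);

        if (!more_input)
            break;
        BarrierArriveAndWait(&state->barrier, 0);  /* start next round */
    }
}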

Advantages:
1. Each worker spends significant time in each phase, so it gets the
benefit of the instruction cache - at least in phase 1.
2. It also has the same advantage as parallel hash join - fast workers
get to do more work.
3. We can extend this solution to read data from STDIN. Of course,
phases 1 and 2 must then be performed by the leader process, which can
read from the socket.

Disadvantages:
1. It surely doesn't work if we don't have enough shared memory.
2. This approach may simply be impractical for PG due to certain
limitations.

Thoughts?

[1] https://www.microsoft.com/en-us/research/uploads/prod/2019/04/chunker-sigmod19.pdf
[2] ParaText. https://github.com/wiseio/paratext.


--
Thanks & Regards,
Kuntal Ghosh
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Ants Aasma-2
On Tue, 14 Apr 2020 at 22:40, Kuntal Ghosh <[hidden email]> wrote:

> 1. Each worker scans a distinct fixed-size chunk of the CSV file and
> collects the following three stats from the chunk:
> a) number of quotes
> b) position of the first newline after an even number of quotes
> c) position of the first newline after an odd number of quotes
> 2. Once the stats from all the chunks are collected, the leader
> identifies the adjusted chunk boundaries by iterating over the stats
> linearly:
> - For the k-th chunk, the leader sums the number of quotes in the
> first k-1 chunks.
> - If that number is even, then the k-th chunk does not start in the
> middle of a quoted field, and the first newline after an even number
> of quotes (the second collected stat) is the first record delimiter
> in this chunk.
> - Otherwise, if the number is odd, the first newline after an odd
> number of quotes (the third collected stat) is the first record
> delimiter.
> - The end position of the adjusted chunk is obtained from the
> starting position of the next adjusted chunk.

The trouble is that, at least with the current coding, the number of
quotes in a chunk can depend on whether the chunk started inside a
quote or not. That's because escape characters only count inside
quotes. See for example the following CSV:

foo,\"bar
baz",\"xyz"

This currently parses as one line and the number of parsed quotes
doesn't change if you add a quote in front.

But the general approach of doing the tokenization in parallel and
then a serial pass over the tokenization would still work. The quote
counting and newline finding just have to be done for both the
starting-in-quote and not-starting-in-quote cases.
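
For example, something like the following hypothetical scan, which
runs the quote automaton under both starting assumptions at once (with
backslash as the escape character, special only inside quotes). The
leader keeps whichever hypothesis matches the chunk's actual starting
state, and each hypothesis's final in_quote gives the starting state
of the next chunk:

#include <stdbool.h>
#include <stddef.h>

typedef struct QuoteScan
{
    bool in_quote;   /* inside a quoted field under this hypothesis? */
    bool skip_next;  /* previous char was an escape inside a quote */
    long first_nl;   /* first newline outside quotes, or -1 */
} QuoteScan;

static void
feed(QuoteScan *s, char c, long pos)
{
    if (s->skip_next)
    {
        s->skip_next = false;
        return;
    }
    if (s->in_quote && c == '\\')
        s->skip_next = true;          /* escapes only count inside quotes */
    else if (c == '"')
        s->in_quote = !s->in_quote;
    else if (c == '\n' && !s->in_quote && s->first_nl < 0)
        s->first_nl = pos;
}

/* Scan one chunk under both hypotheses. */
static void
scan_chunk_both(const char *buf, size_t len,
                QuoteScan *outside, QuoteScan *inside)
{
    QuoteScan o = {false, false, -1};
    QuoteScan i = {true, false, -1};

    for (size_t pos = 0; pos < len; pos++)
    {
        feed(&o, buf[pos], (long) pos);
        feed(&i, buf[pos], (long) pos);
    }
    *outside = o;
    *inside = i;
}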

Using phases doesn't look like the correct approach - the tokenization
can be prepared just in time for the serial pass and processing the
chunk can proceed immediately after. This could all be done by having
the data in a single ringbuffer with a processing pipeline where one
process does the reading, then workers grab tokenization chunks as
they become available, then one process handles determining the chunk
boundaries, after which the chunks are processed.

But I still don't think this is something to worry about for the first
version. Just a better line-splitting algorithm should go a long way
toward feeding a large number of workers, even when inserting into an
unindexed, unlogged table. If we get the SIMD line splitting in, it
will be enough to overwhelm most I/O subsystems available today.

Regards,
Ants Aasma



Re: Parallel copy

Ants Aasma-2
In reply to this post by Andres Freund
On Mon, 13 Apr 2020 at 23:16, Andres Freund <[hidden email]> wrote:

> > Still, if the reader does the splitting, then you don't need as much
> > IPC, right? The shared memory data structure is just a ring of bytes,
> > and whoever reads from it is responsible for the rest.
>
> I don't think so. If only one process does the splitting, the
> exclusively locked section is just popping a bunch of offsets off the
> ring. And that could fairly easily be done with atomic ops (since what
> we need is basically a single-producer multiple-consumer queue, which
> can be done lock-free fairly easily). Whereas in the case of each
> process doing the splitting, the exclusively locked part is splitting
> along lines - which takes considerably longer than just popping off a
> few offsets.

I see the benefit of having one process responsible for splitting as
being able to run ahead of the workers to queue up work when many of
them need new data at the same time. I don't think the locking
benefits of a ring are important in this case. At the current rather
conservative chunk sizes we are looking at ~100k chunks per second at
best; normal locking should be perfectly adequate. And the chunk size
can easily be increased. I see the main value in it being simple.

But there is a point that having a layer of indirection instead of a
linear buffer allows some workers to fall behind, either because the
kernel scheduled them out for a time slice, because they need to do
I/O, or because inserting some tuple hit a unique conflict and needs
to wait for a transaction to commit or abort to resolve. With a ring
buffer, reading has to wait on the slowest worker reading its chunk.
Having workers copy the data to a local buffer as the first step would
reduce the probability of hitting any issues. But still, at GB/s
rates, hiding a 10ms time slice of delay would need tens of megabytes
of buffer (at 2 GB/s, 10 ms of input is already ~20 MB).

FWIW, I think just increasing the buffer is good enough - the CPUs
processing this workload are likely to have tens to hundreds of
megabytes of cache on board.



Re: Parallel copy

akapila
In reply to this post by Kuntal Ghosh
On Wed, Apr 15, 2020 at 1:10 AM Kuntal Ghosh <[hidden email]> wrote:

>
> Hence, I was trying to think whether we can leverage this idea for
> implementing parallel COPY in PG. We can design an algorithm similar
> to parallel hash-join where the workers pass through different phases.
> 1. Phase 1 - Read fixed-size chunks in parallel, storing the chunks
> and the small per-chunk stats in shared memory. If the shared memory
> is full, go to phase 2.
> 2. Phase 2 - Allow a single worker to process the stats and decide the
> actual chunk boundaries so that no tuple spans two different chunks.
> Go to phase 3.
>
> 3. Phase 3 - Each worker picks one adjusted chunk and parses and
> processes tuples from it. Once done with one chunk, it picks the next
> one, and so on.
>
> 4. If there is still some unread content, go back to phase 1.
>
> We can probably use separate workers for phase 1 and phase 3 so that
> they can work concurrently.
>
> Advantages:
> 1. Each worker spends significant time in each phase, so it gets the
> benefit of the instruction cache - at least in phase 1.
> 2. It also has the same advantage as parallel hash join - fast workers
> get to do more work.
> 3. We can extend this solution to read data from STDIN. Of course,
> phases 1 and 2 must then be performed by the leader process, which can
> read from the socket.
>
> Disadvantages:
> 1. It surely doesn't work if we don't have enough shared memory.
> 2. This approach may simply be impractical for PG due to certain
> limitations.
>

As I understand this, it needs to parse the lines twice (the second
time in phase 3), and until the first two phases are over, we can't
start the tuple-processing work that is done in phase 3.  So even if
the tokenization is done a bit faster, we will lose some time on
processing the tuples, which might not be an overall win and, in fact,
can be worse than the single-reader approach being discussed.
Now, if the work done in tokenization were a major (or significant)
portion of the copy, then thinking of such a technique might be
useful, but that is not the case, as seen in the data shared earlier
in this thread (the tokenize time is very small compared to the
data-processing time).

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
On Wed, Apr 15, 2020 at 7:15 AM Amit Kapila <[hidden email]> wrote:

> As I understand this, it needs to parse the lines twice (the second
> time in phase 3), and until the first two phases are over, we can't
> start the tuple-processing work that is done in phase 3.  So even if
> the tokenization is done a bit faster, we will lose some time on
> processing the tuples, which might not be an overall win and, in fact,
> can be worse than the single-reader approach being discussed.
> Now, if the work done in tokenization were a major (or significant)
> portion of the copy, then thinking of such a technique might be
> useful, but that is not the case, as seen in the data shared earlier
> in this thread (the tokenize time is very small compared to the
> data-processing time).

It seems to me that a good first step here might be to forget about
parallelism for a minute and just write a patch to make the line
splitting as fast as possible.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

Kuntal Ghosh
In reply to this post by Ants Aasma-2
On Wed, Apr 15, 2020 at 2:15 PM Ants Aasma <[hidden email]> wrote:

>
> On Tue, 14 Apr 2020 at 22:40, Kuntal Ghosh <[hidden email]> wrote:
> > 1. Each worker scans a distinct fixed-size chunk of the CSV file and
> > collects the following three stats from the chunk:
> > a) number of quotes
> > b) position of the first newline after an even number of quotes
> > c) position of the first newline after an odd number of quotes
> > 2. Once the stats from all the chunks are collected, the leader
> > identifies the adjusted chunk boundaries by iterating over the stats
> > linearly:
> > - For the k-th chunk, the leader sums the number of quotes in the
> > first k-1 chunks.
> > - If that number is even, then the k-th chunk does not start in the
> > middle of a quoted field, and the first newline after an even number
> > of quotes (the second collected stat) is the first record delimiter
> > in this chunk.
> > - Otherwise, if the number is odd, the first newline after an odd
> > number of quotes (the third collected stat) is the first record
> > delimiter.
> > - The end position of the adjusted chunk is obtained from the
> > starting position of the next adjusted chunk.
>
> The trouble is that, at least with the current coding, the number of
> quotes in a chunk can depend on whether the chunk started inside a
> quote or not. That's because escape characters only count inside
> quotes. See for example the following CSV:
>
> foo,\"bar
> baz",\"xyz"
>
> This currently parses as one line and the number of parsed quotes
> doesn't change if you add a quote in front.
>
> But the general approach of doing the tokenization in parallel and
> then a serial pass over the tokenization would still work. The quote
> counting and newline finding just have to be done for both the
> starting-in-quote and not-starting-in-quote cases.
>
Yeah, right.

> Using phases doesn't look like the correct approach - the tokenization
> can be prepared just in time for the serial pass and processing the
> chunk can proceed immediately after. This could all be done by having
> the data in a single ringbuffer with a processing pipeline where one
> process does the reading, then workers grab tokenization chunks as
> they become available, then one process handles determining the chunk
> boundaries, after which the chunks are processed.
>
I was thinking from this point of view - the sooner we introduce
parallelism in the process, the greater the benefits. Probably there
isn't any way to avoid a single pass over the data (phase 2 in the
above case) to tokenise the chunks. So yeah, if the reading and
tokenisation phase doesn't take much time, parallelising it will just
be overkill. As pointed out by Andres and you, a lock-free circular
buffer implementation sounds like the way to go forward. AFAIK, a
CAS-based FIFO circular queue suffers from two problems: 1. (as you
pointed out) slow workers may block producers; 2. since it doesn't
partition the queue among the workers, it doesn't achieve good
locality and cache-friendliness, which limits scalability on NUMA
systems.

> But I still don't think this is something to worry about for the first
> version. Just a better line-splitting algorithm should go a long way
> toward feeding a large number of workers, even when inserting into an
> unindexed, unlogged table. If we get the SIMD line splitting in, it
> will be enough to overwhelm most I/O subsystems available today.
>
Yeah. Parsing text is a great use case for data parallelism, which
can be achieved with SIMD instructions. Consider processing 8-bit
ASCII characters in a 512-bit SIMD word. A lot of code and complexity
would surely go away from CopyReadLineText. And further (I'm not sure
on this point), if we can use the schema of the table, perhaps JIT
could generate machine code to read fields efficiently based on their
types.
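
As a flavor of the idea, here is a portable SWAR ("SIMD within a
register") sketch that looks for newlines eight bytes at a time; real
SIMD code would use wide vector loads and movemask-style instructions
instead, and this ignores quoting, which the real splitter has to
handle (little-endian assumed):

#include <stdint.h>
#include <string.h>

#define BCAST(b) (0x0101010101010101ULL * (uint8_t) (b))

/* High bit of each byte is set where the corresponding byte of 'w'
 * equals 'target'.  The classic trick can set spurious high bits, but
 * only in bytes after a genuine match, so the lowest set bit is exact. */
static inline uint64_t
match_byte(uint64_t w, uint8_t target)
{
    uint64_t x = w ^ BCAST(target);

    return (x - BCAST(0x01)) & ~x & BCAST(0x80);
}

/* Returns the offset of the first '\n' in buf, or len if none. */
static size_t
find_newline(const char *buf, size_t len)
{
    size_t i = 0;

    for (; i + 8 <= len; i += 8)
    {
        uint64_t w, hits;

        memcpy(&w, buf + i, 8);        /* safe unaligned load */
        hits = match_byte(w, '\n');
        if (hits)
            return i + (__builtin_ctzll(hits) >> 3);  /* little-endian */
    }
    for (; i < len; i++)               /* tail bytes */
        if (buf[i] == '\n')
            return i;
    return len;
}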


--
Thanks & Regards,
Kuntal Ghosh
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Andres Freund
In reply to this post by Robert Haas
On 2020-04-15 10:12:14 -0400, Robert Haas wrote:

> On Wed, Apr 15, 2020 at 7:15 AM Amit Kapila <[hidden email]> wrote:
> > As I understand this, it needs to parse the lines twice (the second
> > time in phase 3), and until the first two phases are over, we can't
> > start the tuple-processing work that is done in phase 3.  So even if
> > the tokenization is done a bit faster, we will lose some time on
> > processing the tuples, which might not be an overall win and, in fact,
> > can be worse than the single-reader approach being discussed.
> > Now, if the work done in tokenization were a major (or significant)
> > portion of the copy, then thinking of such a technique might be
> > useful, but that is not the case, as seen in the data shared earlier
> > in this thread (the tokenize time is very small compared to the
> > data-processing time).
>
> It seems to me that a good first step here might be to forget about
> parallelism for a minute and just write a patch to make the line
> splitting as fast as possible.

+1

Compared to all the rest of the efforts during COPY a fast "split rows"
implementation should not be a bottleneck anymore.



Re: Parallel copy

Andres Freund
In reply to this post by Kuntal Ghosh
Hi,

On 2020-04-15 20:36:39 +0530, Kuntal Ghosh wrote:
> I was thinking from this point of view - the sooner we introduce
> parallelism in the process, the greater the benefits.

I don't really agree. Sure, that's true from a theoretical
perspective, but the incremental gains may be very small, and the cost
in complexity very high. If we can get single-threaded splitting of
rows to be >4GB/s, which should very well be attainable, the rest of
the COPY work is going to dominate the time.  We shouldn't add
complexity to parallelize more of the line splitting, care too much
about scalable datastructures, etc., when the bottleneck after some
straightforward optimization will usually still be in the already
parallelized part.

I'd expect that for now we'd likely hit scalability issues in other
parts of the system first (e.g. extension locks, buffer mapping).

Greetings,

Andres Freund



Re: Parallel copy

Andres Freund
In reply to this post by Ants Aasma-2
Hi,

On 2020-04-15 12:05:47 +0300, Ants Aasma wrote:
> I see the benefit of having one process responsible for splitting as
> being able to run ahead of the workers to queue up work when many of
> them need new data at the same time.

Yea, I agree.


> I don't think the locking benefits of a ring are important in this
> case. At the current rather conservative chunk sizes we are looking
> at ~100k chunks per second at best; normal locking should be perfectly
> adequate. And the chunk size can easily be increased. I see the main
> value in it being simple.

I think the locking benefit of not needing to hold a lock *while*
splitting (as we'd need in some proposals floated earlier) is likely
to already be beneficial. I don't think we need to worry about lock
scalability protecting the queue of already-split data, for now.

I don't think we really want to have a much larger chunk size, btw. It
makes it more likely for the data handed to workers to take an uneven
amount of time to process.


> But there is a point that having a layer of indirection instead of a
> linear buffer allows for some workers to fall behind.

Yea. It'd probably make sense to read the input data into an array of
evenly sized blocks, and have the datastructure (I still think a
ringbuffer makes sense) of split boundaries point into those entries.
If we don't require the input blocks to be in order in that array, we
can reuse blocks therein that are fully processed, even if "earlier"
data in the input has not yet been fully processed.


> With a ring buffer reading has to wait on the slowest worker reading
> its chunk.

To be clear, I was only thinking of using a ringbuffer to indicate split
boundaries. And that workers would just pop entries from it before they
actually process the data (stored outside of the ringbuffer). Since the
split boundaries will always be read in order by workers, and the
entries will be tiny, there's no need to avoid copying out entries.


So basically what I was thinking we *eventually* may want (I'd forgo some
of this initially) is something like:

struct InputBlock
{
    uint32 unprocessed_chunk_parts;
    uint32 following_block;
    char data[INPUT_BLOCK_SIZE];
};

// array of input data, with > 2*nworkers entries
InputBlock *input_blocks;

struct ChunkedInputBoundary
{
    uint32 firstblock;
    uint32 startoff;
};

struct ChunkedInputBoundaries
{
    uint32 read_pos;
    uint32 write_end;
    ChunkedInputBoundary ring[RINGSIZE];
};

Where the leader would read data into InputBlocks with
unprocessed_chunk_parts == 0. Then it'd split the read input data into
chunks (presumably with chunk size << input block size), putting the
identified chunks into ChunkedInputBoundaries. For each
ChunkedInputBoundary it'd increment unprocessed_chunk_parts in each
InputBlock containing parts of the chunk.  For chunks spanning more
than one InputBlock, each InputBlock's following_block would be set
accordingly.

Workers would just pop an entry from the ringbuffer (making that entry
reusable), and process the chunk. The underlying data would not be
copied out of the InputBlocks, but obviously readers would need to
take care to handle InputBlock boundaries. Whenever a chunk is fully
read, or when crossing an InputBlock boundary, the InputBlock's
unprocessed_chunk_parts would be decremented.

Recycling of InputBlocks could probably just be an occasional linear
search for buffers with unprocessed_chunk_parts == 0.
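
A hypothetical worker-side counterpart to the structs above;
parse_chunk_part is a made-up stand-in for the actual row parsing, the
decrement would be a pg_atomic op in reality, and the chunk is assumed
to end where the next boundary popped from the ring begins:

/*
 * Process one chunk.  'start' was popped from the ring; 'end' is the
 * next boundary in the ring, where this chunk stops.
 */
static void
process_chunk(InputBlock *blocks, ChunkedInputBoundary start,
              ChunkedInputBoundary end)
{
    uint32 blk = start.firstblock;
    uint32 off = start.startoff;

    for (;;)
    {
        uint32 upto = (blk == end.firstblock) ? end.startoff
                                              : INPUT_BLOCK_SIZE;

        parse_chunk_part(blocks[blk].data + off, upto - off);

        /* This block's part of the chunk is done; once the counter
         * reaches zero the leader may recycle the block. */
        blocks[blk].unprocessed_chunk_parts--;  /* atomic in reality */

        if (blk == end.firstblock)
            break;
        blk = blocks[blk].following_block;      /* chunk spans blocks */
        off = 0;
    }
}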


Something roughly like this should not be too complicated to
implement. Unless extremely unlucky (very wide input data spanning
many InputBlocks), a straggling reader would not prevent global
progress; it'd just prevent reuse of the InputBlocks holding data for
its chunk (normally that'd be two InputBlocks, not more).


> Having workers copy the data to a local buffer as the first
> step would reduce the probability of hitting any issues. But still, at
> GB/s rates, hiding a 10ms time slice of delay would need tens of
> megabytes of buffer.

Yea. Given the likelihood of blocking on resources (reading in index
data, writing out dirty buffers for reclaim, row locks for uniqueness
checks, extension locks, ...), as well as non-uniform per-row costs
(partial indexes, index splits, ...), I think we ought to try to cope
well with that. IMO/IME it'll be common to see stalls that are much
longer than 10ms for processes doing COPY, even when the system is not
overloaded.


> FWIW, I think just increasing the buffer is good enough - the CPUs
> processing this workload are likely to have tens to hundreds of
> megabytes of cache on board.

It'll not necessarily be a cache shared between leader and workers,
though, and some of the cache-to-cache transfers will be more
expensive even within a socket (between core complexes for AMD,
multi-chip processors for Intel).

Greetings,

Andres Freund



Re: Parallel copy

Kuntal Ghosh
In reply to this post by Andres Freund
On Wed, Apr 15, 2020 at 10:45 PM Andres Freund <[hidden email]> wrote:

>
> Hi,
>
> On 2020-04-15 20:36:39 +0530, Kuntal Ghosh wrote:
> > I was thinking from this point of view - the sooner we introduce
> > parallelism in the process, the greater the benefits.
>
> I don't really agree. Sure, that's true from a theoretical
> perspective, but the incremental gains may be very small, and the cost
> in complexity very high. If we can get single-threaded splitting of
> rows to be >4GB/s, which should very well be attainable, the rest of
> the COPY work is going to dominate the time.  We shouldn't add
> complexity to parallelize more of the line splitting, care too much
> about scalable datastructures, etc., when the bottleneck after some
> straightforward optimization will usually still be in the already
> parallelized part.
>
> I'd expect that for now we'd likely hit scalability issues in other
> parts of the system first (e.g. extension locks, buffer mapping).
>
Got your point. In this particular case, a single producer is fast
enough (or we can probably make it fast enough) to generate enough
chunks for multiple consumers so that they don't sit idle waiting for
work.

--
Thanks & Regards,
Kuntal Ghosh
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

akapila
In reply to this post by Andres Freund
On Wed, Apr 15, 2020 at 11:49 PM Andres Freund <[hidden email]> wrote:
>
> To be clear, I was only thinking of using a ringbuffer to indicate split
> boundaries. And that workers would just pop entries from it before they
> actually process the data (stored outside of the ringbuffer). Since the
> split boundaries will always be read in order by workers, and the
> entries will be tiny, there's no need to avoid copying out entries.
>

I think binary-mode processing will be slightly different because,
unlike the text and csv formats, the data is stored in Length, Value
format for each column, and there are no line markers.  I don't think
there will be a big difference, but we still need to keep track
somewhere of the format of the data in the ring buffers.  Basically,
we can copy the data in Length, Value format, and once the writers
know the format, they will parse the data appropriately.  We currently
also have a different way of parsing the binary format; see
NextCopyFrom.  I think we need to be careful about avoiding duplicate
work as much as possible.
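
For reference, a sketch of stepping over one tuple in the COPY BINARY
format (a big-endian 16-bit field count, then for each field a
big-endian 32-bit byte length, with -1 meaning NULL and no payload) -
this is why a binary splitter has to walk the lengths rather than scan
for newlines:

#include <arpa/inet.h>   /* ntohs, ntohl */
#include <stdint.h>
#include <string.h>

/* Returns a pointer just past one tuple starting at 'p'. */
static const char *
skip_binary_tuple(const char *p)
{
    uint16_t nfields;

    memcpy(&nfields, p, sizeof(nfields));
    nfields = ntohs(nfields);
    p += sizeof(nfields);

    for (uint16_t i = 0; i < nfields; i++)
    {
        int32_t len;

        memcpy(&len, p, sizeof(len));
        len = (int32_t) ntohl(len);
        p += sizeof(len);
        if (len > 0)
            p += len;        /* len == -1 means NULL: no payload */
    }
    return p;
}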

Apart from this, we have analyzed the other cases, mentioned below,
where we need to decide whether we can allow parallelism for the copy
command.
Case-1:
Do we want to enable parallelism for a copy when transition tables are
involved?  Basically, during the copy, we capture tuples in transition
tables for certain cases, like when an after-statement trigger
accesses the same relation on which we have a trigger.  See the
example below [1].  We decide this in the function
MakeTransitionCaptureState. For such cases, we collect minimal tuples
in the tuple store after processing them so that after-statement
triggers can access them later.  Now, if we want to enable parallelism
for such cases, we instead need to store and access tuples from a
shared tuple store (sharedtuplestore.c/sharedtuplestore.h).  However,
it doesn't have the facility to store tuples in memory, so we would
always need to store and access them from a file, which could be
costly unless we also add a way to store minimal tuples in shared
memory up to work_mem and only then spill to the shared tuple store.
It is possible to do all or part of this work to enable parallel copy
for such cases, but I am not sure if it is worth it. We can decide not
to enable parallelism for such cases and allow it later if we see
demand for it, which will also help us avoid introducing additional
work/complexity in the first version of the patch.

Case-2:
The single-insertion mode (CIM_SINGLE) is used in various scenarios,
and whether we can allow parallelism for those has to be decided case
by case, as discussed below:
a. When there are BEFORE/INSTEAD OF triggers on the table.  We don't
allow multi-inserts in such cases because such triggers might query
the table we're inserting into and act differently if the tuples that
have already been processed and prepared for insertion are not there.
Now, if we allow parallelism with such triggers, the behavior would
depend on whether the parallel worker has already inserted that
particular row.  I guess such functions should ideally be marked
parallel-unsafe.  So, in short, whether to allow parallelism in this
case depends on the parallel-safety marking of the function.
b. For partitioned tables, we can't support multi-inserts when there
are any statement-level insert triggers.  This is because, as of now,
we expect that any before-row insert and statement-level insert
triggers are on the same relation.  Now, there is no harm in allowing
parallelism for such cases, but it depends on whether we have the
infrastructure (basically, allowing tuples to be collected in a shared
tuple store) to support statement-level insert triggers.
c. For inserts into foreign tables.  We can't allow parallelism in
this case because each worker would need to establish an FDW
connection and operate in a separate transaction.  Unless we have the
capability to provide a two-phase commit protocol for "Transactions
involving multiple postgres foreign servers" (which is being discussed
in a separate thread [2]), we can't allow this.
d. If there are volatile default expressions or the WHERE clause
contains a volatile expression.  Here, we can check whether the
expression is parallel-safe; if so, we can allow parallelism.

Case-3:
In the copy command, for performing foreign key checks, we take a KEY
SHARE lock on primary key table rows, which in turn increments the
command counter and updates the snapshot.  Now, as we share the
snapshots at the beginning of the command, we can't allow them to be
changed later.  So, unless we do something special for it, I think we
can't allow parallelism in such cases.

I couldn't think of many problems if we allow parallelism in such
cases.  One inconsistency, if we allow FK checks via workers, would be
that at the end of COPY the value of the command counter will not be
what we expect, as we wouldn't have accounted for the increments made
by workers.  Now, if COPY is being done inside a transaction, the next
commands would not be assigned the correct values.  Also, for
executing deferred triggers, we use the transaction snapshot, so if
anything is changed in the snapshot via parallel workers, ideally the
changed snapshot should have been synced back from the workers.

Now, the other concern could be that different workers may try to
acquire KEY SHARE locks on the same tuples, which they will be able to
acquire due to group locking or otherwise, but I don't see any problem
with that.

I am not sure if the above leads to any user-visible problem, but I
might be missing something here.  I think if we can think of any real
problems, we can try to design a better solution to address them.

Case-4:
For deferred triggers, it seems we record the CTIDs of tuples (via
ExecARInsertTriggers->AfterTriggerSaveEvent) and then execute the
deferred triggers at transaction end using AfterTriggerFireDeferred,
or at the end of the statement.  The challenge in allowing parallelism
for such cases is that we need to capture the CTID events in shared
memory.  For that, we either need to invent new infrastructure for
event capturing in shared memory, which would be a huge task on its
own, or get the CTIDs via shared memory and then have the leader add
them to the event queues, but in that case we need to ensure the order
of the CTIDs (basically, they should be in the same order in which we
processed them).

[1] -
create or replace function dump_insert() returns trigger language plpgsql as
$$
  begin
    raise notice 'trigger = %, new table = %',
                 TG_NAME,
                 (select string_agg(new_table::text, ', ' order by a)
from new_table);
    return null;
  end;
$$;

create table test (a int);
create trigger trg1_test  after insert on test referencing new table
as new_table  for each statement execute procedure dump_insert();
copy test (a) from stdin;
1
2
3
\.

[2] - https://www.postgresql.org/message-id/20191206.173215.1818665441859410805.horikyota.ntt%40gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
I wonder why you're still looking at this instead of just speeding up
the current code, especially the line splitting, per the previous
discussion, and then coming back to study this issue more after that's
done.

On Mon, May 11, 2020 at 8:12 AM Amit Kapila <[hidden email]> wrote:
> Apart from this, we have analyzed the other cases, mentioned below,
> where we need to decide whether we can allow parallelism for the copy
> command.
> Case-1:
> Do we want to enable parallelism for a copy when transition tables are
> involved?

I think it would be OK not to support this.

> Case-2:
> a. When there are BEFORE/INSTEAD OF triggers on the table.
> b. For partitioned tables, we can't support multi-inserts when there
> are any statement-level insert triggers.
> c. For inserts into foreign tables.
> d. If there are volatile default expressions or the WHERE clause
> contains a volatile expression.  Here, we can check whether the
> expression is parallel-safe; if so, we can allow parallelism.

This all sounds fine.

> Case-3:
> In the copy command, for performing foreign key checks, we take a KEY
> SHARE lock on primary key table rows, which in turn increments the
> command counter and updates the snapshot.  Now, as we share the
> snapshots at the beginning of the command, we can't allow them to be
> changed later.  So, unless we do something special for it, I think we
> can't allow parallelism in such cases.

This sounds like much more of a problem to me; it'd be a significant
restriction that would kick in in routine cases where the user isn't
doing anything particularly exciting. The command counter presumably
only needs to be updated once per command, so maybe we could do that
before we start parallelism. However, I think we would need to have
some kind of dynamic memory structure to which new combo CIDs can be
added by any member of the group, and then discovered by other members
of the group later. At the end of the parallel operation, the leader
must discover any combo CIDs added by others to that table before
destroying it, even if it has no immediate use for the information. We
can't allow a situation where the group members have inconsistent
notions of which combo CIDs exist or what their mappings are, and if
KEY SHARE locks are being taken, new combo CIDs could be created.

> Case-4:
> For Deferred Triggers, it seems we record CTIDs of tuples (via
> ExecARInsertTriggers->AfterTriggerSaveEvent) and then execute deferred
> triggers at transaction end using AfterTriggerFireDeferred or at end
> of the statement.

I think this could be left for the future.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

akapila
On Mon, May 11, 2020 at 11:52 PM Robert Haas <[hidden email]> wrote:
>
> I wonder why you're still looking at this instead of just speeding up
> the current code, especially the line splitting,
>

Because the line splitting is just 1-2% of the overall work in common
cases.  See the data shared by Vignesh for various workloads [1].  The
time it takes is roughly in the range of 0.5-12%, and for cases like a
table with a few indexes, it is not more than 1-2%.

[1] - https://www.postgresql.org/message-id/CALDaNm3r8cPsk0Vo_-6AXipTrVwd0o9U2S0nCmRdku1Dn-Tpqg%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

akapila
In reply to this post by Robert Haas
On Mon, May 11, 2020 at 11:52 PM Robert Haas <[hidden email]> wrote:

>
> > Case-3:
> > In the copy command, for performing foreign key checks, we take a KEY
> > SHARE lock on primary key table rows, which in turn increments the
> > command counter and updates the snapshot.  Now, as we share the
> > snapshots at the beginning of the command, we can't allow them to be
> > changed later.  So, unless we do something special for it, I think we
> > can't allow parallelism in such cases.
>
> This sounds like much more of a problem to me; it'd be a significant
> restriction that would kick in in routine cases where the user isn't
> doing anything particularly exciting. The command counter presumably
> only needs to be updated once per command, so maybe we could do that
> before we start parallelism. However, I think we would need to have
> some kind of dynamic memory structure to which new combo CIDs can be
> added by any member of the group, and then discovered by other members
> of the group later. At the end of the parallel operation, the leader
> must discover any combo CIDs added by others to that table before
> destroying it, even if it has no immediate use for the information. We
> can't allow a situation where the group members have inconsistent
> notions of which combo CIDs exist or what their mappings are, and if
> KEY SHARE locks are being taken, new combo CIDs could be created.
>

AFAIU, we don't generate combo CIDs for this case.  See the following
code in heap_lock_tuple():

/*
* Store transaction information of xact locking the tuple.
*
* Note: Cmax is meaningless in this context, so don't set it; this avoids
* possibly generating a useless combo CID.  Moreover, if we're locking a
* previously updated tuple, it's important to preserve the Cmax.
*
* Also reset the HOT UPDATE bit, but only if there's no update; otherwise
* we would break the HOT chain.
*/
tuple->t_data->t_infomask &= ~HEAP_XMAX_BITS;
tuple->t_data->t_infomask2 &= ~HEAP_KEYS_UPDATED;
tuple->t_data->t_infomask |= new_infomask;
tuple->t_data->t_infomask2 |= new_infomask2;

I don't understand why we need to do something special for combo CIDs
if they are not generated during this operation?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
On Tue, May 12, 2020 at 1:01 AM Amit Kapila <[hidden email]> wrote:
> I don't understand why we need to do something special for combo CIDs
> if they are not generated during this operation?

Hmm. Well I guess if they're not being generated then we don't need to
do anything about them, but I still think we should try to work around
having to disable parallelism for a table which is referenced by
foreign keys.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company



Re: Parallel copy

akapila
On Thu, May 14, 2020 at 12:39 AM Robert Haas <[hidden email]> wrote:

>
> On Tue, May 12, 2020 at 1:01 AM Amit Kapila <[hidden email]> wrote:
> > I don't understand why we need to do something special for combo CIDs
> > if they are not generated during this operation?
>
> Hmm. Well I guess if they're not being generated then we don't need to
> do anything about them, but I still think we should try to work around
> having to disable parallelism for a table which is referenced by
> foreign keys.
>

Okay, just to be clear: we want to allow parallelism for a table that
has foreign keys.  Basically, a parallel copy should work while
loading data into tables having FK references.

To support that, we need to consider a few things.
a. Currently, we increment the command counter each time we take a key
share lock on a tuple during trigger execution.  I am really not sure
whether this is required during copy command execution or whether we
can just increment it once for the copy.  If we need to increment the
command counter just once for the copy command, then for parallel copy
we can ensure that we do it just once at the end of the parallel copy,
but if not, then we might need some special handling.

b. Another point is that after inserting rows, we record the CTIDs of
the tuples in the event queue, and then, once all tuples are
processed, we call the FK trigger for each CTID.  Now, with
parallelism, the FK checks will be processed as soon as a worker has
processed one chunk.  I don't see any problem with it, but this will
still be a bit different from what we do in the serial case.  Do you
see any problem with this?

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Dilip Kumar-2
On Thu, May 14, 2020 at 11:48 AM Amit Kapila <[hidden email]> wrote:

>
> On Thu, May 14, 2020 at 12:39 AM Robert Haas <[hidden email]> wrote:
> >
> > On Tue, May 12, 2020 at 1:01 AM Amit Kapila <[hidden email]> wrote:
> > > I don't understand why we need to do something special for combo CIDs
> > > if they are not generated during this operation?
> >
> > Hmm. Well I guess if they're not being generated then we don't need to
> > do anything about them, but I still think we should try to work around
> > having to disable parallelism for a table which is referenced by
> > foreign keys.
> >
>
> Okay, just to be clear: we want to allow parallelism for a table that
> has foreign keys.  Basically, a parallel copy should work while
> loading data into tables having FK references.
>
> To support that, we need to consider a few things.
> a. Currently, we increment the command counter each time we take a key
> share lock on a tuple during trigger execution.  I am really not sure
> whether this is required during copy command execution or whether we
> can just increment it once for the copy.  If we need to increment the
> command counter just once for the copy command, then for parallel copy
> we can ensure that we do it just once at the end of the parallel copy,
> but if not, then we might need some special handling.
>
> b. Another point is that after inserting rows, we record the CTIDs of
> the tuples in the event queue, and then, once all tuples are
> processed, we call the FK trigger for each CTID.  Now, with
> parallelism, the FK checks will be processed as soon as a worker has
> processed one chunk.  I don't see any problem with it, but this will
> still be a bit different from what we do in the serial case.  Do you
> see any problem with this?

IMHO, it should not be a problem, because without parallelism we also
trigger the foreign key checks only when we detect EOF / the end of
data from STDIN.  And with parallel workers, a worker will likewise
assume that it has completed all its work and can go for the foreign
key checks only after the leader receives EOF / the end of data from
STDIN.

The only difference is that each worker is not waiting for all the
data (from all workers) to be inserted before checking the
constraints.  Moreover, we are not supporting external triggers with
parallel copy; otherwise, we might have to worry that those triggers
could do something on the primary table before we check the
constraint.  I am not sure if there are any other factors I am
missing.

--
Regards,
Dilip Kumar
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Robert Haas
In reply to this post by akapila
On Thu, May 14, 2020 at 2:18 AM Amit Kapila <[hidden email]> wrote:
> To support that, we need to consider a few things.
> a. Currently, we increment the command counter each time we take a key
> share lock on a tuple during trigger execution.  I am really not sure
> whether this is required during copy command execution or whether we
> can just increment it once for the copy.  If we need to increment the
> command counter just once for the copy command, then for parallel copy
> we can ensure that we do it just once at the end of the parallel copy,
> but if not, then we might need some special handling.

My sense is that it would be a lot more sensible to do it at the
*beginning* of the parallel operation. Once we do it once, we
shouldn't ever do it again; that's how it works now. Deferring it
until later seems much more likely to break things.

> b. Another point is that after inserting rows, we record the CTIDs of
> the tuples in the event queue, and then, once all tuples are
> processed, we call the FK trigger for each CTID.  Now, with
> parallelism, the FK checks will be processed as soon as a worker has
> processed one chunk.  I don't see any problem with it, but this will
> still be a bit different from what we do in the serial case.  Do you
> see any problem with this?

I think there could be some problems here. For instance, suppose that
there are two entries for different workers for the same CTID. If the
leader were trying to do all the work, they'd be handled
consecutively. If they were from completely unrelated processes,
locking would serialize them. But group locking won't, so there you
have an issue, I think. Also, it's not ideal from a work-distribution
perspective: one worker could finish early and be unable to help the
others.

--
Robert Haas
EnterpriseDB: http://www.enterprisedb.com
The Enterprise PostgreSQL Company

