Parallel copy

Parallel copy

akapila
This work is to parallelize the COPY command, in particular the "Copy
<table_name> From 'filename' Where <condition>;" form of the command.

Before going into how and what portion of 'copy command' processing we
can parallelize, let us look briefly at the top-level operations we
perform while copying from a file into a table.  We read the file in
64KB chunks, find the line endings, and process the data line by line,
where each line corresponds to one tuple.  We first form the tuple (as
a value/null array) from that line, check whether it qualifies the
WHERE condition and, if so, perform the constraint checks and a few
other checks, and finally store it in a local tuple array.  Once we
have accumulated 1000 tuples or consumed 64KB of input (whichever
happens first), we insert them together via the table_multi_insert API
and then, for each tuple, insert into the index(es) and execute
after-row triggers.
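
(For illustration only, here is a minimal standalone C sketch of that
serial flow -- not actual copy.c code.  CHUNK_SIZE, MAX_BUFFERED_TUPLES
and flush_batch() are stand-ins for the 64KB raw buffer, the 1000-tuple
batch limit and table_multi_insert plus the per-tuple index/trigger
work; for brevity it flushes only on the tuple-count trigger and
assumes no input line exceeds CHUNK_SIZE:)

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define CHUNK_SIZE          65536   /* mirrors the 64KB raw input chunk */
#define MAX_BUFFERED_TUPLES 1000    /* mirrors the 1000-tuple batch limit */

/* Stand-in for table_multi_insert plus per-tuple index/trigger work. */
static void
flush_batch(char **lines, int nlines)
{
    for (int i = 0; i < nlines; i++)
        free(lines[i]);
}

int
main(int argc, char **argv)
{
    FILE   *fp;
    char    chunk[CHUNK_SIZE];
    char    partial[CHUNK_SIZE];    /* assumes no line exceeds CHUNK_SIZE */
    size_t  plen = 0;
    char   *lines[MAX_BUFFERED_TUPLES];
    int     nlines = 0;
    size_t  nread;

    if (argc < 2 || (fp = fopen(argv[1], "r")) == NULL)
        return 1;

    while ((nread = fread(chunk, 1, CHUNK_SIZE, fp)) > 0)
    {
        size_t  start = 0;

        for (size_t i = 0; i < nread; i++)
        {
            size_t  len;
            char   *line;

            if (chunk[i] != '\n')
                continue;

            /* complete line = carried-over partial + chunk[start..i) */
            len = i - start;
            line = malloc(plen + len + 1);
            memcpy(line, partial, plen);
            memcpy(line + plen, chunk + start, len);
            line[plen + len] = '\0';
            plen = 0;

            lines[nlines++] = line;
            if (nlines == MAX_BUFFERED_TUPLES)
            {
                flush_batch(lines, nlines);
                nlines = 0;
            }
            start = i + 1;
        }

        /* carry any trailing partial line over to the next chunk */
        memcpy(partial + plen, chunk + start, nread - start);
        plen += nread - start;
    }

    if (nlines > 0)
        flush_batch(lines, nlines); /* a final line without newline is ignored */
    fclose(fp);
    return 0;
}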

So we do a lot of work after reading each 64KB chunk, and we can read
the next chunk only after all the tuples from the previous chunk have
been processed.  This gives us an opportunity to parallelize the
processing of each 64KB chunk.  I think we can do this in more than
one way.

The first idea is that we allocate each chunk to a worker and, once the
worker has finished processing the current chunk, it can start with the
next unprocessed chunk.  Here, we need to see how to handle the partial
tuples at the end or beginning of each chunk.  We can read the chunks
into dsa/dsm instead of into a local buffer for processing.
Alternatively, if we think that accessing shared memory can be costly,
we can read the entire chunk into local memory but copy the partial
tuple at the beginning of a chunk (if any) to a dsa; we mainly need the
partial tuple in the shared memory area.  The worker which has found
the initial part of the partial tuple will be responsible for
processing the entire tuple.  Now, to detect whether there is a partial
tuple at the beginning of a chunk, we always start reading one byte
prior to the start of the current chunk, and if that byte is not a
line-terminating byte, we know that the first line is a partial tuple.
While processing the chunk, we will then ignore this first line and
start after the first terminating line byte.
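
(A hypothetical helper sketching that boundary rule; worker_start_offset()
and its arguments are illustrative names, not from any patch:)

#include <stddef.h>

/*
 * A worker assigned chunk [chunk_start, chunk_start + chunk_len) of the
 * input peeks at the byte just before its chunk; if that byte is not a
 * line terminator, the chunk's first line is a partial tuple owned by
 * the previous chunk's worker, so processing starts after the first
 * newline.  Returns the offset within the chunk at which this worker
 * should begin.
 */
static size_t
worker_start_offset(const char *filedata, size_t chunk_start, size_t chunk_len)
{
    size_t  off = 0;

    if (chunk_start > 0 && filedata[chunk_start - 1] != '\n')
    {
        /* partial first line: skip up to and including the first newline */
        while (off < chunk_len && filedata[chunk_start + off] != '\n')
            off++;
        if (off < chunk_len)
            off++;              /* step past the newline itself */
    }
    return off;
}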

To connect the partial tuple in two consecutive chunks, we need another
data structure (for ease of reference in this email, I call it the CTM,
or chunk-tuple-map) in shared memory where we store some per-chunk
information such as the chunk number, the dsa location of that chunk,
and a variable which indicates whether we can free/reuse the current
entry.  Whenever we encounter a partial tuple at the beginning of a
chunk, we note down its chunk number and dsa location in the CTM.
Then, whenever we encounter a partial tuple at the end of a chunk, we
search the CTM for the next chunk number and read from the
corresponding dsa location until we encounter a terminating line byte.
Once we have read and processed this partial tuple, we can mark the
entry as available for reuse.  There are some loose ends here, such as
how many entries we should allocate in this data structure.  It depends
on whether we want to allow a worker to start reading the next chunk
before the partial tuple of the previous chunk has been processed.  To
keep it simple, we can allow a worker to process the next chunk only
when the partial tuple in the previous chunk has been processed.  This
will allow us to keep the number of entries in the CTM equal to the
number of workers.  I think we can easily improve this if we want, but
I don't think it will matter too much, as in most cases by the time we
have processed the tuples in a chunk, the partial tuple will have been
consumed by the other worker.
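
(One possible shape for such a CTM entry, with illustrative field names
and a plain offset standing in for a dsa pointer:)

#include <stdbool.h>
#include <stdint.h>

typedef struct ChunkTupleMapEntry
{
    uint64_t    chunk_number;   /* which chunk this entry describes */
    uint64_t    dsa_offset;     /* stand-in for a dsa pointer to the chunk */
    bool        reusable;       /* set once the partial tuple spanning into
                                 * the next chunk has been processed */
} ChunkTupleMapEntry;

/*
 * With the simplification above (a worker may not move on until the
 * previous chunk's partial tuple is consumed), an array of one entry
 * per worker in shared memory would suffice.
 */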

Another approach that came up during an offlist discussion with Robert
is that we have one dedicated worker for reading the chunks from the
file; it copies the complete tuples of one chunk into shared memory
and, once that is done, hands that chunk over to another worker which
can process the tuples in that area.  We can imagine that the reader
worker is responsible for forming some sort of work queue that can be
processed by the other workers.  In this idea, we won't be able to get
the benefit of doing the initial tokenization (forming tuple
boundaries) via parallel workers, and we might need some additional
memory processing, as after the reader worker has handed over the
initial shared memory segment, we need to somehow identify tuple
boundaries and then process them.

Another thing we need to figure out is how many workers to use for the
copy command.  I think we can base it on the file size, which needs
some experiments, or maybe on user input.

I think we have two related problems to solve for this: (a) the
relation extension lock (required for extending the relation), which
won't conflict among parallel workers due to group locking; we are
working on a solution for this in another thread [1], and (b) the use
of page locks in GIN indexes; we can probably disallow parallelism if
the table has a GIN index, which is not a great thing but not bad
either.

To be clear, this work is for PG14.

Thoughts?

[1] - https://www.postgresql.org/message-id/CAD21AoCmT3cFQUN4aVvzy5chw7DuzXrJCbrjTU05B%2BSs%3DGn1LA%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Thomas Munro-5
On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <[hidden email]> wrote:
> This work is to parallelize the copy command and in particular "Copy
> <table_name> from 'filename' Where <condition>;" command.

Nice project, and a great stepping stone towards parallel DML.

> The first idea is that we allocate each chunk to a worker and once the
> worker has finished processing the current chunk, it can start with
> the next unprocessed chunk.  Here, we need to see how to handle the
> partial tuples at the end or beginning of each chunk.  We can read the
> chunks in dsa/dsm instead of in local buffer for processing.
> Alternatively, if we think that accessing shared memory can be costly
> we can read the entire chunk in local memory, but copy the partial
> tuple at the beginning of a chunk (if any) to a dsa.  We mainly need
> partial tuple in the shared memory area. The worker which has found
> the initial part of the partial tuple will be responsible to process
> the entire tuple. Now, to detect whether there is a partial tuple at
> the beginning of a chunk, we always start reading one byte, prior to
> the start of the current chunk and if that byte is not a terminating
> line byte, we know that it is a partial tuple.  Now, while processing
> the chunk, we will ignore this first line and start after the first
> terminating line.

That's quite similar to the approach I took with a parallel file_fdw
patch[1], which mostly consisted of parallelising the reading part of
copy.c, except that...

> To connect the partial tuple in two consecutive chunks, we need to
> have another data structure (for the ease of reference in this email,
> I call it CTM (chunk-tuple-map)) in shared memory where we store some
> per-chunk information like the chunk-number, dsa location of that
> chunk and a variable which indicates whether we can free/reuse the
> current entry.  Whenever we encounter the partial tuple at the
> beginning of a chunk we note down its chunk number, and dsa location
> in CTM.  Next, whenever we encounter any partial tuple at the end of
> the chunk, we search CTM for next chunk-number and read from
> corresponding dsa location till we encounter terminating line byte.
> Once we have read and processed this partial tuple, we can mark the
> entry as available for reuse.  There are some loose ends here like how
> many entries shall we allocate in this data structure.  It depends on
> whether we want to allow the worker to start reading the next chunk
> before the partial tuple of the previous chunk is processed.  To keep
> it simple, we can allow the worker to process the next chunk only when
> the partial tuple in the previous chunk is processed.  This will allow
> us to keep the entries equal to a number of workers in CTM.  I think
> we can easily improve this if we want but I don't think it will matter
> too much as in most cases by the time we processed the tuples in that
> chunk, the partial tuple would have been consumed by the other worker.

... I didn't use a shm 'partial tuple' exchanging mechanism; I just
had each worker follow the final tuple in its chunk into the next
chunk, and had each worker ignore the first tuple in each chunk after
chunk 0 because it knows someone else is looking after that.  That
means there was some double reading going on near the boundaries,
and considering how much I've been complaining about bogus extra
system calls on this mailing list lately, yeah, your idea of doing a
bit more coordination is a better one.  If you go this way, you might
at least find the copy.c part of the patch I wrote useful as stand-in
scaffolding code in the meantime while you prototype the parallel
writing side, if you don't already have something better for this?

> Another approach that came up during an offlist discussion with Robert
> is that we have one dedicated worker for reading the chunks from file
> and it copies the complete tuples of one chunk in the shared memory
> and once that is done, a handover that chunks to another worker which
> can process tuples in that area.  We can imagine that the reader
> worker is responsible to form some sort of work queue that can be
> processed by the other workers.  In this idea, we won't be able to get
> the benefit of initial tokenization (forming tuple boundaries) via
> parallel workers and might need some additional memory processing as
> after reader worker has handed the initial shared memory segment, we
> need to somehow identify tuple boundaries and then process them.

Yeah, I have also wondered about something like this in a slightly
different context.  For parallel query in general, I wondered if there
should be a Parallel Scatter node, that can be put on top of any
parallel-safe plan, and it runs it in a worker process that just
pushes tuples into a single-producer multi-consumer shm queue, and
then other workers read from that whenever they need a tuple.  Hmm,
but for COPY, I suppose you'd want to push the raw lines with minimal
examination, not tuples, into a shm queue, so I guess that's a bit
different.

> Another thing we need to figure out is the how many workers to use for
> the copy command.  I think we can use it based on the file size which
> needs some experiments or may be based on user input.

It seems like we don't even really have a general model for that sort
of thing in the rest of the system yet, and I guess some kind of
fairly dumb explicit system would make sense in the early days...

> Thoughts?

This is cool.

[1] https://www.postgresql.org/message-id/CA%2BhUKGKZu8fpZo0W%3DPOmQEN46kXhLedzqqAnt5iJZy7tD0x6sw%40mail.gmail.com



Re: Parallel copy

akapila
On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <[hidden email]> wrote:
>
> On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <[hidden email]> wrote:
> > This work is to parallelize the copy command and in particular "Copy
> > <table_name> from 'filename' Where <condition>;" command.
>
> Nice project, and a great stepping stone towards parallel DML.
>

Thanks.

> > The first idea is that we allocate each chunk to a worker and once the
> > worker has finished processing the current chunk, it can start with
> > the next unprocessed chunk.  Here, we need to see how to handle the
> > partial tuples at the end or beginning of each chunk.  We can read the
> > chunks in dsa/dsm instead of in local buffer for processing.
> > Alternatively, if we think that accessing shared memory can be costly
> > we can read the entire chunk in local memory, but copy the partial
> > tuple at the beginning of a chunk (if any) to a dsa.  We mainly need
> > partial tuple in the shared memory area. The worker which has found
> > the initial part of the partial tuple will be responsible to process
> > the entire tuple. Now, to detect whether there is a partial tuple at
> > the beginning of a chunk, we always start reading one byte, prior to
> > the start of the current chunk and if that byte is not a terminating
> > line byte, we know that it is a partial tuple.  Now, while processing
> > the chunk, we will ignore this first line and start after the first
> > terminating line.
>
> That's quiet similar to the approach I took with a parallel file_fdw
> patch[1], which mostly consisted of parallelising the reading part of
> copy.c, except that...
>
> > To connect the partial tuple in two consecutive chunks, we need to
> > have another data structure (for the ease of reference in this email,
> > I call it CTM (chunk-tuple-map)) in shared memory where we store some
> > per-chunk information like the chunk-number, dsa location of that
> > chunk and a variable which indicates whether we can free/reuse the
> > current entry.  Whenever we encounter the partial tuple at the
> > beginning of a chunk we note down its chunk number, and dsa location
> > in CTM.  Next, whenever we encounter any partial tuple at the end of
> > the chunk, we search CTM for next chunk-number and read from
> > corresponding dsa location till we encounter terminating line byte.
> > Once we have read and processed this partial tuple, we can mark the
> > entry as available for reuse.  There are some loose ends here like how
> > many entries shall we allocate in this data structure.  It depends on
> > whether we want to allow the worker to start reading the next chunk
> > before the partial tuple of the previous chunk is processed.  To keep
> > it simple, we can allow the worker to process the next chunk only when
> > the partial tuple in the previous chunk is processed.  This will allow
> > us to keep the entries equal to a number of workers in CTM.  I think
> > we can easily improve this if we want but I don't think it will matter
> > too much as in most cases by the time we processed the tuples in that
> > chunk, the partial tuple would have been consumed by the other worker.
>
> ... I didn't use a shm 'partial tuple' exchanging mechanism, I just
> had each worker follow the final tuple in its chunk into the next
> chunk, and have each worker ignore the first tuple in chunk after
> chunk 0 because it knows someone else is looking after that.  That
> means that there was some double reading going on near the boundaries,
>

Right, and especially if the part in the second chunk is bigger, we
might need to read most of the second chunk.

> and considering how much I've been complaining about bogus extra
> system calls on this mailing list lately, yeah, your idea of doing a
> bit more coordination is a better idea.  If you go this way, you might
> at least find the copy.c part of the patch I wrote useful as stand-in
> scaffolding code in the meantime while you prototype the parallel
> writing side, if you don't already have something better for this?
>

No, I haven't started writing anything yet, but I have some ideas on
how to achieve this.  I quickly skimmed through your patch and I think
it can be used as a starting point, though if we decide to go with
accumulating the partial tuple (or all the data) in shm, then things
might differ.

> > Another approach that came up during an offlist discussion with Robert
> > is that we have one dedicated worker for reading the chunks from file
> > and it copies the complete tuples of one chunk in the shared memory
> > and once that is done, a handover that chunks to another worker which
> > can process tuples in that area.  We can imagine that the reader
> > worker is responsible to form some sort of work queue that can be
> > processed by the other workers.  In this idea, we won't be able to get
> > the benefit of initial tokenization (forming tuple boundaries) via
> > parallel workers and might need some additional memory processing as
> > after reader worker has handed the initial shared memory segment, we
> > need to somehow identify tuple boundaries and then process them.
>
> Yeah, I have also wondered about something like this in a slightly
> different context.  For parallel query in general, I wondered if there
> should be a Parallel Scatter node, that can be put on top of any
> parallel-safe plan, and it runs it in a worker process that just
> pushes tuples into a single-producer multi-consumer shm queue, and
> then other workers read from that whenever they need a tuple.
>

The idea sounds great, but past experience shows that shoving all the
tuples through a queue might add significant overhead.  However, I
don't know exactly how you are planning to use it.

>  Hmm,
> but for COPY, I suppose you'd want to push the raw lines with minimal
> examination, not tuples, into a shm queue, so I guess that's a bit
> different.
>

Yeah.

> > Another thing we need to figure out is the how many workers to use for
> > the copy command.  I think we can use it based on the file size which
> > needs some experiments or may be based on user input.
>
> It seems like we don't even really have a general model for that sort
> of thing in the rest of the system yet, and I guess some kind of
> fairly dumb explicit system would make sense in the early days...
>

Makes sense.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Alastair Turner-2
On Fri, 14 Feb 2020 at 11:57, Amit Kapila <[hidden email]> wrote:
On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <[hidden email]> wrote:
>
> On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <[hidden email]> wrote:
 ...
> > Another approach that came up during an offlist discussion with Robert
> > is that we have one dedicated worker for reading the chunks from file
> > and it copies the complete tuples of one chunk in the shared memory
> > and once that is done, a handover that chunks to another worker which
> > can process tuples in that area.  We can imagine that the reader
> > worker is responsible to form some sort of work queue that can be
> > processed by the other workers.  In this idea, we won't be able to get
> > the benefit of initial tokenization (forming tuple boundaries) via
> > parallel workers and might need some additional memory processing as
> > after reader worker has handed the initial shared memory segment, we
> > need to somehow identify tuple boundaries and then process them.

Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required. A single worker could also process a stream without having to reread/rewind so it would be able to process input from STDIN or PROGRAM sources, making the improvements applicable to load operations done by third party tools and scripted \copy in psql.
 
>
...

> > Another thing we need to figure out is the how many workers to use for
> > the copy command.  I think we can use it based on the file size which
> > needs some experiments or may be based on user input.
>
> It seems like we don't even really have a general model for that sort
> of thing in the rest of the system yet, and I guess some kind of
> fairly dumb explicit system would make sense in the early days...
>

makes sense.
The ratio between chunking or line parsing processes and the parallel worker pool would vary with the width of the table, complexity of the data or file (dates, encoding conversions), complexity of constraints and acceptable impact of the load. Being able to control it through user input would be great.

--
Alastair

Re: Parallel copy

akapila
On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <[hidden email]> wrote:

>
> On Fri, 14 Feb 2020 at 11:57, Amit Kapila <[hidden email]> wrote:
>>
>> On Fri, Feb 14, 2020 at 3:36 PM Thomas Munro <[hidden email]> wrote:
>> >
>> > On Fri, Feb 14, 2020 at 9:12 PM Amit Kapila <[hidden email]> wrote:
>
>  ...
>>
>> > > Another approach that came up during an offlist discussion with Robert
>> > > is that we have one dedicated worker for reading the chunks from file
>> > > and it copies the complete tuples of one chunk in the shared memory
>> > > and once that is done, a handover that chunks to another worker which
>> > > can process tuples in that area.  We can imagine that the reader
>> > > worker is responsible to form some sort of work queue that can be
>> > > processed by the other workers.  In this idea, we won't be able to get
>> > > the benefit of initial tokenization (forming tuple boundaries) via
>> > > parallel workers and might need some additional memory processing as
>> > > after reader worker has handed the initial shared memory segment, we
>> > > need to somehow identify tuple boundaries and then process them.
>
>
> Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
>

AFAIU, the whole of this in-quote/out-of-quote state is managed inside
CopyReadLineText, which will be done by each of the parallel workers,
something along the lines of what Thomas did in his patch [1].
Basically, we need to invent a mechanism to allocate chunks to
individual workers, and then the whole processing will be done as we do
it now, except for the special handling of partial tuples which I
explained in my previous email.  Am I missing something here?

>>
>> >
>
> ...
>>
>>
>> > > Another thing we need to figure out is the how many workers to use for
>> > > the copy command.  I think we can use it based on the file size which
>> > > needs some experiments or may be based on user input.
>> >
>> > It seems like we don't even really have a general model for that sort
>> > of thing in the rest of the system yet, and I guess some kind of
>> > fairly dumb explicit system would make sense in the early days...
>> >
>>
>> makes sense.
>
> The ratio between chunking or line parsing processes and the parallel worker pool would vary with the width of the table, complexity of the data or file (dates, encoding conversions), complexity of constraints and acceptable impact of the load. Being able to control it through user input would be great.
>

Okay, I think one simple way could be that we compute the number of
workers based on file size (some experiments are required to determine
this) unless the user has given an input.  If the user has provided an
input, then we can use that, with an upper limit of
max_parallel_workers.
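
(A rough sketch of such a heuristic; the bytes-per-worker figure is an
arbitrary placeholder pending the experiments mentioned above:)

#include <stdint.h>

#define BYTES_PER_WORKER    (64 * 1024 * 1024)  /* placeholder value */

/*
 * Derive the worker count from the file size unless the user supplied
 * one, and clamp it to max_parallel_workers.
 */
static int
choose_copy_workers(int64_t file_size, int requested, int max_parallel_workers)
{
    int     nworkers;

    if (requested > 0)
        nworkers = requested;
    else
        nworkers = (int) (file_size / BYTES_PER_WORKER) + 1;

    if (nworkers > max_parallel_workers)
        nworkers = max_parallel_workers;
    return nworkers;
}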


[1] - https://www.postgresql.org/message-id/CA%2BhUKGKZu8fpZo0W%3DPOmQEN46kXhLedzqqAnt5iJZy7tD0x6sw%40mail.gmail.com

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Alastair Turner-2
On Sat, 15 Feb 2020 at 04:55, Amit Kapila <[hidden email]> wrote:
>
> On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <[hidden email]> wrote:
> >
...

> >
> > Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
> >
>
> AFAIU, the whole of this in-quote/out-of-quote state is manged inside
> CopyReadLineText which will be done by each of the parallel workers,
> something on the lines of what Thomas did in his patch [1].
> Basically, we need to invent a mechanism to allocate chunks to
> individual workers and then the whole processing will be done as we
> are doing now except for special handling for partial tuples which I
> have explained in my previous email.  Am, I missing something here?
>
The problem case that I see is the chunk boundary falling in the
middle of a quoted field where
 - The quote opens in chunk 1
 - The quote closes in chunk 2
 - There is an EoL character between the start of chunk 2 and the closing quote

When the worker processing chunk 2 starts, it believes itself to be in
out-of-quote state, so only data between the start of the chunk and
the EoL is regarded as belonging to the partial line. From that point
on the parsing of the rest of the chunk goes off track.

Some of the resulting errors can be avoided by, for instance,
requiring a quote to be preceded by a delimiter or EoL. That answer
fails when fields end with EoL characters, which happens often enough
in the wild.

Recovering from an incorrect in-quote/out-of-quote state assumption at
the start of parsing a chunk just seems like a hole with no bottom. So
it looks to me like it's best done in a single process which can keep
track of that state reliably.
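
(A tiny standalone demonstration of this failure mode; count_line_ends()
is a deliberately naive scanner, not copy.c code, and it ignores
doubled/escaped quotes for brevity:)

#include <stdbool.h>
#include <stdio.h>
#include <string.h>

/* Count unquoted newlines in buf, given an assumed starting quote state. */
static int
count_line_ends(const char *buf, size_t len, bool in_quote)
{
    int     nlines = 0;

    for (size_t i = 0; i < len; i++)
    {
        if (buf[i] == '"')
            in_quote = !in_quote;
        else if (buf[i] == '\n' && !in_quote)
            nlines++;
    }
    return nlines;
}

int
main(void)
{
    /*
     * Two CSV rows:  1,"a<newline>b",x  and  2,y
     * The chunk boundary falls inside the quoted field, right after the
     * bytes '1,"a', so chunk 2 begins with the embedded newline.
     */
    const char *chunk2 = "\nb\",x\n2,y\n";

    printf("true starting state (in quote):        %d row end(s)\n",
           count_line_ends(chunk2, strlen(chunk2), true));   /* prints 2 */
    printf("assumed starting state (out of quote): %d row end(s)\n",
           count_line_ends(chunk2, strlen(chunk2), false));  /* prints 1 */
    return 0;
}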

--
Alastair



Re: Parallel copy

akapila
On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:

>
> On Sat, 15 Feb 2020 at 04:55, Amit Kapila <[hidden email]> wrote:
> >
> > On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <[hidden email]> wrote:
> > >
> ...
> > >
> > > Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
> > >
> >
> > AFAIU, the whole of this in-quote/out-of-quote state is manged inside
> > CopyReadLineText which will be done by each of the parallel workers,
> > something on the lines of what Thomas did in his patch [1].
> > Basically, we need to invent a mechanism to allocate chunks to
> > individual workers and then the whole processing will be done as we
> > are doing now except for special handling for partial tuples which I
> > have explained in my previous email.  Am, I missing something here?
> >
> The problem case that I see is the chunk boundary falling in the
> middle of a quoted field where
>  - The quote opens in chunk 1
>  - The quote closes in chunk 2
>  - There is an EoL character between the start of chunk 2 and the closing quote
>
> When the worker processing chunk 2 starts, it believes itself to be in
> out-of-quote state, so only data between the start of the chunk and
> the EoL is regarded as belonging to the partial line. From that point
> on the parsing of the rest of the chunk goes off track.
>
> Some of the resulting errors can be avoided by, for instance,
> requiring a quote to be preceded by a delimiter or EoL. That answer
> fails when fields end with EoL characters, which happens often enough
> in the wild.
>
> Recovering from an incorrect in-quote/out-of-quote state assumption at
> the start of parsing a chunk just seems like a hole with no bottom. So
> it looks to me like it's best done in a single process which can keep
> track of that state reliably.
>

Good point, and I agree with you that having a single process would
avoid any such issues.  However, I will think some more on it, and if
you or anyone else gets an idea of how to deal with this in a
multi-worker system (where we allow each worker to read and process a
chunk), then feel free to share your thoughts.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

David Fetter
On Sat, Feb 15, 2020 at 06:02:06PM +0530, Amit Kapila wrote:

> On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:
> >
> > On Sat, 15 Feb 2020 at 04:55, Amit Kapila <[hidden email]> wrote:
> > >
> > > On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <[hidden email]> wrote:
> > > >
> > ...
> > > >
> > > > Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
> > > >
> > >
> > > AFAIU, the whole of this in-quote/out-of-quote state is manged inside
> > > CopyReadLineText which will be done by each of the parallel workers,
> > > something on the lines of what Thomas did in his patch [1].
> > > Basically, we need to invent a mechanism to allocate chunks to
> > > individual workers and then the whole processing will be done as we
> > > are doing now except for special handling for partial tuples which I
> > > have explained in my previous email.  Am, I missing something here?
> > >
> > The problem case that I see is the chunk boundary falling in the
> > middle of a quoted field where
> >  - The quote opens in chunk 1
> >  - The quote closes in chunk 2
> >  - There is an EoL character between the start of chunk 2 and the closing quote
> >
> > When the worker processing chunk 2 starts, it believes itself to be in
> > out-of-quote state, so only data between the start of the chunk and
> > the EoL is regarded as belonging to the partial line. From that point
> > on the parsing of the rest of the chunk goes off track.
> >
> > Some of the resulting errors can be avoided by, for instance,
> > requiring a quote to be preceded by a delimiter or EoL. That answer
> > fails when fields end with EoL characters, which happens often enough
> > in the wild.
> >
> > Recovering from an incorrect in-quote/out-of-quote state assumption at
> > the start of parsing a chunk just seems like a hole with no bottom. So
> > it looks to me like it's best done in a single process which can keep
> > track of that state reliably.
> >
>
> Good point and I agree with you that having a single process would
> avoid any such stuff.   However, I will think some more on it and if
> you/anyone else gets some idea on how to deal with this in a
> multi-worker system (where we can allow each worker to read and
> process the chunk) then feel free to share your thoughts.

I see two pieces of this puzzle: an input format we control, and the
ones we don't.

In the former case, we could encode all fields with base85 (or
something similar that reduces the input alphabet efficiently), then
reserve bytes that denote delimiters of various types. ASCII has
separators for file, group, record, and unit that we could use as
inspiration.

I don't have anything to offer for free-form input other than to agree
that it looks like a hole with no bottom, and maybe we should just
keep that process serial, at least until someone finds a bottom.

Best,
David.
--
David Fetter <david(at)fetter(dot)org> http://fetter.org/
Phone: +1 415 235 3778

Remember to vote!
Consider donating to Postgres: http://www.postgresql.org/about/donate



Re: Parallel copy

Andrew Dunstan-8
In reply to this post by akapila

On 2/15/20 7:32 AM, Amit Kapila wrote:

> On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:
>> On Sat, 15 Feb 2020 at 04:55, Amit Kapila <[hidden email]> wrote:
>>> On Fri, Feb 14, 2020 at 7:16 PM Alastair Turner <[hidden email]> wrote:
>> ...
>>>> Parsing rows from the raw input (the work done by CopyReadLine()) in a single process would accommodate line returns in quoted fields. I don't think there's a way of getting parallel workers to manage the in-quote/out-of-quote state required.
>>>>
>>> AFAIU, the whole of this in-quote/out-of-quote state is manged inside
>>> CopyReadLineText which will be done by each of the parallel workers,
>>> something on the lines of what Thomas did in his patch [1].
>>> Basically, we need to invent a mechanism to allocate chunks to
>>> individual workers and then the whole processing will be done as we
>>> are doing now except for special handling for partial tuples which I
>>> have explained in my previous email.  Am, I missing something here?
>>>
>> The problem case that I see is the chunk boundary falling in the
>> middle of a quoted field where
>>  - The quote opens in chunk 1
>>  - The quote closes in chunk 2
>>  - There is an EoL character between the start of chunk 2 and the closing quote
>>
>> When the worker processing chunk 2 starts, it believes itself to be in
>> out-of-quote state, so only data between the start of the chunk and
>> the EoL is regarded as belonging to the partial line. From that point
>> on the parsing of the rest of the chunk goes off track.
>>
>> Some of the resulting errors can be avoided by, for instance,
>> requiring a quote to be preceded by a delimiter or EoL. That answer
>> fails when fields end with EoL characters, which happens often enough
>> in the wild.
>>
>> Recovering from an incorrect in-quote/out-of-quote state assumption at
>> the start of parsing a chunk just seems like a hole with no bottom. So
>> it looks to me like it's best done in a single process which can keep
>> track of that state reliably.
>>
> Good point and I agree with you that having a single process would
> avoid any such stuff.   However, I will think some more on it and if
> you/anyone else gets some idea on how to deal with this in a
> multi-worker system (where we can allow each worker to read and
> process the chunk) then feel free to share your thoughts.
>


IIRC, in_quote only matters here in CSV mode (because CSV fields can
have embedded newlines). So why not just forbid parallel copy in CSV
mode, at least for now? I guess it depends on the actual use case. If we
expect to be parallel loading humungous CSVs then that won't fly.


cheers


andrew

--
Andrew Dunstan                https://www.2ndQuadrant.com
PostgreSQL Development, 24x7 Support, Remote DBA, Training & Services




Re: Parallel copy

akapila
On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
<[hidden email]> wrote:

> On 2/15/20 7:32 AM, Amit Kapila wrote:
> > On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:
> >>>
> >> The problem case that I see is the chunk boundary falling in the
> >> middle of a quoted field where
> >>  - The quote opens in chunk 1
> >>  - The quote closes in chunk 2
> >>  - There is an EoL character between the start of chunk 2 and the closing quote
> >>
> >> When the worker processing chunk 2 starts, it believes itself to be in
> >> out-of-quote state, so only data between the start of the chunk and
> >> the EoL is regarded as belonging to the partial line. From that point
> >> on the parsing of the rest of the chunk goes off track.
> >>
> >> Some of the resulting errors can be avoided by, for instance,
> >> requiring a quote to be preceded by a delimiter or EoL. That answer
> >> fails when fields end with EoL characters, which happens often enough
> >> in the wild.
> >>
> >> Recovering from an incorrect in-quote/out-of-quote state assumption at
> >> the start of parsing a chunk just seems like a hole with no bottom. So
> >> it looks to me like it's best done in a single process which can keep
> >> track of that state reliably.
> >>
> > Good point and I agree with you that having a single process would
> > avoid any such stuff.   However, I will think some more on it and if
> > you/anyone else gets some idea on how to deal with this in a
> > multi-worker system (where we can allow each worker to read and
> > process the chunk) then feel free to share your thoughts.
> >
>
>
> IIRC, in_quote only matters here in CSV mode (because CSV fields can
> have embedded newlines).
>

AFAIU, that is correct.

> So why not just forbid parallel copy in CSV
> mode, at least for now? I guess it depends on the actual use case. If we
> expect to be parallel loading humungous CSVs then that won't fly.
>

I am not sure about this part.  However, I guess we should at the very
least have an extendable solution that can deal with CSV; otherwise,
we might end up re-designing everything if we want to handle CSV
someday.  One naive idea is that in CSV mode we can set things up
slightly differently, such that a worker won't start processing a
chunk unless the previous chunk is completely parsed.  So each worker
would first parse and tokenize the entire chunk and then start writing
it.  This will make the reading/parsing part serialized, but writes
can still be parallel.  Now, I don't know if it is a good idea to
process things differently for CSV mode.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Ants Aasma-2
In reply to this post by akapila
On Sat, 15 Feb 2020 at 14:32, Amit Kapila <[hidden email]> wrote:
> Good point and I agree with you that having a single process would
> avoid any such stuff.   However, I will think some more on it and if
> you/anyone else gets some idea on how to deal with this in a
> multi-worker system (where we can allow each worker to read and
> process the chunk) then feel free to share your thoughts.

I think having a single process handle splitting the input into tuples makes
the most sense.  It's possible to parse CSV at multiple GB/s [1], and finding
tuple boundaries is a subset of that task.

My first thought for a design would be to have two shared memory ring buffers,
one for data and one for tuple start positions. Reader process reads the CSV
data into the main buffer, finds tuple start locations in there and writes
those to the secondary buffer.

Worker processes claim a chunk of tuple positions from the secondary buffer and
update their "keep this data around" position with the first position. Then
proceed to parse and insert the tuples, updating their position until they find
the end of the last tuple in the chunk.

Buffer size, maximum and minimum chunk size could be tunable. Ideally the
buffers would be at least big enough to absorb one of the workers getting
scheduled out for a timeslice, which could be up to tens of megabytes.
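
(A rough, illustrative shape for that shared-memory layout; all names
and sizes here are placeholders, not from any patch:)

#include <stdatomic.h>
#include <stdint.h>

#define DATA_RING_SIZE   (64 * 1024 * 1024)     /* tunable */
#define TUPLE_RING_SIZE  (1024 * 1024)          /* tunable */
#define MAX_WORKERS      8

typedef struct ParallelCopyShared
{
    /* primary ring: raw input bytes, filled by the reader process */
    char            data[DATA_RING_SIZE];
    atomic_uint_fast64_t data_write_pos;        /* reader's fill position */

    /*
     * secondary ring: byte positions (in the overall input stream) at
     * which tuples start, also written by the reader
     */
    uint64_t        tuple_start[TUPLE_RING_SIZE];
    atomic_uint_fast64_t tuple_write_pos;
    atomic_uint_fast64_t tuple_claim_pos;       /* next tuple range to claim */

    /*
     * per-worker "keep this data around" positions; the reader may only
     * overwrite data below the minimum of these
     */
    atomic_uint_fast64_t keep_pos[MAX_WORKERS];
} ParallelCopyShared;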

Regards,
Ants Aasma

[1] https://github.com/geofflangdale/simdcsv/



Re: Parallel copy

Kyotaro Horiguchi-4
In reply to this post by akapila
At Mon, 17 Feb 2020 16:49:22 +0530, Amit Kapila <[hidden email]> wrote in

> On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
> <[hidden email]> wrote:
> > On 2/15/20 7:32 AM, Amit Kapila wrote:
> > > On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:
> > So why not just forbid parallel copy in CSV
> > mode, at least for now? I guess it depends on the actual use case. If we
> > expect to be parallel loading humungous CSVs then that won't fly.
> >
>
> I am not sure about this part.  However, I guess we should at the very
> least have some extendable solution that can deal with csv, otherwise,
> we might end up re-designing everything if someday we want to deal
> with CSV.  One naive idea is that in csv mode, we can set up the
> things slightly differently like the worker, won't start processing
> the chunk unless the previous chunk is completely parsed.  So each
> worker would first parse and tokenize the entire chunk and then start
> writing it.  So, this will make the reading/parsing part serialized,
> but writes can still be parallel.  Now, I don't know if it is a good
> idea to process in a different way for csv mode.

In an extreme case, if we don't see a QUOTE in a chunk, we cannot know
whether the chunk is in a quoted section or not until all the previous
chunks are parsed.  So we are forced to parse fully sequentially as
long as we allow QUOTE.

On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
QUOTE '')" in order to signal that there's no quoted section in the
file, then all chunks would be fully concurrently parsable.

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Parallel copy

Thomas Munro-5
In reply to this post by Ants Aasma-2
On Tue, Feb 18, 2020 at 4:04 AM Ants Aasma <[hidden email]> wrote:

> On Sat, 15 Feb 2020 at 14:32, Amit Kapila <[hidden email]> wrote:
> > Good point and I agree with you that having a single process would
> > avoid any such stuff.   However, I will think some more on it and if
> > you/anyone else gets some idea on how to deal with this in a
> > multi-worker system (where we can allow each worker to read and
> > process the chunk) then feel free to share your thoughts.
>
> I think having a single process handle splitting the input into tuples makes
> most sense. It's possible to parse csv at multiple GB/s rates [1], finding
> tuple boundaries is a subset of that task.

Yeah, this is compelling.  Even though it has to read the file
serially, the real gains from parallel COPY should come from doing the
real work in parallel: data-type parsing, tuple forming, WHERE clause
filtering, partition routing, buffer management, insertion and
associated triggers, FKs and index maintenance.

The reason I used the other approach for the file_fdw patch is that I
was trying to make it look as much as possible like parallel
sequential scan and not create an extra worker, because I didn't feel
like an FDW should be allowed to do that (what if executor nodes all
over the query tree created worker processes willy-nilly?).  Obviously
it doesn't work correctly for embedded newlines, and even if you
decree that multi-line values aren't allowed in parallel COPY, the
stuff about tuples crossing chunk boundaries is still a bit unpleasant
(whether solved by double reading as I showed, or a bunch of tap
dancing in shared memory) and creates overheads.

> My first thought for a design would be to have two shared memory ring buffers,
> one for data and one for tuple start positions. Reader process reads the CSV
> data into the main buffer, finds tuple start locations in there and writes
> those to the secondary buffer.
>
> Worker processes claim a chunk of tuple positions from the secondary buffer and
> update their "keep this data around" position with the first position. Then
> proceed to parse and insert the tuples, updating their position until they find
> the end of the last tuple in the chunk.

+1.  That sort of two-queue scheme is exactly how I sketched out a
multi-consumer queue for a hypothetical Parallel Scatter node.  It
probably gets a bit trickier when the payload has to be broken up into
fragments to wrap around the "data" buffer N times.



Re: Parallel copy

Ants Aasma-2
On Tue, 18 Feb 2020 at 04:40, Thomas Munro <[hidden email]> wrote:
> +1.  That sort of two-queue scheme is exactly how I sketched out a
> multi-consumer queue for a hypothetical Parallel Scatter node.  It
> probably gets a bit trickier when the payload has to be broken up into
> fragments to wrap around the "data" buffer N times.

At least for COPY it should be easy enough - it already has to handle reading
data block by block.  If a worker updates its position while doing so, the
reader can wrap around the data buffer.

There will be no parallelism while one worker is buffering up a line larger
than the data buffer, but that doesn't seem like a major issue.  Once the line
is buffered and the worker begins inserting, the next worker can start
buffering the next tuple.

Regards,
Ants Aasma



Re: Parallel copy

akapila
In reply to this post by Ants Aasma-2
On Mon, Feb 17, 2020 at 8:34 PM Ants Aasma <[hidden email]> wrote:

>
> On Sat, 15 Feb 2020 at 14:32, Amit Kapila <[hidden email]> wrote:
> > Good point and I agree with you that having a single process would
> > avoid any such stuff.   However, I will think some more on it and if
> > you/anyone else gets some idea on how to deal with this in a
> > multi-worker system (where we can allow each worker to read and
> > process the chunk) then feel free to share your thoughts.
>
> I think having a single process handle splitting the input into tuples makes
> most sense. It's possible to parse csv at multiple GB/s rates [1], finding
> tuple boundaries is a subset of that task.
>
> My first thought for a design would be to have two shared memory ring buffers,
> one for data and one for tuple start positions. Reader process reads the CSV
> data into the main buffer, finds tuple start locations in there and writes
> those to the secondary buffer.
>
> Worker processes claim a chunk of tuple positions from the secondary buffer and
> update their "keep this data around" position with the first position. Then
> proceed to parse and insert the tuples, updating their position until they find
> the end of the last tuple in the chunk.
>

This is similar to what I also had in mind for this idea.  I had
thought of handing over a complete chunk (64K or whatever we decide).
The one thing that slightly bothers me is that we will add some
additional overhead of copying to and from shared memory, which was
earlier local process memory.  Also, the tokenization (finding line
boundaries) would be serial.  I think tokenization should be a small
part of the overall work we do during the copy operation, but I will
do some measurements to ascertain that.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

akapila
In reply to this post by Kyotaro Horiguchi-4
On Tue, Feb 18, 2020 at 7:28 AM Kyotaro Horiguchi
<[hidden email]> wrote:

>
> At Mon, 17 Feb 2020 16:49:22 +0530, Amit Kapila <[hidden email]> wrote in
> > On Sun, Feb 16, 2020 at 12:21 PM Andrew Dunstan
> > <[hidden email]> wrote:
> > > On 2/15/20 7:32 AM, Amit Kapila wrote:
> > > > On Sat, Feb 15, 2020 at 4:08 PM Alastair Turner <[hidden email]> wrote:
> > > So why not just forbid parallel copy in CSV
> > > mode, at least for now? I guess it depends on the actual use case. If we
> > > expect to be parallel loading humungous CSVs then that won't fly.
> > >
> >
> > I am not sure about this part.  However, I guess we should at the very
> > least have some extendable solution that can deal with csv, otherwise,
> > we might end up re-designing everything if someday we want to deal
> > with CSV.  One naive idea is that in csv mode, we can set up the
> > things slightly differently like the worker, won't start processing
> > the chunk unless the previous chunk is completely parsed.  So each
> > worker would first parse and tokenize the entire chunk and then start
> > writing it.  So, this will make the reading/parsing part serialized,
> > but writes can still be parallel.  Now, I don't know if it is a good
> > idea to process in a different way for csv mode.
>
> In an extreme case, if we didn't see a QUOTE in a chunk, we cannot
> know the chunk is in a quoted section or not, until all the past
> chunks are parsed.  After all we are forced to parse fully
> sequentially as far as we allow QUOTE.
>

Right.  I think the benefits of this as compared to the single-reader
idea would be (a) we can avoid accessing shared memory for most of the
chunk, and (b) in non-CSV mode, even the tokenization (finding line
boundaries) would be parallel.  OTOH, processing differently for CSV
and non-CSV mode might not be good.

> On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
> QUOTE '')" in order to signal that there's no quoted section in the
> file then all chunks would be fully concurrently parsable.
>

Yeah, if we can provide such an option, we can probably make parallel
CSV processing equivalent to non-CSV.  However, users might not like
this, as I think in some cases it won't be easy for them to tell
whether the file has quoted fields or not.  I am not very sure of this
point.

--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Kyotaro Horiguchi-4
At Tue, 18 Feb 2020 15:59:36 +0530, Amit Kapila <[hidden email]> wrote in

> On Tue, Feb 18, 2020 at 7:28 AM Kyotaro Horiguchi
> <[hidden email]> wrote:
> >
> > In an extreme case, if we didn't see a QUOTE in a chunk, we cannot
> > know the chunk is in a quoted section or not, until all the past
> > chunks are parsed.  After all we are forced to parse fully
> > sequentially as far as we allow QUOTE.
> >
>
> Right, I think the benefits of this as compared to single reader idea
> would be (a) we can save accessing shared memory for the most part of
> the chunk (b) for non-csv mode, even the tokenization (finding line
> boundaries) would also be parallel.   OTOH, doing processing
> differently for csv and non-csv mode might not be good.

Agreed.  So I think it's a good point of compromise.

> > On the other hand, if we allowed "COPY t FROM f WITH (FORMAT CSV,
> > QUOTE '')" in order to signal that there's no quoted section in the
> > file then all chunks would be fully concurrently parsable.
> >
>
> Yeah, if we can provide such an option, we can probably make parallel
> csv processing equivalent to non-csv.  However, users might not like
> this as I think in some cases it won't be easier for them to tell
> whether the file has quoted fields or not.  I am not very sure of this
> point.

I'm not sure how large a portion of usage involves quoted sections,
so I'm not sure how useful it would be..

regards.

--
Kyotaro Horiguchi
NTT Open Source Software Center



Re: Parallel copy

Ants Aasma-2
In reply to this post by akapila
On Tue, 18 Feb 2020 at 12:20, Amit Kapila <[hidden email]> wrote:
> This is something similar to what I had also in mind for this idea.  I
> had thought of handing over complete chunk (64K or whatever we
> decide).  The one thing that slightly bothers me is that we will add
> some additional overhead of copying to and from shared memory which
> was earlier from local process memory.  And, the tokenization (finding
> line boundaries) would be serial.  I think that tokenization should be
> a small part of the overall work we do during the copy operation, but
> will do some measurements to ascertain the same.

I don't think any extra copying is needed.  The reader can directly
fread()/pq_copymsgbytes() into shared memory, and the workers can run
the CopyReadLineText() inner loop directly off the buffer in shared memory.

For the serial performance of tokenization into lines, I really think a SIMD
based approach will be fast enough for quite some time.  I hacked up the code
in the simdcsv project to tokenize only on line endings, and it was able to
tokenize a CSV file with short lines at 8+ GB/s.  There are going to be many
other bottlenecks before this one starts limiting.  Patch attached if you'd
like to try that out.
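
(For a flavour of the approach, a minimal SSE2 sketch of finding newline
offsets 16 bytes at a time -- this is not the simdcsv code or the
attached patch, just an illustration; it requires x86-64 gcc/clang for
the intrinsics and __builtin_ctz:)

#include <emmintrin.h>
#include <stddef.h>
#include <stdint.h>

/* Append the offsets of all '\n' bytes in buf[0..len) to out[]; return count. */
static size_t
find_line_endings(const char *buf, size_t len, uint64_t *out)
{
    size_t      n = 0;
    size_t      i = 0;
    const __m128i nl = _mm_set1_epi8('\n');

    for (; i + 16 <= len; i += 16)
    {
        __m128i     block = _mm_loadu_si128((const __m128i *) (buf + i));
        uint32_t    mask = (uint32_t) _mm_movemask_epi8(_mm_cmpeq_epi8(block, nl));

        while (mask)
        {
            out[n++] = i + (uint32_t) __builtin_ctz(mask);
            mask &= mask - 1;   /* clear the lowest set bit */
        }
    }
    for (; i < len; i++)        /* scalar tail */
        if (buf[i] == '\n')
            out[n++] = i;
    return n;
}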

Regards,
Ants Aasma

Attachment: simdcsv-find-only-lineendings.diff (2K)

Re: Parallel copy

akapila
On Tue, Feb 18, 2020 at 5:59 PM Ants Aasma <[hidden email]> wrote:

>
> On Tue, 18 Feb 2020 at 12:20, Amit Kapila <[hidden email]> wrote:
> > This is something similar to what I had also in mind for this idea.  I
> > had thought of handing over complete chunk (64K or whatever we
> > decide).  The one thing that slightly bothers me is that we will add
> > some additional overhead of copying to and from shared memory which
> > was earlier from local process memory.  And, the tokenization (finding
> > line boundaries) would be serial.  I think that tokenization should be
> > a small part of the overall work we do during the copy operation, but
> > will do some measurements to ascertain the same.
>
> I don't think any extra copying is needed.
>

I am talking about accessing shared memory instead of process-local
memory.  I understand that an extra copy won't be required.

> The reader can directly
> fread()/pq_copymsgbytes() into shared memory, and the workers can run
> CopyReadLineText() inner loop directly off of the buffer in shared memory.
>

I am slightly confused here.  AFAIU, the for(;;) loop in
CopyReadLineText is about finding the line endings, which we thought
the reader process would do.



--
With Regards,
Amit Kapila.
EnterpriseDB: http://www.enterprisedb.com



Re: Parallel copy

Mike Blackwell-2
In reply to this post by Andrew Dunstan-8
On Sun, Feb 16, 2020 at 12:51 AM Andrew Dunstan <[hidden email]> wrote:

> IIRC, in_quote only matters here in CSV mode (because CSV fields can
> have embedded newlines). So why not just forbid parallel copy in CSV
> mode, at least for now? I guess it depends on the actual use case. If we
> expect to be parallel loading humungous CSVs then that won't fly.

Loading large CSV files is pretty common here.  I hope this can be supported.



MIKE BLACKWELL


[hidden email]


