This is an archive of the discontinued Mercurial Phabricator instance.

worker: Manually buffer reads from pickle stream
ClosedPublic

Authored by heftig on Feb 4 2020, 7:17 PM.

Details

Summary

My previous fix (D8051, which added Python's built-in buffering to the pickle
stream) has the problem that the selector will ignore the buffer. When multiple
pickled objects are read from the pipe into the buffer at once, only one object
will be loaded.

This can repeat until the buffer is full and delays the processing of completed
items until the worker exits, at which point the pipe is always considered
readable and all remaining items are processed.
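
The stall described above can be reproduced outside Mercurial. The following is a minimal sketch, assuming a POSIX pipe and CPython; the function name `demo_buffered_pipe` and the payload strings are made up for illustration and are not part of the patch.

```python
import os
import pickle
import selectors


def demo_buffered_pipe():
    """Show that a selector cannot see data held in a BufferedReader."""
    rfd, wfd = os.pipe()
    # Two pickled objects arrive in the pipe back to back.
    os.write(wfd, pickle.dumps("first") + pickle.dumps("second"))

    # Default buffering: os.fdopen returns an io.BufferedReader.
    reader = os.fdopen(rfd, "rb")
    sel = selectors.DefaultSelector()
    sel.register(reader, selectors.EVENT_READ)
    try:
        # Loading one object pulls both pickles into the Python-level buffer.
        first = pickle.load(reader)
        # The pipe itself is now empty, so the selector reports nothing,
        # even though "second" is waiting in the buffer.
        ready = sel.select(timeout=0)
        second = pickle.load(reader)
    finally:
        sel.close()
        reader.close()
        os.close(wfd)
    return first, len(ready), second
```

A select loop that only calls `pickle.load` when the selector reports the pipe as readable would leave the second object unprocessed until more data arrives or the writer closes the pipe.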

This changeset reverts D8051, removing the buffer again. Instead, on Python 3
only, we use a wrapper to modify the "read" provided to the Unpickler to behave
more like a buffered read. We never read more bytes from the pipe than the
Unpickler requests, so the selector behaves as expected.
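
As a rough illustration of why the wrapper keeps the selector accurate, the sketch below pairs an unbuffered pipe with a reader that follows the same logic as the `_blockingreader` class quoted later in this thread. The names `BlockingReader` and `demo_unbuffered_pipe` are hypothetical, and the sketch assumes a POSIX pipe and CPython.

```python
import os
import pickle
import selectors


class BlockingReader:
    """Loop over readinto() until `size` bytes arrive, never reading more."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __getattr__(self, attr):
        # Forward everything else (readline, fileno, ...) to the raw file.
        return getattr(self._wrapped, attr)

    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()
        buf = bytearray(size)
        pos = 0
        with memoryview(buf) as view:
            while pos < size:
                ret = self._wrapped.readinto(view[pos:])
                if not ret:
                    break  # EOF before the request was fulfilled
                pos += ret
        del buf[pos:]
        return buf


def demo_unbuffered_pipe():
    rfd, wfd = os.pipe()
    os.write(wfd, pickle.dumps("first") + pickle.dumps("second"))
    raw = os.fdopen(rfd, "rb", 0)  # buffering=0 gives an io.FileIO
    reader = BlockingReader(raw)
    sel = selectors.DefaultSelector()
    sel.register(raw, selectors.EVENT_READ)
    try:
        first = pickle.load(reader)
        ready_after_first = len(sel.select(timeout=0))
        second = pickle.load(reader)
        ready_after_second = len(sel.select(timeout=0))
    finally:
        sel.close()
        raw.close()
        os.close(wfd)
    return first, ready_after_first, second, ready_after_second
```

Because the second pickle stays in the pipe until it is requested, the selector still reports the pipe as readable after the first load.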

Also add a test case for the "pickle data was truncated" issue.

https://phab.mercurial-scm.org/D8051#119193

Diff Detail

Repository
rHG Mercurial
Branch
stable

Event Timeline

heftig created this revision. Feb 4 2020, 7:17 PM
heftig added a reviewer: yuja. Feb 4 2020, 7:19 PM
heftig added a subscriber: yuja.
heftig added a comment. Feb 4 2020, 7:35 PM

I went for the wrapper overriding read after failing to find a way to ask BufferedReader whether data remains in the buffer. Even peek will trigger a blocking read when the buffer is empty.

yuja added a comment. Feb 6 2020, 10:11 AM

+if pycompat.ispy3:
+
+    class _blockingreader(object):
+        def __init__(self, wrapped):
+            self._wrapped = wrapped
+
+        def __getattr__(self, attr):
+            return getattr(self._wrapped, attr)
+
+        # issue multiple reads until size is fulfilled
+        def read(self, size=-1):
+            if size < 0:
+                return self._wrapped.readall()
+
+            buf = bytearray(size)
+            view = memoryview(buf)
+            pos = 0
+
+            while pos < size:
+                ret = self._wrapped.readinto(view[pos:])
+                if not ret:
+                    break
+                pos += ret
+
+            del view
+            del buf[pos:]
+            return buf

Might be better to optimize the common case wrapped.read(size) == size.
FWIW, if we don't mind issuing extra read() syscalls, maybe we can abuse
BufferedReader of buffer_size=1.
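
One way to read the "optimize the common case" suggestion is to try a single `read(size)` first and fall back to a loop only when it comes up short. The sketch below is an assumption about what such a variant could look like, not code from the patch; `FastPathReader` and the `ChunkyRaw` test double are hypothetical names.

```python
import io


class FastPathReader:
    """Hypothetical variant: one read() first, loop only on a short read."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __getattr__(self, attr):
        return getattr(self._wrapped, attr)

    def read(self, size=-1):
        if size < 0:
            return self._wrapped.readall()
        data = self._wrapped.read(size)
        if not data or len(data) == size:
            return data  # common case: one call satisfied the request
        buf = bytearray(data)
        while len(buf) < size:
            chunk = self._wrapped.read(size - len(buf))
            if not chunk:
                break  # EOF before the request was fulfilled
            buf += chunk
        return buf


class ChunkyRaw(io.RawIOBase):
    """Test double that hands out at most `limit` bytes per raw read."""

    def __init__(self, data, limit):
        super().__init__()
        self._data = bytes(data)
        self._pos = 0
        self._limit = limit

    def readable(self):
        return True

    def readinto(self, b):
        n = min(len(b), self._limit, len(self._data) - self._pos)
        b[:n] = self._data[self._pos:self._pos + n]
        self._pos += n
        return n
```

Compared with the `readinto` loop in the diff, this trades an extra copy on short reads for a single C-level call when the first read is complete.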

Another option is to rewrite the select loop to fully manage response buffer
by ourselves.

for key, events in selector.select():
    ...
    our_buffer.extend(key.fileobj.read())
    temp_io = BytesIO(our_buffer)
    while ...:
        try:
            pickle.load(temp_io)
        except ...:
            ...
    del our_buffer[:temp_io.tell()]

heftig added a comment. Edited Feb 7 2020, 9:50 AM

In D8076#119526, @yuja wrote:

> Might be better to optimize the common case wrapped.read(size) == size.

I thought my code was already pretty optimized: We allocate the buffer to read into just once (no matter how many reads we issue) and give it to the unpickler without copying the data.

> FWIW, if we don't mind issuing extra read() syscalls, maybe we can abuse
> BufferedReader of buffer_size=1.

My gut says that using just read(1) will be way worse when we try to transmit large objects. That said, I don't know if the Unpickler tries to do such large reads.

> Another option is to rewrite the select loop to fully manage response buffer
> by ourselves.
>
> for key, events in selector.select():
>     ...
>     our_buffer.extend(key.fileobj.read())
>     temp_io = BytesIO(our_buffer)
>     while ...:
>         try:
>             pickle.load(temp_io)
>         except ...:
>             ...
>     del our_buffer[:temp_io.tell()]

That would be an option, too, although bytearray.extend + BytesIO.read + bytearray.__delitem__ looks a bit heavy on the memcpys/memmoves. We also have to restart the unpickling whenever the data was truncated. And remember when we're actually EOF.

Wouldn't it look more like this?

for key, events in selector.select():
    try:
        read_data = key.fileobj.read()
    except IOError as e:
        if e.errno == errno.EINTR:
            continue
        raise
    eof = not read_data
    if eof:
        selector.unregister(key.fileobj)
        key.fileobj.close()
        openpipes -= 1
    buf = buffers[key.fileobj]
    buf.extend(read_data)
    temp_io = BytesIO(buf)
    pos = 0
    while True:
        try:
            res = pickle.load(temp_io)
        except EOFError:
            break
        except pickle.UnpicklingError:
            if eof:
                raise
            break
        pos = temp_io.tell()
        if hasretval and res[0]:
            retval.update(res[1])
        else:
            yield res
    del buf[:pos]

This gets pretty convoluted.
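
The buffer-draining core of the loop above can be isolated from the selector and EOF handling. The following is a minimal sketch under that simplification; `drain_pickles` is a hypothetical helper, and unlike the loop above it treats every `UnpicklingError` as "incomplete so far" instead of re-raising at EOF.

```python
import io
import pickle


def drain_pickles(buf):
    """Load every complete pickle at the front of `buf` (a bytearray).

    Consumed bytes are removed from `buf`; a trailing partial pickle is
    left in place so it can be retried once more data has been appended.
    """
    stream = io.BytesIO(buf)  # copies the current buffer contents
    objs = []
    pos = 0
    while True:
        try:
            obj = pickle.load(stream)
        except (EOFError, pickle.UnpicklingError):
            # Ran out of input or hit a truncated pickle: stop and keep
            # everything after the last fully loaded object.
            break
        objs.append(obj)
        pos = stream.tell()
    del buf[:pos]
    return objs
```

Each call restarts unpickling of any partial object from its first byte, which is the repeated work and copying the comment above is concerned about.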

yuja added a comment. Feb 7 2020, 10:52 AM

> > Might be better to optimize the common case `wrapped.read(size) == size`.
> I thought my code was already pretty optimized: We allocate the buffer to read into just once (no matter how many reads we issue) and give it to the unpickler without copying the data.

My gut feeling is that running Python code is generally slow compared to calling
a C function, but yeah, that wouldn't matter here.

Queued for stable since the previous attempt was in stable. Thanks.

> > FWIW, if we don't mind issuing extra `read()` syscalls, maybe we can abuse
> > BufferedReader of `buffer_size=1`.
> My gut says that using just read(1) will be way worse when we try to transmit large objects. That said, I don't know if the Unpickler tries to do such large reads.

Unpickler does issue a few small read()s + one read(serialized_obj_size), so
using unbuffered IO (like the original implementation) isn't optimal. Using
a buffered I/O of buffer_size=1 appears to split read(serialized_obj_size)
into read(1) + read(serialized_obj_size - 1) on Python 3.7, but that is
an implementation detail and I don't know how it differs across Python versions.
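
The read pattern described here can be inspected on any given interpreter by recording the sizes the Unpickler asks for. This is a minimal sketch; `RecordingReader` and `requested_sizes` are hypothetical names, and the recorded sizes are an implementation detail that may differ between Python versions and pickle protocols, so no particular output is assumed.

```python
import io
import pickle


class RecordingReader:
    """File-like object that logs the size of each read() request."""

    def __init__(self, data):
        self._stream = io.BytesIO(data)
        self.sizes = []

    def read(self, size=-1):
        self.sizes.append(size)
        return self._stream.read(size)

    def readline(self):
        # pickle requires readline(); it is only used by text opcodes.
        return self._stream.readline()


def requested_sizes(obj):
    """Return the loaded object and the read sizes the Unpickler issued."""
    reader = RecordingReader(pickle.dumps(obj))
    loaded = pickle.load(reader)
    return loaded, reader.sizes
```

Printing the second return value for a large object shows whether the interpreter in use issues one big read for the payload or many small ones.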

> That would be an option, too, although `bytearray.extend` + `BytesIO.read` + `bytearray.__delitem__` looks a bit heavy on the memcpys/memmoves. We also have to restart the unpickling whenever the data was truncated. And remember when we're actually EOF.

Assuming repeated os.read() calls are way slower than memcpy, I guess this is faster,
but I agree it's a bit more complex.

This revision was not accepted when it landed; it landed in state Needs Review.
This revision was automatically updated to reflect the committed changes.