This is an archive of the discontinued Mercurial Phabricator instance.

worker: Use buffered input from the pickle stream
Closed, Public

Authored by heftig on Jan 30 2020, 4:29 PM.

Details

Summary

On Python 3, "pickle.load" will raise an exception ("_pickle.UnpicklingError:
pickle data was truncated") when it gets a short read, i.e. it receives fewer
bytes than it requested.

On our build machine, Mercurial frequently hits this problem while updating a
mozilla-central clone, but only when it is scheduled in batch mode. It is easy
to trigger with:

# wipe the workdir
rm -rf *
hg update null

chrt -b 0 hg update default

I've also written the following program, which demonstrates the core problem:

from __future__ import print_function

import io
import os
import pickle
import time

obj = {"a": 1, "b": 2}
obj_data = pickle.dumps(obj)
assert len(obj_data) > 10

rfd, wfd = os.pipe()

pid = os.fork()
if pid == 0:
    os.close(rfd)

    for _ in range(4):
        time.sleep(0.5)
        print("First write")
        os.write(wfd, obj_data[:10])

        time.sleep(0.5)
        print("Second write")
        os.write(wfd, obj_data[10:])

    os._exit(0)

try:
    os.close(wfd)

    rfile = os.fdopen(rfd, "rb", 0)

    print("Reading")
    while True:
        try:
            obj_copy = pickle.load(rfile)
            assert obj == obj_copy
        except EOFError:
            break
    print("Success")
finally:
    os.kill(pid, 15)

The program reliably fails with Python 3.8 and succeeds with Python 2.7.

Providing the unpickler with a buffered reader fixes the issue, so let
"os.fdopen" create one.

https://bugzilla.mozilla.org/show_bug.cgi?id=1604486
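
As a rough illustration of the fix described above, the sketch below sends one
pickled object through a pipe in two delayed writes and reads it back through
the buffered reader that "os.fdopen" returns when no buffering argument is
given. It uses a thread instead of fork for brevity; the function name, split
point, and delay are assumptions made for this example and are not part of the
patch.

```python
import os
import pickle
import threading
import time


def roundtrip_with_split_writes(obj, split_at=10, delay=0.2):
    """Send one pickle through a pipe in two pieces; read it back buffered.

    Hypothetical helper for illustration only; not Mercurial code.
    """
    data = pickle.dumps(obj)
    assert len(data) > split_at
    rfd, wfd = os.pipe()

    def writer():
        # Two writes separated by a pause, so the reader's first read is short.
        os.write(wfd, data[:split_at])
        time.sleep(delay)
        os.write(wfd, data[split_at:])
        os.close(wfd)

    thread = threading.Thread(target=writer)
    thread.start()
    # Without a third argument, os.fdopen wraps the descriptor in a buffered
    # reader, whose read(n) keeps reading until n bytes arrive or EOF is hit.
    with os.fdopen(rfd, "rb") as rfile:
        result = pickle.load(rfile)
    thread.join()
    return result


if __name__ == "__main__":
    obj = {"a": 1, "b": 2}
    assert roundtrip_with_split_writes(obj) == obj
    print("Success")
```

Passing 0 as the third argument to "os.fdopen" would restore the unbuffered
reader used by the failing program above.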

Diff Detail

Repository
rHG Mercurial
Branch
stable
Lint
No Linters Available
Unit
No Unit Test Coverage

Event Timeline

heftig created this revision. Jan 30 2020, 4:29 PM
heftig edited the summary of this revision. Jan 30 2020, 4:33 PM
This revision was not accepted when it landed; it landed in state Needs Review.
This revision was automatically updated to reflect the committed changes.
yuja added a subscriber: yuja. Feb 4 2020, 7:38 AM
 for rfd, wfd in pipes:
     os.close(wfd)
-    selector.register(os.fdopen(rfd, 'rb', 0), selectors.EVENT_READ)
+    selector.register(os.fdopen(rfd, 'rb'), selectors.EVENT_READ)

Using buffered I/O can cause a deadlock (until the worker process exits).
The master process expects EVENT_READ to stay asserted (i.e. level-triggered)
while more than one item is readable, but with a buffered file it won't be,
since almost all readable items will have been moved into the file's internal
buffer.

import time
from mercurial import (
    ui as uimod,
    worker,
)

def some_work(n):
    # send back many items at once
    for i in range(10):
        yield (n, i)
    # and don't close() the pipe for a while
    time.sleep(10)

ui = uimod.ui()
ui.setconfig(b'worker', b'numcpus', b'2')
gen = worker._posixworker(ui, some_work, staticargs=(), args=[0, 1],
                          hasretval=False)
for x in gen:
    ui.write(b'%r\n' % (x,))
    ui.flush()
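
The draining effect described in this comment can also be observed without the
worker machinery. The following is a minimal sketch, not Mercurial code: it
queues three pickles in a pipe, loads one, and then asks the OS whether the
descriptor is still readable. The helper name is hypothetical, and a POSIX
system is assumed, since "select" is applied to a pipe.

```python
import os
import pickle
import select


def os_level_readable_after_first_load(buffering):
    """Queue three pickles, load one, and report OS-level readability.

    Hypothetical helper for illustration. Pass buffering=0 for an
    unbuffered reader or buffering=-1 for the default buffered reader.
    """
    rfd, wfd = os.pipe()
    # Send several items at once and keep the write end open, as a worker
    # that has not exited yet would.
    os.write(wfd, b"".join(pickle.dumps((0, i)) for i in range(3)))
    rfile = os.fdopen(rfd, "rb", buffering)
    try:
        first = pickle.load(rfile)
        assert first == (0, 0)
        # A zero timeout makes this a non-blocking poll of the descriptor.
        readable, _, _ = select.select([rfd], [], [], 0)
        # The remaining items are still retrievable in both modes; the only
        # question is whether the OS knows they are pending.
        remaining = [pickle.load(rfile), pickle.load(rfile)]
        return bool(readable), remaining
    finally:
        rfile.close()
        os.close(wfd)


if __name__ == "__main__":
    print("unbuffered:", os_level_readable_after_first_load(0))
    print("buffered:  ", os_level_readable_after_first_load(-1))
```

With the unbuffered reader the descriptor should still be reported readable
after the first load. With the buffered reader it should not, because the two
remaining items already sit in the reader's internal buffer, so a selector
watching the descriptor has nothing to report until more data arrives or the
pipe is closed.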