This significantly speeds up lfs prefetch. With fast network we are
seeing ~50% improvement of overall prefetch times
Because of worker's API in posix we do lose finegrained progress update and only
see progress when a file finished downloading.
Details
- Reviewers
quark mharbison72 - Group Reviewers
hg-reviewers - Commits
- rHGf98fac24b757: lfs: using workers in lfs prefetch
rHG2b7c0cba308f: lfs: using workers in lfs prefetch
Run tests:
./run-tests.py -l test-lfs*
....
Ran 4 tests, 0 skipped, 0 failed.
Run commands resulting in lfs prefetch e.g. hg sparse --enable-profile
Diff Detail
- Repository
- rHG Mercurial
- Lint
Automatic diff as part of commit; lint not applicable. - Unit
Automatic diff as part of commit; unit tests not applicable.
Event Timeline
Actually, test-lfs-test-server.t is probably the only way to trigger this code path. So you might want to install that binary and re-run the test.
There's a test that I copied into core from that repo that references 'https://dewey-lfs.vip.facebook.com/lfs', which isn't accessible from the outside. I wasn't able to delete an lfs file and get the same error via lfs-test-server. So I'm not sure if I should drop it or what. Just an FYI if you're going to rely on core tests after dropping the extension.
Tested with the server:
[wlis@dev9680.prn1 ~/hg-committed/tests] ./run-tests.py -l test-lfs* .... # Ran 4 tests, 0 skipped, 0 failed.
Windows doesn't like this for some reason.
+ Traceback (most recent call last): + File "c:/Users/Matt/projects/hg/hg", line 41, in <module> + dispatch.run() + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 88, in run + status = (dispatch(req) or 0) & 255 + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 177, in dispatch + ret = _runcatch(req) + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 317, in _runcatch + return _callcatch(ui, _runcatchfunc) + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 325, in _callcatch + return scmutil.callcatch(ui, func) + File "c:\Users\Matt\projects\hg\mercurial\scmutil.py", line 154, in callcatch + return func() + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 307, in _runcatchfunc + return _dispatch(req) + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 911, in _dispatch + cmdpats, cmdoptions) + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 666, in runcommand + ret = _runcommand(ui, options, cmd, d) + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 919, in _runcommand + return cmdfunc() + File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 908, in <lambda> + d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) + File "c:\Users\Matt\projects\hg\mercurial\util.py", line 1188, in check + return func(*args, **kwargs) + File "c:\Users\Matt\projects\hg\mercurial\commands.py", line 4163, in push + opargs=opargs) + File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 473, in push + _pushbundle2(pushop) + File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 963, in _pushbundle2 + ret = partgen(pushop, bundler) + File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 775, in _pushb2ctx + pushop.repo.prepushoutgoinghooks(pushop) + File "c:\Users\Matt\projects\hg\mercurial\util.py", line 3125, in __call__ + results.append(hook(*args)) + File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 261, in prepush + return uploadblobsfromrevs(pushop.repo, pushop.outgoing.missing) + File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 252, in uploadblobsfromrevs + uploadblobs(repo, pointers) + File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 304, in uploadblobs + remoteblob.writebatch(pointers, repo.svfs.lfslocalblobstore) + File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 100, in writebatch + self._batch(pointers, fromstore, 'upload') + File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 261, in _batch + for _one, oid in oids: + File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 245, in transfer + self._basictransfer(obj, action, localstore) + File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 202, in _basictransfer + req = self.urlopener.open(request) + File "c:\Python27\lib\urllib2.py", line 427, in open + req = meth(req) + File "c:\Python27\lib\urllib2.py", line 1136, in do_request_ + 'Content-length', '%d' % len(data)) + TypeError: object of type 'file' has no len() + [1]
@mharbison72 you are right, the upload doesn't work because I removed the fliewithprogress wrapper around the file that adds couple functions that I didn't realize. That includes len.
Will fix very soon.
hgext/lfs/blobstore.py | ||
---|---|---|
227 | this line is at fault |
I must have messed up something when running tests previously- probably wrong revision. The tests actually catch the failure above:
[~/hg-committed/tests] ./run-tests.py -l test-lfs-test-server.t --- /home/wlis/hg-committed/tests/test-lfs-test-server.t +++ /home/wlis/hg-committed/tests/test-lfs-test-server.t.err @@ -43,54 +43,98 @@ pushing to ../repo2 searching for changes lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes) - 1 changesets found - uncompressed size of bundle content: - * (changelog) (glob) - * (manifests) (glob) - * a (glob) - adding changesets - adding manifests - adding file changes - added 1 changesets with 1 changes to 1 files + lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 5) + lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 4) + lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 3) + lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 2) + lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 1) + ** unknown exception encountered, please report by visiting + ** https://mercurial-scm.org/wiki/BugTracker + ** Python 2.7.5 (default, Aug 4 2017, 00:39:18) [GCC 4.8.5 20150623 (Red Hat 4.8.5-16)] + ** Mercurial Distributed SCM (version 4.4.1+203-4da86512789c) + ** Extensions loaded: lfs + Traceback (most recent call last): + File "/home/wlis/hg-committed/hg", line 41, in <module> + dispatch.run() + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 88, in run + status = (dispatch(req) or 0) & 255 + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 177, in dispatch + ret = _runcatch(req) + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 318, in _runcatch + return _callcatch(ui, _runcatchfunc) + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 326, in _callcatch + return scmutil.callcatch(ui, func) + File "/home/wlis/hg-committed/mercurial/scmutil.py", line 154, in callcatch + return func() + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 308, in _runcatchfunc + return _dispatch(req) + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 912, in _dispatch + cmdpats, cmdoptions) + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 667, in runcommand + ret = _runcommand(ui, options, cmd, d) + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 920, in _runcommand + return cmdfunc() + File "/home/wlis/hg-committed/mercurial/dispatch.py", line 909, in <lambda> + d = lambda: util.checksignature(func)(ui, *args, **strcmdopt) + File "/home/wlis/hg-committed/mercurial/util.py", line 1188, in check + return func(*args, **kwargs) + File "/home/wlis/hg-committed/mercurial/commands.py", line 4160, in push + opargs=opargs) + File "/home/wlis/hg-committed/mercurial/exchange.py", line 475, in push + _pushbundle2(pushop) + File "/home/wlis/hg-committed/mercurial/exchange.py", line 1023, in _pushbundle2 + ret = partgen(pushop, bundler) + File "/home/wlis/hg-committed/mercurial/exchange.py", line 797, in _pushb2ctx + pushop.repo.prepushoutgoinghooks(pushop) + File "/home/wlis/hg-committed/mercurial/util.py", line 3125, in __call__ + results.append(hook(*args)) + File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 263, in prepush + return uploadblobsfromrevs(pushop.repo, pushop.outgoing.missing) + File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 254, in uploadblobsfromrevs + uploadblobs(repo, pointers) + File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 306, in uploadblobs + remoteblob.writebatch(pointers, repo.svfs.lfslocalblobstore) + File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 133, in writebatch + self._batch(pointers, fromstore, 'upload') + File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 294, in _batch + for _one, oid in oids: + File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 278, in transfer + self._basictransfer(obj, action, localstore) + File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 235, in _basictransfer + req = self.urlopener.open(request) + File "/usr/lib64/python2.7/urllib2.py", line 429, in open + req = meth(req) + File "/usr/lib64/python2.7/urllib2.py", line 1152, in do_request_ + 'Content-length', '%d' % len(data)) + TypeError: object of type 'file' has no len() + [1] Clear the cache to force a download $ rm -rf `hg config lfs.usercache` $ cd ../repo2 $ hg update tip -v - resolving manifests - getting a - lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes) - 1 files updated, 0 files merged, 0 files removed, 0 files unresolved + 0 files updated, 0 files merged, 0 files removed, 0 files unresolved When the server has some blobs already $ hg mv a b + a: $ENOENT$ + abort: no files to copy + [255] $ echo ANOTHER-LARGE-FILE > c $ echo ANOTHER-LARGE-FILE2 > d $ hg commit -m b-and-c -A b c d + b: $ENOENT$ + abort: failed to mark all new/missing files as added/removed + [255] $ hg push ../repo1 -v | grep -v '^ ' pushing to ../repo1 - searching for changes - lfs: need to transfer 2 objects (39 bytes) - lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes) - lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes) - 1 changesets found - uncompressed size of bundle content: - adding changesets - adding manifests - adding file changes - added 1 changesets with 3 changes to 3 files + no changes found Clear the cache to force a download $ rm -rf `hg config lfs.usercache` $ hg --repo ../repo1 update tip -v - resolving manifests - getting b - getting c - lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes) - getting d - lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes) - 3 files updated, 0 files merged, 0 files removed, 0 files unresolved + 0 files updated, 0 files merged, 0 files removed, 0 files unresolved Check error message when the remote missed a blob: ERROR: test-lfs-test-server.t output changed ! Failed test-lfs-test-server.t: output changed # Ran 1 tests, 0 skipped, 1 failed. python hash seed: 1273209229 [~/hg-committed/tests]
Updated the test (as my changes change the output) and retested. Now everything works fine.
Works for me on Windows, thanks.
Are we going to have stability problems with these tests printing out the oid on completion? Maybe just set worker.numcpus = 1 for these tests?
I know nothing about the workers, so out of curiosity, why does the API on POSIX mean losing fine-grained progress?
@mharbison72 I am not sure if these tests are able to satisfy conditions to actually multithread. But you are right it there is an issue we can force 1 worker.
The workers on posix are implemented by forking and the only way of communication is through pipes created by worker.py code. Once forked they only communicate every some # of tasks (file fetches in this case) has been finished by the worker (I think # ~ 100 but not sure). We would have to change POSIX behaviour to allow reporting smaller pieces of progress through pipe (potentially 0 tasks finished). This would need changes in bunch of layers (worker, merge, blobstore) instead the current simple use of progress(...) function.
It is possible to implement that communication, but it is significant amount of work and testing.
I'm seeing some corruption that appears to be related to workers, so maybe we should default the lfs workers to 'disabled'?
The setup is the latest default + some convert extension hacking pushing between local repos on CentOS 7.4. The lfs host is gitbucket on another server running CentOS 7.4. There are 36 blobs to transfer, totaling 348MB. It prints out a bunch of "uploading" and "processed" messages, but then grinds to a halt, and eventually prints a "500: Internal Server Error" after a couple minutes. I added more logging, including printing out the httperror data, which ends up being a socket timeout exception. The server side is only getting part of some files. Typically it is one or two of the first files to be sent that end up being truncated. Since they are ~2MB C files, I can diff and see that it is the beginning of the file that is missing (different amounts each time). I also added code to filewithprogress.read() to tally up the length of the data read, and it is the expected size every time fp.read() returns None.
Setting worker.enabled = False seems to avoid the problem, and it can be reproduced on demand by toggling it back on. When trying to get a wireshark trace, I also noticed that changing lfs.url to http instead of https also seems to avoid the problem. (The response URLs for where to send the blobs were still all https, so I couldn't watch those. A coworker changed some config to try to make them http, and now a 400 is being returned, so I'm going to have him put it back.)
So this may not be a Mercurial problem. But I thought there were initial concerns about using workers safely, so I thought I'd report back and see if there were any ideas.
@mharbison72 Thank you for commenting with this issue. We didn't roll this to many people yet and didn't see the issue. I will try to test the scenario with upload of many large files and I'll comment back here soon.
It looks like keepalive.safesend() isn't sending everything. I can send this to the mailing list if it gets mangled, but I figured I'd try to keep this thread together if possible.
# HG changeset patch # User Matt Harbison <matt_harbison@yahoo.com> # Date 1515094298 18000 # Thu Jan 04 14:31:38 2018 -0500 # Branch bzr_convert # Node ID b0abd77f295edbf1df58674bc1ef7a6bc57e4096 # Parent 0653b44c70685f9ed6f5cf1689ca08f7bd2dbe34 xxx-lfs: attempt to isolate upload corruption with workers (Please don't queue this- it's only to help Wojciech isolate this.) It doesn't look like all of the data is being sent over the socket, although it appears that all of the data is being read from the file. Here's a sample transcript of an upload, with the subsequent wall of html removed. In this case, all 4 files failed checksum verification on the server side. The size in the "Sent %d bytes of data" line corresponds to what the server ends up with, and the other sizes match what the client starts with. I have no idea why this seems to work when 'lfs.url' is http instead of https. $ hg push ... pushing to lighttpd searching for changes 12f746f3f2493b2f39da7ecf63d7ee19c6ac9ec6a4fcd8c229da8a522cb12765: size 5278176 932b4ee4def2b434f85435d9e3e19ca8ba99ce9a065a61524b429a9d5e9b2e9c: size 5258384 ccdf7e788769838f8285b3ee672ed573358202305ee361cfec7a4a4fb005bbc7: size 2062258 dbc0ae9f8b05a7f84770ea303db5c1601500295548b3253e51f8889fcb38cc0a: size 5103733 Broken pipe! Broken pipe! Broken pipe! No more data on ccdf...bbc7. Read 2062258, expected 2062258 Sent 939954 bytes of data No more data on dbc0...cc0a. Read 5103733, expected 5103733 Sent 4931701 bytes of data No more data on 932b...2e9c. Read 5258384, expected 5258384 Sent 5176464 bytes of data No more data on 12f7...2765. Read 5278176, expected 5278176 Sent 5138912 bytes of data <html with java.net.SocketTimeoutException stacktrace removed> diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py --- a/hgext/lfs/blobstore.py +++ b/hgext/lfs/blobstore.py @@ -64,12 +64,14 @@ class filewithprogress(object): Useful to provide progress information for how many bytes are read. """ - def __init__(self, fp, callback): + def __init__(self, fp, callback, name): self._fp = fp self._callback = callback # func(readsize) fp.seek(0, os.SEEK_END) self._len = fp.tell() + self.total = 0 fp.seek(0) + self.name = name def __len__(self): return self._len @@ -79,9 +81,12 @@ class filewithprogress(object): return b'' data = self._fp.read(size) if data: + self.total += len(data) if self._callback: self._callback(len(data)) else: + print('No more data on %s. Read %d, expected %d' % + (self.name, self.total, self._len)) self._fp.close() self._fp = None return data @@ -100,6 +105,12 @@ class local(object): self.cachevfs = lfsvfs(usercache) self.ui = repo.ui + def open(self, oid): + """Open a file descriptor for the named blob.""" + if self.cachevfs.exists(oid): + return self.cachevfs(oid) + return self.vfs(oid) + def write(self, oid, data, verify=True): """Write blob to local blobstore.""" if verify: @@ -254,9 +265,9 @@ class _gitlfsremote(object): request = util.urlreq.request(href) if action == 'upload': # If uploading blobs, read data from local blobstore. - with localstore.vfs(oid) as fp: + with localstore.open(oid) as fp: _verifyfile(oid, fp) - request.data = filewithprogress(localstore.vfs(oid), None) + request.data = filewithprogress(localstore.open(oid), None, oid) request.get_method = lambda: 'PUT' for k, v in headers: @@ -271,6 +282,8 @@ class _gitlfsremote(object): break response += data except util.urlerr.httperror as ex: + print('%s: %s' % (oid, ex.read())) + print('%s: %s' % (oid, ex.info())) raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)') % (ex, oid, action)) @@ -288,6 +301,7 @@ class _gitlfsremote(object): sizes = {} for obj in objects: sizes[obj.get('oid')] = obj.get('size', 0) + print('%s: size %d' % (obj.get('oid'), obj.get('size', 0))) topic = {'upload': _('lfs uploading'), 'download': _('lfs downloading')}[action] if len(objects) > 1: diff --git a/mercurial/keepalive.py b/mercurial/keepalive.py --- a/mercurial/keepalive.py +++ b/mercurial/keepalive.py @@ -553,14 +553,18 @@ def safesend(self, str): if self.debuglevel > 0: print("sending a read()able") data = read(blocksize) + total = 0 while data: + total += len(data) self.sock.sendall(data) data = read(blocksize) + print('Sent %d bytes of data' % total) else: self.sock.sendall(str) except socket.error as v: reraise = True if v[0] == errno.EPIPE: # Broken pipe + print('Broken pipe!') if self._HTTPConnection__state == httplib._CS_REQ_SENT: self._broken_pipe_resp = None self._broken_pipe_resp = self.getresponse()
I finally got around to testing this properly and I can reproduce the issue. I looked into the code a bit and it is possible that we create keepalive connections before forking and we are illegally multiplexing same connection.
The quick fix on our side is to not use workers on upload action, and it fixes the issue right away. Proper fix would be to fix the https handler, but it doesn't look like an easy one. I don't think I'll get to that any time soon.
I'll try to submit a patch to only use workers for download to mitigate the issue.
Thanks @wlis . That makes sense, and aligns with what I saw (a couple of downloads worked with workers, though I still wasn't sure if that was safe or I was getting lucky). If you don't get the chance to follow up, I've got a patch that puts lfs workers behind an experimental knob. But there's no reason to disable for downloads if you're thinking that isn't buggy.
this line is at fault