
lfs: using workers in lfs prefetch
ClosedPublic

Authored by wlis on Nov 30 2017, 8:16 PM.

Details

Summary

This significantly speeds up lfs prefetch. With a fast network we are
seeing a ~50% improvement in overall prefetch times.
Because of the worker API on POSIX, we lose fine-grained progress updates and only
see progress when a file finishes downloading.

Test Plan

Run tests:
./run-tests.py -l test-lfs*
....

Ran 4 tests, 0 skipped, 0 failed.

Run commands that result in an lfs prefetch, e.g. hg sparse --enable-profile

Diff Detail

Repository
rHG Mercurial
Lint
Lint Skipped
Unit
Unit Tests Skipped

Event Timeline

wlis created this revision. Nov 30 2017, 8:16 PM
quark edited the test plan for this revision. Nov 30 2017, 8:18 PM
wlis edited the test plan for this revision. Nov 30 2017, 8:18 PM
quark accepted this revision. Nov 30 2017, 8:20 PM
quark added a subscriber: quark.

If this lands, we can then remove hgext3rd/lfs from our repo.

wlis edited the test plan for this revision. Nov 30 2017, 8:24 PM
quark added a comment. Nov 30 2017, 9:19 PM

Actually, test-lfs-test-server.t is probably the only way to trigger this code path. So you might want to install that binary and re-run the test.

quark requested changes to this revision. Nov 30 2017, 9:20 PM
This revision now requires changes to proceed. Nov 30 2017, 9:20 PM
In D1568#26548, @quark wrote:

If this lands, we can then remove hgext3rd/lfs from our repo.

There's a test that I copied into core from that repo that references 'https://dewey-lfs.vip.facebook.com/lfs', which isn't accessible from the outside. I wasn't able to delete an lfs file and get the same error via lfs-test-server. So I'm not sure if I should drop it or what. Just an FYI if you're going to rely on core tests after dropping the extension.

wlis requested review of this revision. Edited Dec 1 2017, 5:25 PM

Tested with the server:

[wlis@dev9680.prn1 ~/hg-committed/tests] ./run-tests.py -l test-lfs*
....
# Ran 4 tests, 0 skipped, 0 failed.
quark edited the test plan for this revision. Dec 1 2017, 9:16 PM
quark accepted this revision.

I have changed the test plan to not include the "skipped" part.

mharbison72 requested changes to this revision. Dec 4 2017, 10:48 PM

Windows doesn't like this for some reason.

+  Traceback (most recent call last):
+    File "c:/Users/Matt/projects/hg/hg", line 41, in <module>
+      dispatch.run()
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 88, in run
+      status = (dispatch(req) or 0) & 255
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 177, in dispatch
+      ret = _runcatch(req)
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 317, in _runcatch
+      return _callcatch(ui, _runcatchfunc)
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 325, in _callcatch
+      return scmutil.callcatch(ui, func)
+    File "c:\Users\Matt\projects\hg\mercurial\scmutil.py", line 154, in callcatch
+      return func()
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 307, in _runcatchfunc
+      return _dispatch(req)
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 911, in _dispatch
+      cmdpats, cmdoptions)
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 666, in runcommand
+      ret = _runcommand(ui, options, cmd, d)
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 919, in _runcommand
+      return cmdfunc()
+    File "c:\Users\Matt\projects\hg\mercurial\dispatch.py", line 908, in <lambda>
+      d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
+    File "c:\Users\Matt\projects\hg\mercurial\util.py", line 1188, in check
+      return func(*args, **kwargs)
+    File "c:\Users\Matt\projects\hg\mercurial\commands.py", line 4163, in push
+      opargs=opargs)
+    File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 473, in push
+      _pushbundle2(pushop)
+    File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 963, in _pushbundle2
+      ret = partgen(pushop, bundler)
+    File "c:\Users\Matt\projects\hg\mercurial\exchange.py", line 775, in _pushb2ctx
+      pushop.repo.prepushoutgoinghooks(pushop)
+    File "c:\Users\Matt\projects\hg\mercurial\util.py", line 3125, in __call__
+      results.append(hook(*args))
+    File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 261, in prepush
+      return uploadblobsfromrevs(pushop.repo, pushop.outgoing.missing)
+    File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 252, in uploadblobsfromrevs
+      uploadblobs(repo, pointers)
+    File "c:\Users\Matt\projects\hg\hgext\lfs\wrapper.py", line 304, in uploadblobs
+      remoteblob.writebatch(pointers, repo.svfs.lfslocalblobstore)
+    File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 100, in writebatch
+      self._batch(pointers, fromstore, 'upload')
+    File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 261, in _batch
+      for _one, oid in oids:
+    File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 245, in transfer
+      self._basictransfer(obj, action, localstore)
+    File "c:\Users\Matt\projects\hg\hgext\lfs\blobstore.py", line 202, in _basictransfer
+      req = self.urlopener.open(request)
+    File "c:\Python27\lib\urllib2.py", line 427, in open
+      req = meth(req)
+    File "c:\Python27\lib\urllib2.py", line 1136, in do_request_
+      'Content-length', '%d' % len(data))
+  TypeError: object of type 'file' has no len()
+  [1]
This revision now requires changes to proceed. Dec 4 2017, 10:48 PM
wlis planned changes to this revision. Dec 11 2017, 7:20 PM

@mharbison72 you are right, the upload doesn't work because I removed the filewithprogress wrapper around the file. I didn't realize that the wrapper adds a couple of functions, including __len__.
Will fix very soon.

hgext/lfs/blobstore.py
193 ↗(On Diff #4025)

this line is at fault
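
For readers following the traceback: urllib2 calls len() on the request body to fill in Content-length, and a bare file object does not support that. The sketch below illustrates the kind of wrapper that makes it work. It is a simplified stand-in, not the filewithprogress class from hgext/lfs/blobstore.py; the class name FileWithLength and the io.BytesIO usage are assumptions for illustration only.

```python
import io
import os


class FileWithLength(object):
    """Hypothetical wrapper: gives a file object a length and a read callback."""

    def __init__(self, fp, callback=None):
        self._fp = fp
        self._callback = callback  # called as callback(number_of_bytes_read)
        # Measure the size once by seeking to the end, then rewind.
        fp.seek(0, os.SEEK_END)
        self._len = fp.tell()
        fp.seek(0)

    def __len__(self):
        # This is what lets an HTTP client compute Content-length.
        return self._len

    def read(self, size=-1):
        data = self._fp.read(size)
        if data and self._callback:
            self._callback(len(data))
        return data


if __name__ == '__main__':
    body = FileWithLength(io.BytesIO(b'hello world!'))
    print(len(body))
```

Passing the raw file instead of such a wrapper is what produces "object of type 'file' has no len()" on Python 2.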

wlis added a comment. Edited Dec 11 2017, 7:37 PM

I must have messed something up when running the tests previously, probably by using the wrong revision. The tests actually catch the failure above:

[~/hg-committed/tests] ./run-tests.py -l test-lfs-test-server.t

--- /home/wlis/hg-committed/tests/test-lfs-test-server.t
+++ /home/wlis/hg-committed/tests/test-lfs-test-server.t.err
@@ -43,54 +43,98 @@
   pushing to ../repo2
   searching for changes
   lfs: uploading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
-  1 changesets found
-  uncompressed size of bundle content:
-       * (changelog) (glob)
-       * (manifests) (glob)
-       *  a (glob)
-  adding changesets
-  adding manifests
-  adding file changes
-  added 1 changesets with 1 changes to 1 files
+  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 5)
+  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 4)
+  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 3)
+  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 2)
+  lfs: failed: TypeError("object of type 'file' has no len()",) (remaining retry 1)
+  ** unknown exception encountered, please report by visiting
+  ** https://mercurial-scm.org/wiki/BugTracker
+  ** Python 2.7.5 (default, Aug  4 2017, 00:39:18) [GCC 4.8.5 20150623 (Red Hat 4.8.5-16)]
+  ** Mercurial Distributed SCM (version 4.4.1+203-4da86512789c)
+  ** Extensions loaded: lfs
+  Traceback (most recent call last):
+    File "/home/wlis/hg-committed/hg", line 41, in <module>
+      dispatch.run()
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 88, in run
+      status = (dispatch(req) or 0) & 255
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 177, in dispatch
+      ret = _runcatch(req)
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 318, in _runcatch
+      return _callcatch(ui, _runcatchfunc)
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 326, in _callcatch
+      return scmutil.callcatch(ui, func)
+    File "/home/wlis/hg-committed/mercurial/scmutil.py", line 154, in callcatch
+      return func()
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 308, in _runcatchfunc
+      return _dispatch(req)
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 912, in _dispatch
+      cmdpats, cmdoptions)
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 667, in runcommand
+      ret = _runcommand(ui, options, cmd, d)
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 920, in _runcommand
+      return cmdfunc()
+    File "/home/wlis/hg-committed/mercurial/dispatch.py", line 909, in <lambda>
+      d = lambda: util.checksignature(func)(ui, *args, **strcmdopt)
+    File "/home/wlis/hg-committed/mercurial/util.py", line 1188, in check
+      return func(*args, **kwargs)
+    File "/home/wlis/hg-committed/mercurial/commands.py", line 4160, in push
+      opargs=opargs)
+    File "/home/wlis/hg-committed/mercurial/exchange.py", line 475, in push
+      _pushbundle2(pushop)
+    File "/home/wlis/hg-committed/mercurial/exchange.py", line 1023, in _pushbundle2
+      ret = partgen(pushop, bundler)
+    File "/home/wlis/hg-committed/mercurial/exchange.py", line 797, in _pushb2ctx
+      pushop.repo.prepushoutgoinghooks(pushop)
+    File "/home/wlis/hg-committed/mercurial/util.py", line 3125, in __call__
+      results.append(hook(*args))
+    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 263, in prepush
+      return uploadblobsfromrevs(pushop.repo, pushop.outgoing.missing)
+    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 254, in uploadblobsfromrevs
+      uploadblobs(repo, pointers)
+    File "/home/wlis/hg-committed/hgext/lfs/wrapper.py", line 306, in uploadblobs
+      remoteblob.writebatch(pointers, repo.svfs.lfslocalblobstore)
+    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 133, in writebatch
+      self._batch(pointers, fromstore, 'upload')
+    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 294, in _batch
+      for _one, oid in oids:
+    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 278, in transfer
+      self._basictransfer(obj, action, localstore)
+    File "/home/wlis/hg-committed/hgext/lfs/blobstore.py", line 235, in _basictransfer
+      req = self.urlopener.open(request)
+    File "/usr/lib64/python2.7/urllib2.py", line 429, in open
+      req = meth(req)
+    File "/usr/lib64/python2.7/urllib2.py", line 1152, in do_request_
+      'Content-length', '%d' % len(data))
+  TypeError: object of type 'file' has no len()
+  [1]

 Clear the cache to force a download
   $ rm -rf `hg config lfs.usercache`
   $ cd ../repo2
   $ hg update tip -v
-  resolving manifests
-  getting a
-  lfs: downloading 31cf46fbc4ecd458a0943c5b4881f1f5a6dd36c53d6167d5b69ac45149b38e5b (12 bytes)
-  1 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  0 files updated, 0 files merged, 0 files removed, 0 files unresolved

 When the server has some blobs already

   $ hg mv a b
+  a: $ENOENT$
+  abort: no files to copy
+  [255]
   $ echo ANOTHER-LARGE-FILE > c
   $ echo ANOTHER-LARGE-FILE2 > d
   $ hg commit -m b-and-c -A b c d
+  b: $ENOENT$
+  abort: failed to mark all new/missing files as added/removed
+  [255]
   $ hg push ../repo1 -v | grep -v '^  '
   pushing to ../repo1
-  searching for changes
-  lfs: need to transfer 2 objects (39 bytes)
-  lfs: uploading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
-  lfs: uploading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
-  1 changesets found
-  uncompressed size of bundle content:
-  adding changesets
-  adding manifests
-  adding file changes
-  added 1 changesets with 3 changes to 3 files
+  no changes found

 Clear the cache to force a download
   $ rm -rf `hg config lfs.usercache`
   $ hg --repo ../repo1 update tip -v
-  resolving manifests
-  getting b
-  getting c
-  lfs: downloading d11e1a642b60813aee592094109b406089b8dff4cb157157f753418ec7857998 (19 bytes)
-  getting d
-  lfs: downloading 37a65ab78d5ecda767e8622c248b5dbff1e68b1678ab0e730d5eb8601ec8ad19 (20 bytes)
-  3 files updated, 0 files merged, 0 files removed, 0 files unresolved
+  0 files updated, 0 files merged, 0 files removed, 0 files unresolved

 Check error message when the remote missed a blob:


ERROR: test-lfs-test-server.t output changed
!
Failed test-lfs-test-server.t: output changed
# Ran 1 tests, 0 skipped, 1 failed.
python hash seed: 1273209229
[~/hg-committed/tests]
wlis edited the test plan for this revision. Dec 11 2017, 7:42 PM
wlis updated this revision to Diff 4374.
wlis marked an inline comment as done. Dec 11 2017, 7:43 PM

Updated the test (as my changes change the output) and retested. Now everything works fine.

wlis edited the test plan for this revision. Dec 11 2017, 8:02 PM

Works for me on Windows, thanks.

Are we going to have stability problems with these tests printing out the oid on completion? Maybe just set worker.numcpus = 1 for these tests?

I know nothing about the workers, so out of curiosity, why does the API on POSIX mean losing fine-grained progress?

wlis added a comment. Dec 11 2017, 9:55 PM

@mharbison72 I am not sure whether these tests satisfy the conditions needed to actually run in parallel. But you are right: if there is an issue, we can force 1 worker.
The workers on POSIX are implemented by forking, and the only means of communication is through pipes created by the worker.py code. Once forked, a worker only reports back after it has finished some number of tasks (file fetches in this case); I think that number is ~100, but I'm not sure. We would have to change the POSIX behaviour to allow reporting smaller pieces of progress through the pipe (potentially with 0 tasks finished). This would need changes in a bunch of layers (worker, merge, blobstore) instead of the current simple use of the progress(...) function.
It is possible to implement that communication, but it is a significant amount of work and testing.
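
To make the constraint concrete, here is a minimal POSIX-only sketch of a forked worker that reports over a pipe. It is not Mercurial's worker.py: the function name run_forked_worker, the (name, size) task tuples, and the one-line-per-task protocol are all assumptions for illustration. The point is that the parent only learns about a task once the child writes to the pipe, so bytes transferred inside a task stay invisible.

```python
import os


def run_forked_worker(tasks):
    """Fork one child that handles every task and reports completions.

    tasks is a list of (name, size) tuples. Returns the list of finished
    tasks in the order the parent heard about them.
    """
    rfd, wfd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: keep only the write end and report each *finished* task.
        os.close(rfd)
        status = 1
        try:
            with os.fdopen(wfd, 'w') as out:
                for name, size in tasks:
                    # A real worker would transfer `size` bytes here; the
                    # parent sees nothing until the line below is written.
                    out.write('%s %d\n' % (name, size))
                    out.flush()
            status = 0
        finally:
            os._exit(status)  # never fall back into the parent's code path
    # Parent: keep only the read end and collect completion reports.
    os.close(wfd)
    finished = []
    with os.fdopen(rfd) as inp:
        for line in inp:
            name, size = line.split()
            finished.append((name, int(size)))
    os.waitpid(pid, 0)
    return finished


if __name__ == '__main__':
    for name, size in run_forked_worker([('a', 12), ('b', 20)]):
        print('processed %s (%d bytes)' % (name, size))
```

Reporting partial progress would mean extending this pipe protocol with extra message types, which is the multi-layer change described above.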

mharbison72 accepted this revision. Dec 11 2017, 10:50 PM

Ok, good to know. I’m fine with just watching it for instability.

This revision was automatically updated to reflect the committed changes.

I'm seeing some corruption that appears to be related to workers, so maybe we should default the lfs workers to 'disabled'?

The setup is the latest default + some convert extension hacking pushing between local repos on CentOS 7.4. The lfs host is gitbucket on another server running CentOS 7.4. There are 36 blobs to transfer, totaling 348MB. It prints out a bunch of "uploading" and "processed" messages, but then grinds to a halt, and eventually prints a "500: Internal Server Error" after a couple minutes. I added more logging, including printing out the httperror data, which ends up being a socket timeout exception. The server side is only getting part of some files. Typically it is one or two of the first files to be sent that end up being truncated. Since they are ~2MB C files, I can diff and see that it is the beginning of the file that is missing (different amounts each time). I also added code to filewithprogress.read() to tally up the length of the data read, and it is the expected size every time fp.read() returns None.

Setting worker.enabled = False seems to avoid the problem, and it can be reproduced on demand by toggling it back on. When trying to get a wireshark trace, I also noticed that changing lfs.url to http instead of https also seems to avoid the problem. (The response URLs for where to send the blobs were still all https, so I couldn't watch those. A coworker changed some config to try to make them http, and now a 400 is being returned, so I'm going to have him put it back.)

So this may not be a Mercurial problem. But I thought there were initial concerns about using workers safely, so I thought I'd report back and see if there were any ideas.

wlis added a comment. Jan 4 2018, 3:13 AM

@mharbison72 Thank you for reporting this issue. We haven't rolled this out to many people yet and haven't seen the issue ourselves. I will try to test the scenario with an upload of many large files and I'll comment back here soon.

It looks like keepalive.safesend() isn't sending everything. I can send this to the mailing list if it gets mangled, but I figured I'd try to keep this thread together if possible.

# HG changeset patch
# User Matt Harbison <matt_harbison@yahoo.com>
# Date 1515094298 18000
#      Thu Jan 04 14:31:38 2018 -0500
# Branch bzr_convert
# Node ID b0abd77f295edbf1df58674bc1ef7a6bc57e4096
# Parent  0653b44c70685f9ed6f5cf1689ca08f7bd2dbe34
xxx-lfs: attempt to isolate upload corruption with workers

(Please don't queue this- it's only to help Wojciech isolate this.)

It doesn't look like all of the data is being sent over the socket, although it
appears that all of the data is being read from the file.  Here's a sample
transcript of an upload, with the subsequent wall of html removed.  In this
case, all 4 files failed checksum verification on the server side.  The size in
the "Sent %d bytes of data" line corresponds to what the server ends up with,
and the other sizes match what the client starts with.

I have no idea why this seems to work when 'lfs.url' is http instead of https.

$ hg push
...
pushing to lighttpd
searching for changes
12f746f3f2493b2f39da7ecf63d7ee19c6ac9ec6a4fcd8c229da8a522cb12765: size 5278176
932b4ee4def2b434f85435d9e3e19ca8ba99ce9a065a61524b429a9d5e9b2e9c: size 5258384
ccdf7e788769838f8285b3ee672ed573358202305ee361cfec7a4a4fb005bbc7: size 2062258
dbc0ae9f8b05a7f84770ea303db5c1601500295548b3253e51f8889fcb38cc0a: size 5103733
Broken pipe!
Broken pipe!
Broken pipe!
No more data on ccdf...bbc7.  Read 2062258, expected 2062258
Sent 939954 bytes of data
No more data on dbc0...cc0a.  Read 5103733, expected 5103733
Sent 4931701 bytes of data
No more data on 932b...2e9c.  Read 5258384, expected 5258384
Sent 5176464 bytes of data
No more data on 12f7...2765.  Read 5278176, expected 5278176
Sent 5138912 bytes of data
<html with java.net.SocketTimeoutException stacktrace removed>

diff --git a/hgext/lfs/blobstore.py b/hgext/lfs/blobstore.py
--- a/hgext/lfs/blobstore.py
+++ b/hgext/lfs/blobstore.py
@@ -64,12 +64,14 @@ class filewithprogress(object):
     Useful to provide progress information for how many bytes are read.
     """
 
-    def __init__(self, fp, callback):
+    def __init__(self, fp, callback, name):
         self._fp = fp
         self._callback = callback # func(readsize)
         fp.seek(0, os.SEEK_END)
         self._len = fp.tell()
+        self.total = 0
         fp.seek(0)
+        self.name = name
 
     def __len__(self):
         return self._len
@@ -79,9 +81,12 @@ class filewithprogress(object):
             return b''
         data = self._fp.read(size)
         if data:
+            self.total += len(data)
             if self._callback:
                 self._callback(len(data))
         else:
+            print('No more data on %s.  Read %d, expected %d' %
+                  (self.name, self.total, self._len))
             self._fp.close()
             self._fp = None
         return data
@@ -100,6 +105,12 @@ class local(object):
         self.cachevfs = lfsvfs(usercache)
         self.ui = repo.ui
 
+    def open(self, oid):
+        """Open a file descriptor for the named blob."""
+        if self.cachevfs.exists(oid):
+            return self.cachevfs(oid)
+        return self.vfs(oid)
+
     def write(self, oid, data, verify=True):
         """Write blob to local blobstore."""
         if verify:
@@ -254,9 +265,9 @@ class _gitlfsremote(object):
         request = util.urlreq.request(href)
         if action == 'upload':
             # If uploading blobs, read data from local blobstore.
-            with localstore.vfs(oid) as fp:
+            with localstore.open(oid) as fp:
                 _verifyfile(oid, fp)
-            request.data = filewithprogress(localstore.vfs(oid), None)
+            request.data = filewithprogress(localstore.open(oid), None, oid)
             request.get_method = lambda: 'PUT'
 
         for k, v in headers:
@@ -271,6 +282,8 @@ class _gitlfsremote(object):
                     break
                 response += data
         except util.urlerr.httperror as ex:
+            print('%s: %s' % (oid, ex.read()))
+            print('%s: %s' % (oid, ex.info()))
             raise LfsRemoteError(_('HTTP error: %s (oid=%s, action=%s)')
                                  % (ex, oid, action))
 
@@ -288,6 +301,7 @@ class _gitlfsremote(object):
         sizes = {}
         for obj in objects:
             sizes[obj.get('oid')] = obj.get('size', 0)
+            print('%s: size %d' % (obj.get('oid'), obj.get('size', 0)))
         topic = {'upload': _('lfs uploading'),
                  'download': _('lfs downloading')}[action]
         if len(objects) > 1:
diff --git a/mercurial/keepalive.py b/mercurial/keepalive.py
--- a/mercurial/keepalive.py
+++ b/mercurial/keepalive.py
@@ -553,14 +553,18 @@ def safesend(self, str):
             if self.debuglevel > 0:
                 print("sending a read()able")
             data = read(blocksize)
+            total = 0
             while data:
+                total += len(data)
                 self.sock.sendall(data)
                 data = read(blocksize)
+            print('Sent %d bytes of data' % total)
         else:
             self.sock.sendall(str)
     except socket.error as v:
         reraise = True
         if v[0] == errno.EPIPE:      # Broken pipe
+            print('Broken pipe!')
             if self._HTTPConnection__state == httplib._CS_REQ_SENT:
                 self._broken_pipe_resp = None
                 self._broken_pipe_resp = self.getresponse()
wlis added a comment. Jan 16 2018, 7:35 PM

I finally got around to testing this properly and I can reproduce the issue. I looked into the code a bit, and it is possible that we create keepalive connections before forking and are then illegally multiplexing the same connection.
The quick fix on our side is to not use workers for the upload action, which fixes the issue right away. The proper fix would be to fix the https handler, but that doesn't look easy. I don't think I'll get to that any time soon.

I'll try to submit a patch to only use workers for download to mitigate the issue.
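
The mitigation could look roughly like the sketch below. This is only an illustration of the idea, not the patch itself: transfer_all, transfer_one, and run_parallel are hypothetical names, and run_parallel stands in for whatever worker mechanism is used.

```python
def transfer_all(objects, action, transfer_one, run_parallel):
    """Transfer objects, using parallel workers only for downloads.

    transfer_one(obj) performs a single transfer and returns its result.
    run_parallel(func, items) is a hypothetical helper that applies func
    to items across workers and yields the results.
    """
    if action == 'download' and len(objects) > 1:
        return list(run_parallel(transfer_one, objects))
    # Uploads (and single-object batches) stay in this process, so no
    # connection is shared with a forked worker.
    return [transfer_one(obj) for obj in objects]


if __name__ == '__main__':
    def serial(func, items):
        # Stand-in for a real worker pool.
        return [func(i) for i in items]

    print(transfer_all(['oid1', 'oid2'], 'upload', str.upper, serial))
```

Keeping the decision in one place would also make it simple to put the parallel path behind a config knob later.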

Thanks @wlis. That makes sense, and aligns with what I saw (a couple of downloads worked with workers, though I still wasn't sure if that was safe or if I was getting lucky). If you don't get the chance to follow up, I've got a patch that puts lfs workers behind an experimental knob. But there's no reason to disable them for downloads if you think that path isn't buggy.