Details
Details
- Reviewers
 - None
- Group Reviewers
 - hg-reviewers
- Commits
 - rHGed4ebbb98ca0: wireprotov2: raise exception in objects() if future has been resolved
 
Diff Detail
Diff Detail
- Repository
 - rHG Mercurial
- Lint
 - Lint Skipped
- Unit
 - Unit Tests Skipped
| hg-reviewers | 
| Lint Skipped | 
| Unit Tests Skipped | 
| | Path | Packages |
|---|---|---|
| M | mercurial/wireprotov2peer.py (25 lines) | |
| Commit | Parents | Author | Summary | Date |
|---|---|---|---|---|
| | | Gregory Szorc | | Oct 8 2018, 6:19 PM |
| # sending us new data and another consuming it. | # sending us new data and another consuming it. | ||||
| self._lock = threading.RLock() | self._lock = threading.RLock() | ||||
| # An event is set when state of the object changes. This event | # An event is set when state of the object changes. This event | ||||
| # is waited on by the generator emitting objects. | # is waited on by the generator emitting objects. | ||||
| self._serviceable = threading.Event() | self._serviceable = threading.Event() | ||||
| self._pendingevents = [] | self._pendingevents = [] | ||||
| self._pendingerror = None | |||||
| self._decoder = cborutil.bufferingdecoder() | self._decoder = cborutil.bufferingdecoder() | ||||
| self._seeninitial = False | self._seeninitial = False | ||||
| self._redirect = None | self._redirect = None | ||||
| def _oninputcomplete(self): | def _oninputcomplete(self): | ||||
| with self._lock: | with self._lock: | ||||
| self._inputcomplete = True | self._inputcomplete = True | ||||
| self._serviceable.set() | self._serviceable.set() | ||||
| raise error.Abort(_('received unexpected response data ' | raise error.Abort(_('received unexpected response data ' | ||||
| 'after content redirect; the remote is ' | 'after content redirect; the remote is ' | ||||
| 'buggy')) | 'buggy')) | ||||
| self._pendingevents.append(o) | self._pendingevents.append(o) | ||||
| self._serviceable.set() | self._serviceable.set() | ||||
| def _onerror(self, e): | |||||
| self._pendingerror = e | |||||
| with self._lock: | |||||
| self._serviceable.set() | |||||
| def _handleinitial(self, o): | def _handleinitial(self, o): | ||||
| self._seeninitial = True | self._seeninitial = True | ||||
| if o[b'status'] == b'ok': | if o[b'status'] == b'ok': | ||||
| return | return | ||||
| elif o[b'status'] == b'redirect': | elif o[b'status'] == b'redirect': | ||||
| l = o[b'location'] | l = o[b'location'] | ||||
| self._redirect = wireprototypes.alternatelocationresponse( | self._redirect = wireprototypes.alternatelocationresponse( | ||||
| """ | """ | ||||
| while True: | while True: | ||||
| # TODO this can infinite loop if self._inputcomplete is never | # TODO this can infinite loop if self._inputcomplete is never | ||||
| # set. We likely want to tie the lifetime of this object/state | # set. We likely want to tie the lifetime of this object/state | ||||
| # to that of the background thread receiving frames and updating | # to that of the background thread receiving frames and updating | ||||
| # our state. | # our state. | ||||
| self._serviceable.wait(1.0) | self._serviceable.wait(1.0) | ||||
| if self._pendingerror: | |||||
| raise self._pendingerror | |||||
| with self._lock: | with self._lock: | ||||
| self._serviceable.clear() | self._serviceable.clear() | ||||
| # Make copies because objects could be mutated during | # Make copies because objects could be mutated during | ||||
| # iteration. | # iteration. | ||||
| stop = self._inputcomplete | stop = self._inputcomplete | ||||
| pending = list(self._pendingevents) | pending = list(self._pendingevents) | ||||
| self._pendingevents[:] = [] | self._pendingevents[:] = [] | ||||
| response = self._responses[frame.requestid] | response = self._responses[frame.requestid] | ||||
| if action == 'responsedata': | if action == 'responsedata': | ||||
| # Any failures processing this frame should bubble up to the | # Any failures processing this frame should bubble up to the | ||||
| # future tracking the request. | # future tracking the request. | ||||
| try: | try: | ||||
| self._processresponsedata(frame, meta, response) | self._processresponsedata(frame, meta, response) | ||||
| except BaseException as e: | except BaseException as e: | ||||
| # If an exception occurs before the future is resolved, | |||||
| # fail the future. Otherwise, we stuff the exception on | |||||
| # the response object so it can be raised during objects() | |||||
| # iteration. If nothing is consuming objects(), we could | |||||
| # silently swallow this exception. That's a risk we'll have to | |||||
| # take. | |||||
| if frame.requestid in self._futures: | |||||
| self._futures[frame.requestid].set_exception(e) | self._futures[frame.requestid].set_exception(e) | ||||
| del self._futures[frame.requestid] | del self._futures[frame.requestid] | ||||
| response._oninputcomplete() | response._oninputcomplete() | ||||
| else: | else: | ||||
| response._onerror(e) | |||||
| else: | |||||
| raise error.ProgrammingError( | raise error.ProgrammingError( | ||||
| 'unhandled action from clientreactor: %s' % action) | 'unhandled action from clientreactor: %s' % action) | ||||
| def _processresponsedata(self, frame, meta, response): | def _processresponsedata(self, frame, meta, response): | ||||
| # This can raise. The caller can handle it. | # This can raise. The caller can handle it. | ||||
| response._onresponsedata(meta['data']) | response._onresponsedata(meta['data']) | ||||
| # If we got a content redirect response, we want to fetch it and | # If we got a content redirect response, we want to fetch it and | ||||