Package flumotion :: Package twisted :: Module pb
[hide private]

Source Code for Module flumotion.twisted.pb

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_pb -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  """ 
 23  Flumotion Perspective Broker using keycards 
 24   
 25  Inspired by L{twisted.spread.pb} 
 26  """ 
 27   
 28  from twisted.cred import checkers, credentials 
 29  from twisted.cred.portal import IRealm, Portal 
 30  from twisted.internet import protocol, defer 
 31  from twisted.internet import error as terror 
 32  from twisted.python import log, reflect, failure 
 33  from twisted.spread import pb, flavors 
 34  from twisted.spread.pb import PBClientFactory 
 35  from zope.interface import implements 
 36   
 37  from flumotion.configure import configure 
 38  from flumotion.common import keycards, interfaces, common, errors 
 39  from flumotion.common import log as flog 
 40  from flumotion.common.netutils import addressGetHost 
 41  from flumotion.twisted import reflect as freflect 
 42  from flumotion.twisted import credentials as fcredentials 
 43  from flumotion.twisted.compat import reactor 
 44   
# NOTE(review): "$Rev$" looks like a version-control keyword placeholder that
# is meant to be expanded by the VCS -- confirm against the repository setup.
__version__ = "$Rev$"
 46   
 47   
 48  # TODO: 
 49  #   merge FMCF back into twisted 
 50   
 51  ### Keycard-based FPB objects 
 52   
 53  # we made three changes to the standard PBClientFactory: 
 54  # 1) the root object has a getKeycardClasses() call that the server 
 55  #    uses to tell clients about the interfaces it supports 
 56  # 2) you can request a specific interface for the avatar to 
 57  #    implement, instead of only IPerspective 
 58  # 3) you send in a keycard, on which you can set a preference for an avatarId 
 59  # this way you can request a different avatarId than the user you authenticate 
 60  # with, or you can login without a username 
 61   
 62   
class FPBClientFactory(pb.PBClientFactory, flog.Loggable):
    """
    I am an extended Perspective Broker client factory using generic
    keycards for login.

    @ivar keycard:              the keycard used last for logging in; set
                                after self.login has completed
    @type keycard:              L{keycards.Keycard}
    @ivar medium:               the client-side referenceable for the PB
                                server to call on, and for the client to call
                                to the PB server
    @type medium:               L{flumotion.common.medium.BaseMedium}
    @ivar perspectiveInterface: the interface we want to request a
                                perspective for
    @type perspectiveInterface: subclass of
                                L{flumotion.common.interfaces.IMedium}
    """
    logCategory = "FPBClientFactory"
    keycard = None
    medium = None
    perspectiveInterface = None # override in subclass
    _fpbconnector = None

    ## from protocol.ClientFactory

    def startedConnecting(self, connector):
        """
        Remember the connector so that L{disconnect} can abort a connection
        attempt that is still in progress, then chain up.
        """
        self._fpbconnector = connector
        return pb.PBClientFactory.startedConnecting(self, connector)

    ## from twisted.spread.pb.ClientFactory

    def disconnect(self):
        """
        Stop any connection attempt that is still pending, then chain up to
        the regular PB disconnect.
        """
        if self._fpbconnector:
            try:
                self._fpbconnector.stopConnecting()
            except terror.NotConnectingError:
                # we were not in the connecting state; nothing to abort
                pass
        return pb.PBClientFactory.disconnect(self)

    def getKeycardClasses(self):
        """
        Ask the remote PB server for all the keycard interfaces it supports.

        @rtype: L{twisted.internet.defer.Deferred} returning list of str
        """

        def getRootObjectCb(root):
            return root.callRemote('getKeycardClasses')

        d = self.getRootObject()
        d.addCallback(getRootObjectCb)
        return d

    def login(self, authenticator):
        """
        Login, respond to challenges, and eventually get perspective
        from remote PB server.

        The authenticator is asked to issue a keycard of one of the classes
        the server supports; that keycard is then sent to the server's
        root object together with self.medium and the requested interfaces.

        @param authenticator: object used to issue keycards and respond to
                              challenges; must not itself be a keycard

        @return: Deferred of RemoteReference to the perspective.
        """
        assert authenticator, "I really do need an authenticator"
        assert not isinstance(authenticator, keycards.Keycard)
        interfaces = []
        if self.perspectiveInterface:
            self.debug('perspectiveInterface is %r' %
                       self.perspectiveInterface)
            interfaces.append(self.perspectiveInterface)
        else:
            self.warning('No perspectiveInterface set on %r' % self)
        # IPerspective is always requested in addition to our own interface
        if pb.IPerspective not in interfaces:
            interfaces.append(pb.IPerspective)
        # interfaces go over the wire as fully qualified names
        interfaces = [reflect.qual(interface)
                      for interface in interfaces]

        def getKeycardClassesCb(keycardClasses):
            self.log('supported keycard classes: %r' % keycardClasses)
            d = authenticator.issue(keycardClasses)
            return d

        def issueCb(keycard):
            self.keycard = keycard
            self.debug('using keycard: %r' % self.keycard)
            return self.keycard

        d = self.getKeycardClasses()
        d.addCallback(getKeycardClassesCb)
        d.addCallback(issueCb)
        d.addCallback(lambda r: self.getRootObject())
        d.addCallback(self._cbSendKeycard, authenticator, self.medium,
                      interfaces)
        return d

    # we are a different kind of PB client, so warn

    def _cbSendUsername(self, root, username, password,
                        avatarId, client, interfaces):
        """
        Username/password login is not used by this factory; only warn.
        """
        self.warning("you really want to use cbSendKeycard")

    def _cbSendKeycard(self, root, authenticator, client, interfaces, count=0):
        """
        Send self.keycard to the root object's remote login method and hand
        the answer to L{_cbLoginCallback}.

        @param count: number of login round trips done so far
        """
        self.log("_cbSendKeycard(root=%r, authenticator=%r, client=%r, "
                 "interfaces=%r, count=%d", root, authenticator, client,
                 interfaces, count)
        count = count + 1
        d = root.callRemote("login", self.keycard, client, *interfaces)
        return d.addCallback(self._cbLoginCallback, root,
                             authenticator, client, interfaces, count)

    # we can get either a keycard, None (?) or a remote reference

    def _cbLoginCallback(self, result, root, authenticator, client, interfaces,
                         count):
        """
        Handle the server's answer to a login request.

        A RemoteReference means login succeeded and is returned as is.
        Anything else is treated as a keycard: when it is not yet
        AUTHENTICATED, the authenticator responds to it and it is sent
        again; otherwise the keycard itself is returned.

        @param count: number of login round trips done so far; used only
                      to warn when the exchange does not converge
        """
        if count > 5:
            # too many recursions, server is h0rked
            # NOTE(review): we only warn here and keep going, as before;
            # consider failing the login instead.
            self.warning('Too many recursions, internal error.')
        self.log("FPBClientFactory(): result %r" % result)

        if isinstance(result, pb.RemoteReference):
            # everything done, return reference
            self.debug('login successful, returning %r', result)
            return result

        # must be a keycard
        keycard = result
        if not keycard.state == keycards.AUTHENTICATED:
            self.log("FPBClientFactory(): requester needs to resend %r",
                     keycard)
            d = authenticator.respond(keycard)

            def _loginAgainCb(keycard):
                d = root.callRemote("login", keycard, client, *interfaces)
                # count every round trip; passing count unchanged meant
                # the "too many recursions" check above could never fire
                return d.addCallback(self._cbLoginCallback, root,
                                     authenticator, client,
                                     interfaces, count + 1)
            d.addCallback(_loginAgainCb)
            return d

        self.debug("FPBClientFactory(): authenticated %r" % keycard)
        return keycard

class ReconnectingPBClientFactory(pb.PBClientFactory, flog.Loggable,
                                  protocol.ReconnectingClientFactory):
    """
    Reconnecting client factory for normal PB brokers.

    Users of this factory call startLogin to store the login information,
    and should override gotDeferredLogin to receive the deferred returned
    by login for each login attempt.
    """

    def __init__(self):
        pb.PBClientFactory.__init__(self)
        # becomes True once startLogin has stored what to log in with
        self._doingLogin = False

    def clientConnectionFailed(self, connector, reason):
        destination = connector.getDestination()
        log.msg("connection failed to %s, reason %r" % (
            destination, reason))
        # both parent classes need to hear about the failure
        pb.PBClientFactory.clientConnectionFailed(self, connector, reason)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)

    def clientConnectionLost(self, connector, reason):
        destination = connector.getDestination()
        log.msg("connection lost to %s, reason %r" % (
            destination, reason))
        pb.PBClientFactory.clientConnectionLost(self, connector, reason,
                                                reconnecting=True)
        protocol.ReconnectingClientFactory.clientConnectionLost(
            self, connector, reason)

    def clientConnectionMade(self, broker):
        log.msg("connection made")
        self.resetDelay()
        pb.PBClientFactory.clientConnectionMade(self, broker)
        if not self._doingLogin:
            return
        # log in again on every (re)connection with the stored information
        loginDeferred = self.login(self._credentials, self._client)
        self.gotDeferredLogin(loginDeferred)

    def startLogin(self, credentials, client=None):
        """
        Store the credentials and client to log in with whenever a
        connection is made.
        """
        self._client = client
        self._credentials = credentials

        self._doingLogin = True

    # methods to override

    def gotDeferredLogin(self, deferred):
        """
        The deferred from login is now available.

        @raise NotImplementedError: unless overridden in a subclass
        """
        raise NotImplementedError

257 258
class ReconnectingFPBClientFactory(FPBClientFactory,
                                   protocol.ReconnectingClientFactory):
    """
    Reconnecting client factory for FPB brokers (using keycards for login).

    Users of this factory call startLogin to store the authenticator.
    Override gotDeferredLogin to get a handle to the deferred returned
    by login for each login attempt.
    """

    def __init__(self):
        FPBClientFactory.__init__(self)
        self._doingLogin = False
        self._doingGetPerspective = False

    def clientConnectionFailed(self, connector, reason):
        destination = connector.getDestination()
        log.msg("connection failed to %s, reason %r" % (
            destination, reason))
        FPBClientFactory.clientConnectionFailed(self, connector, reason)
        protocol.ReconnectingClientFactory.clientConnectionFailed(
            self, connector, reason)
        # report what the reconnecting machinery decided
        if not self.continueTrying:
            self.debug("not trying to reconnect")
        else:
            self.debug("will try reconnect in %f seconds", self.delay)

    def clientConnectionLost(self, connector, reason):
        destination = connector.getDestination()
        log.msg("connection lost to %s, reason %r" % (
            destination, reason))
        FPBClientFactory.clientConnectionLost(self, connector, reason,
                                              reconnecting=True)
        protocol.ReconnectingClientFactory.clientConnectionLost(
            self, connector, reason)

    def clientConnectionMade(self, broker):
        log.msg("connection made")
        self.resetDelay()
        FPBClientFactory.clientConnectionMade(self, broker)
        if not self._doingLogin:
            return
        # log in again on every (re)connection with the stored authenticator
        loginDeferred = self.login(self._authenticator)
        self.gotDeferredLogin(loginDeferred)

    # TODO: This is a poorly named method; it just provides the appropriate
    # authentication information, and doesn't actually _start_ login at all.

    def startLogin(self, authenticator):
        """
        Store the authenticator to log in with whenever a connection is made.

        @param authenticator: the authenticator; must not be a keycard
        """
        assert not isinstance(authenticator, keycards.Keycard)
        self._authenticator = authenticator
        self._doingLogin = True

    # methods to override

    def gotDeferredLogin(self, deferred):
        """
        The deferred from login is now available.

        @raise NotImplementedError: unless overridden in a subclass
        """
        raise NotImplementedError

### FIXME: this code is an adaptation of twisted/spread/pb.py.
# It allows you to log in to an FPB server requesting interfaces other than
# IPerspective.
# In other words, you can request different "kinds" of avatars from the same
# PB server.
# This code needs to be sent upstream to Twisted.


class _FPortalRoot:
    """
    Root object, used to login to bouncer.

    I hand out a L{_BouncerWrapper} for each broker that asks for the
    root object.
    """

    implements(flavors.IPBRoot)

    def __init__(self, bouncerPortal):
        """
        @type bouncerPortal: L{flumotion.twisted.portal.BouncerPortal}
        """
        self.bouncerPortal = bouncerPortal

    def rootObject(self, broker):
        """
        Create the object that the given broker exposes as its root.
        """
        wrapper = _BouncerWrapper(self.bouncerPortal, broker)
        return wrapper

340 341
class _BouncerWrapper(pb.Referenceable, flog.Loggable):
    """
    Referenceable exposing the bouncer portal's keycard queries and
    keycard login to a single broker.
    """

    logCategory = "_BouncerWrapper"

    def __init__(self, bouncerPortal, broker):
        self.broker = broker
        self.bouncerPortal = bouncerPortal

    def remote_getKeycardClasses(self):
        """
        @returns: the fully-qualified class names of supported keycard
                  interfaces
        @rtype:   L{twisted.internet.defer.Deferred} firing list of str
        """
        return self.bouncerPortal.getKeycardClasses()

    def remote_login(self, keycard, mind, *interfaces):
        """
        Start of keycard login.

        @param interfaces: list of fully qualified names of interface objects

        @returns: one of
            - a L{flumotion.common.keycards.Keycard} when more steps
              need to be performed
            - a L{twisted.spread.pb.AsReferenceable} when authentication
              has succeeded, which will turn into a
              L{twisted.spread.pb.RemoteReference} on the client side
            - a L{flumotion.common.errors.NotAuthenticatedError} when
              authentication is denied
        """

        def loginResponse(result):
            self.log("loginResponse: result=%r", result)
            # a keycard coming back means we are not done yet
            if isinstance(result, keycards.Keycard):
                return result
            # authenticated, so the result is the login tuple
            _interface, perspective, logout = result
            self.broker.notifyOnDisconnect(logout)
            return pb.AsReferenceable(perspective, "perspective")

        # corresponds with FPBClientFactory._cbSendKeycard
        self.log("remote_login(keycard=%s, *interfaces=%r" % (
            keycard, interfaces))
        # turn the fully qualified names back into interface objects
        resolved = [freflect.namedAny(name) for name in interfaces]
        d = self.bouncerPortal.login(keycard, mind, *resolved)
        d.addCallback(loginResponse)
        return d

392 393
class Authenticator(flog.Loggable, pb.Referenceable):
    """
    I am an object used by FPB clients to create keycards for me
    and respond to challenges.

    I encapsulate keycard-related data, plus secrets which are used locally
    and not put on the keycard.

    I can be serialized over PB connections to a RemoteReference and then
    adapted with RemoteAuthenticator to present the same interface.

    @cvar username: a username to log in with
    @type username: str
    @cvar password: a password to log in with
    @type password: str
    @cvar address:  an address to log in from
    @type address:  str
    @cvar avatarId: the avatarId we want to request from the PB server
    @type avatarId: str
    @cvar ttl:      value copied to the ttl attribute of issued keycards,
                    unless None
    """
    logCategory = "authenticator"

    avatarId = None

    username = None
    password = None
    address = None
    # NOTE(review): presumably a time-to-live in seconds -- confirm the unit
    ttl = 30
    # FIXME: we can add ssh keys and similar here later on

424 - def __init__(self, **kwargs):
425 for key in kwargs: 426 setattr(self, key, kwargs[key])
427
428 - def issue(self, keycardClasses):
429 """ 430 Issue a keycard that implements one of the given interfaces. 431 432 @param keycardClasses: list of fully qualified keycard classes 433 @type keycardClasses: list of str 434 435 @rtype: L{twisted.internet.defer.Deferred} firing L{keycards.Keycard} 436 """ 437 # this method returns a deferred so we present the same interface 438 # as the RemoteAuthenticator adapter 439 440 # construct a list of keycard interfaces we can support right now 441 supported = [] 442 # address is allowed to be None 443 if self.username is not None and self.password is not None: 444 # We only want to support challenge-based keycards, for 445 # security. Maybe later we want this to be configurable 446 # supported.append(keycards.KeycardUACPP) 447 supported.append(keycards.KeycardUACPCC) 448 supported.append(keycards.KeycardUASPCC) 449 450 # expand to fully qualified names 451 supported = [reflect.qual(k) for k in supported] 452 453 for i in keycardClasses: 454 if i in supported: 455 self.log('Keycard interface %s supported, looking up', i) 456 name = i.split(".")[-1] 457 methodName = "issue_%s" % name 458 method = getattr(self, methodName) 459 keycard = method() 460 self.debug('Issuing keycard %r of class %s', keycard, 461 name) 462 keycard.avatarId = self.avatarId 463 if self.ttl is not None: 464 keycard.ttl = self.ttl 465 return defer.succeed(keycard) 466 467 self.debug('Could not issue a keycard') 468 return defer.succeed(None)
469 470 # non-challenge types 471
472 - def issue_KeycardUACPP(self):
473 return keycards.KeycardUACPP(self.username, self.password, 474 self.address)
475
476 - def issue_KeycardGeneric(self):
481 # challenge types 482
483 - def issue_KeycardUACPCC(self):
484 return keycards.KeycardUACPCC(self.username, self.address)
485
486 - def issue_KeycardUASPCC(self):
487 return keycards.KeycardUASPCC(self.username, self.address)
488
489 - def respond(self, keycard):
490 """ 491 Respond to a challenge on the given keycard, based on the secrets 492 we have. 493 494 @param keycard: the keycard with the challenge to respond to 495 @type keycard: L{keycards.Keycard} 496 497 @rtype: L{twisted.internet.defer.Deferred} firing 498 a {keycards.Keycard} 499 @returns: a deferred firing the keycard with a response set 500 """ 501 self.debug('responding to challenge on keycard %r' % keycard) 502 methodName = "respond_%s" % keycard.__class__.__name__ 503 method = getattr(self, methodName) 504 return defer.succeed(method(keycard))
505
506 - def respond_KeycardUACPCC(self, keycard):
507 self.log('setting password') 508 keycard.setPassword(self.password) 509 return keycard
510
511 - def respond_KeycardUASPCC(self, keycard):
512 self.log('setting password') 513 keycard.setPassword(self.password) 514 return keycard
515 516 ### pb.Referenceable methods 517
518 - def remote_issue(self, interfaces):
519 return self.issue(interfaces)
520
521 - def remote_respond(self, keycard):
522 return self.respond(keycard)
523 524
class RemoteAuthenticator:
    """
    I am an adapter for a pb.RemoteReference to present the same interface
    as L{Authenticator}
    """

    avatarId = None # not serialized
    username = None # for convenience, will always be None
    password = None # for convenience, will always be None

    def __init__(self, remoteReference):
        """
        @param remoteReference: reference to the remote authenticator;
                                'issue' and 'respond' are called on it
        """
        self._remote = remoteReference

    def copy(self, avatarId=None):
        """
        Create a new adapter around the same remote reference.

        @param avatarId: the avatarId for the copy; when not given (or
                         otherwise false), my own avatarId is used

        @rtype: L{RemoteAuthenticator}
        """
        ret = RemoteAuthenticator(self._remote)
        ret.avatarId = avatarId or self.avatarId
        return ret

    def issue(self, interfaces):
        """
        Ask the remote authenticator to issue a keycard and set my avatarId
        on it.

        @param interfaces: list of fully qualified keycard classes

        @returns: the deferred returned by callRemote, firing the keycard,
                  or None when the remote side could not issue one
        """

        def issueCb(keycard):
            # Authenticator.issue fires None when it cannot issue a
            # keycard; pass that on instead of failing on the attribute
            if keycard is not None:
                keycard.avatarId = self.avatarId
            return keycard

        d = self._remote.callRemote('issue', interfaces)
        d.addCallback(issueCb)
        return d

    def respond(self, keycard):
        """
        Ask the remote authenticator to respond to the challenge on the
        given keycard.

        @returns: the deferred returned by callRemote
        """
        return self._remote.callRemote('respond', keycard)

555 556
class Referenceable(pb.Referenceable, flog.Loggable):
    """
    A referenceable that logs receiving remote messages.

    @cvar remoteLogName: name to use to log the other side of the connection
    @type remoteLogName: str
    """
    logCategory = 'referenceable'
    remoteLogName = 'remote'

    def remoteMessageReceived(self, broker, message, args, kwargs):
        """
        Unserialize the arguments, invoke my remote_<message> method, and
        log both the call and its result or failure.

        @raise pb.NoSuchMethod: when I have no remote_<message> method

        @returns: the serialized deferred for the method's result
        """
        args = broker.unserialize(args)
        kwargs = broker.unserialize(kwargs)
        handler = getattr(self, "remote_%s" % message, None)
        if handler is None:
            raise pb.NoSuchMethod("No such method: remote_%s" % (message, ))

        # pings are logged at LOG level, everything else at DEBUG
        logLevel = flog.DEBUG
        if message == 'ping':
            logLevel = flog.LOG

        category = self.logCategory.upper()
        # all this malarkey is to avoid actually interpolating variables
        # if it is not needed
        prefixArgs = [self.remoteLogName, category, message]
        template, templateArgs = flog.getFormatArgs(
            '%s --> %s: remote_%s(', prefixArgs,
            ')', (), args, kwargs)
        # log going into the method
        extraLogArgs = self.doLog(logLevel, handler, template, *templateArgs)

        # invoke the remote_ method
        d = defer.maybeDeferred(handler, *args, **kwargs)

        # log coming out of the method

        def logResult(result):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: remote_%s(', prefixArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(logLevel, -1, fmt, *fmtArgs, **extraLogArgs)
            return result

        def logFailure(reason):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: remote_%s(', prefixArgs,
                '): failure %r', (reason, ), args, kwargs)
            self.doLog(logLevel, -1, fmt, *fmtArgs, **extraLogArgs)
            return reason

        d.addCallbacks(logResult, logFailure)
        return broker.serialize(d, self.perspective)


class Avatar(pb.Avatar, flog.Loggable):
    """
    An avatar that logs receiving remote messages and calls made to its
    mind.

    @cvar remoteLogName: name to use to log the other side of the connection
    @type remoteLogName: str
    """
    logCategory = 'avatar'
    remoteLogName = 'remote'

    def __init__(self, avatarId):
        self.avatarId = avatarId
        self.logName = avatarId
        self.mind = None
        self.debug("created new Avatar with id %s", avatarId)

    def perspectiveMessageReceived(self, broker, message, args, kwargs):
        """
        Unserialize the arguments and hand over to
        L{perspectiveMessageReceivedUnserialised}.
        """
        plainArgs = broker.unserialize(args)
        plainKwargs = broker.unserialize(kwargs)
        return self.perspectiveMessageReceivedUnserialised(
            broker, message, plainArgs, plainKwargs)

    def perspectiveMessageReceivedUnserialised(self, broker, message,
                                               args, kwargs):
        """
        Invoke my perspective_<message> method with the already
        unserialized arguments, logging the call and its result or failure.

        @raise pb.NoSuchMethod: when I have no perspective_<message> method

        @returns: the serialized deferred for the method's result
        """
        handler = getattr(self, "perspective_%s" % message, None)
        if handler is None:
            raise pb.NoSuchMethod("No such method: perspective_%s" % (
                message, ))

        # pings are logged at LOG level, everything else at DEBUG
        logLevel = flog.DEBUG
        if message == 'ping':
            logLevel = flog.LOG
        category = self.logCategory.upper()
        prefixArgs = [self.remoteLogName, category, message]
        template, templateArgs = flog.getFormatArgs(
            '%s --> %s: perspective_%s(', prefixArgs,
            ')', (), args, kwargs)
        # log going into the method
        extraLogArgs = self.doLog(logLevel, handler, template, *templateArgs)

        # invoke the perspective_ method
        d = defer.maybeDeferred(handler, *args, **kwargs)

        # log coming out of the method

        def logResult(result):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: perspective_%s(', prefixArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(logLevel, -1, fmt, *fmtArgs, **extraLogArgs)
            return result

        def logFailure(reason):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: perspective_%s(', prefixArgs,
                '): failure %r', (reason, ), args, kwargs)
            self.doLog(logLevel, -1, fmt, *fmtArgs, **extraLogArgs)
            return reason

        d.addCallbacks(logResult, logFailure)

        return broker.serialize(d, self, handler, args, kwargs)

    def setMind(self, mind):
        """
        Tell the avatar that the given mind has been attached.
        This gives the avatar a way to call remotely to the client that
        requested this avatar.

        It is best to call setMind() from within the avatar's __init__
        method. Some old code still does this via a callLater, however.

        @type mind: L{twisted.spread.pb.RemoteReference}
        """
        self.mind = mind

        def forgetMind(x):
            self.debug('%r: disconnected from %r' % (self, self.mind))
            self.mind = None
        self.mind.notifyOnDisconnect(forgetMind)

        # log both ends of the connection when the transport knows them
        transport = self.mind.broker.transport
        localAddress = transport.getHost()
        peerAddress = transport.getPeer()
        if localAddress and peerAddress:
            self.debug(
                "PB client connection seen by me is from me %s to %s" % (
                    addressGetHost(localAddress),
                    addressGetHost(peerAddress)))
        self.log('Client attached is mind %s', mind)

    def mindCallRemoteLogging(self, level, stackDepth, name, *args,
                              **kwargs):
        """
        Call the given remote method, and log calling and returning nicely.

        @param level:      the level we should log at (log.DEBUG, log.INFO,
                           etc); None to not log the call and its result
        @type  level:      int
        @param stackDepth: the number of stack frames to go back to get
                           file and line information, negative or zero.
        @type  stackDepth: non-positive int
        @param name:       name of the remote method
        @type  name:       str

        @returns: the deferred from the remote call, or a deferred failing
                  with L{errors.NotConnectedError} when no mind is attached
        """
        doLogging = level is not None
        if doLogging:
            category = str(self.__class__).split(".")[-1].upper()
            prefixArgs = [self.remoteLogName, category, name]
            template, templateArgs = flog.getFormatArgs(
                '%s --> %s: callRemote(%s, ', prefixArgs,
                ')', (), args, kwargs)
            extraLogArgs = self.doLog(level, stackDepth - 1, template,
                                      *templateArgs)

        if not self.mind:
            self.warning('Tried to mindCallRemote(%s), but we are '
                         'disconnected', name)
            return defer.fail(errors.NotConnectedError())

        # the two loggers below are only attached when logging was
        # requested, so the names set up above always exist when they run

        def logResult(result):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', prefixArgs,
                '): %r', (flog.ellipsize(result), ), args, kwargs)
            self.doLog(level, -1, fmt, *fmtArgs, **extraLogArgs)
            return result

        def logFailure(reason):
            fmt, fmtArgs = flog.getFormatArgs(
                '%s <-- %s: callRemote(%s, ', prefixArgs,
                '): %r', (reason, ), args, kwargs)
            self.doLog(level, -1, fmt, *fmtArgs, **extraLogArgs)
            return reason

        d = self.mind.callRemote(name, *args, **kwargs)
        if doLogging:
            d.addCallbacks(logResult, logFailure)
        return d

    def mindCallRemote(self, name, *args, **kwargs):
        """
        Call the given remote method, and log calling and returning nicely.

        @param name: name of the remote method
        @type  name: str
        """
        return self.mindCallRemoteLogging(flog.DEBUG, -1, name, *args,
                                          **kwargs)

    def disconnect(self):
        """
        Disconnect the remote PB client. If we are already disconnected,
        do nothing.
        """
        if not self.mind:
            return None
        return self.mind.broker.transport.loseConnection()


766 767
class PingableAvatar(Avatar):
    """
    An avatar that checks that the remote side is still alive.

    Every perspective message received and every successful call to the
    mind counts as a sign of life. Once ping checking has been started,
    the given disconnect function is called when no sign of life has been
    seen for longer than _pingCheckInterval seconds.
    """
    _pingCheckInterval = (configure.heartbeatInterval *
                          configure.pingTimeoutMultiplier)

    # defaults, so that stopPingChecking() is safe to call even when
    # startPingChecking() was never called
    _pingCheckDC = None
    _pingCheckDisconnect = None

    def __init__(self, avatarId, clock=reactor):
        """
        @param clock: object providing seconds() and callLater(); defaults
                      to the reactor
        """
        self._clock = clock
        Avatar.__init__(self, avatarId)

    def perspectiveMessageReceivedUnserialised(self, broker, message,
                                               args, kwargs):
        # any message from the remote side counts as a sign of life
        self._lastPing = self._clock.seconds()
        return Avatar.perspectiveMessageReceivedUnserialised(
            self, broker, message, args, kwargs)

    def perspective_ping(self):
        """
        @rtype: L{twisted.internet.defer.Deferred} firing True
        """
        return defer.succeed(True)

    def mindCallRemoteLogging(self, level, stackDepth, name, *args,
                              **kwargs):
        d = Avatar.mindCallRemoteLogging(self, level, stackDepth, name, *args,
                                         **kwargs)

        def cb(result):
            # a successful answer from the mind counts as a sign of life
            self._lastPing = self._clock.seconds()
            return result
        d.addCallback(cb)
        return d

    def startPingChecking(self, disconnect):
        """
        Start checking for signs of life.

        @param disconnect: function called without arguments when the
                           remote side has been silent for too long
        """
        self._lastPing = self._clock.seconds()
        self._pingCheckDisconnect = disconnect
        self._pingCheck()

    def _pingCheck(self):
        """
        Disconnect when the last sign of life is too old; otherwise
        schedule the next check.
        """
        self._pingCheckDC = None
        if self._clock.seconds() - self._lastPing > self._pingCheckInterval:
            self.info('no ping in %f seconds, closing connection',
                      self._pingCheckInterval)
            self._pingCheckDisconnect()
        else:
            self._pingCheckDC = self._clock.callLater(self._pingCheckInterval,
                                                      self._pingCheck)

    def stopPingChecking(self):
        """
        Cancel the pending check, if any, and drop the disconnect function.
        """
        if self._pingCheckDC:
            self._pingCheckDC.cancel()
        self._pingCheckDC = None

        # release the disconnect function, too, to help break any
        # potential cycles
        self._pingCheckDisconnect = None

    def setMind(self, mind):
        # chain up
        Avatar.setMind(self, mind)

        def stopPingCheckingCb(x):
            self.debug('stop pinging')
            self.stopPingChecking()
        self.mind.notifyOnDisconnect(stopPingCheckingCb)

        # Now we have a remote reference, so start checking pings.

        def _disconnect():
            if self.mind:
                self.mind.broker.transport.loseConnection()
        self.startPingChecking(_disconnect)