Trees | Indices | Help |
---|
|
1 # -*- Mode: Python -*- 2 # vi:si:et:sw=4:sts=4:ts=4 3 # 4 # Flumotion - a streaming media server 5 # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 6 # All rights reserved. 7 8 # This file may be distributed and/or modified under the terms of 9 # the GNU General Public License version 2 as published by 10 # the Free Software Foundation. 11 # This file is distributed without any warranty; without even the implied 12 # warranty of merchantability or fitness for a particular purpose. 13 # See "LICENSE.GPL" in the source distribution for more information. 14 15 # Licensees having purchased or holding a valid Flumotion Advanced 16 # Streaming Server license may use this file in accordance with the 17 # Flumotion Advanced Streaming Server Commercial License Agreement. 18 # See "LICENSE.Flumotion" in the source distribution for more information. 19 20 # Headers in this file shall remain intact. 21 22 """admin model used to connect to multiple managers""" 23 24 from twisted.internet import defer 25 26 from flumotion.common import log, planet, errors, startset, watched 27 from flumotion.admin import admin 28 29 __version__ = "$Rev$" 30 3133 import warnings 34 warnings.warn('Use getAdminForObject', DeprecationWarning, stacklevel=2) 35 return getAdminForObject(object)36 3739 if object.get('parent'): 40 return get_admin_for_object(object.get('parent')) 41 else: 42 return object.admin43 4446 logCategory = 'multiadmin' 47128 129 connectD.addCallbacks(connect_callback, connect_errback) 130 131 def start_callback(_): 132 self._managerConnected(a) 133 134 def start_errback(failure): 135 a.shutdown() 136 return failure 137 138 startD.addCallbacks(start_callback, start_errback) 139 140 return startD 14149 self.admins = watched.WatchedDict() # {managerId: AdminModel} 50 51 self._listeners = [] 52 self._reconnectHandlerIds = {} # managerId => [disconnect, id..] 53 self._startSet = startset.StartSet(self.admins.has_key, 54 errors.AlreadyConnectingError, 55 errors.AlreadyConnectedError)56 57 # Listener implementation 5860 self.debug('emit %r %r %r' % (signal_name, args, kwargs)) 61 assert signal_name != 'handler' 62 for c in self._listeners: 63 if getattr(c, 'model_handler', None): 64 c.model_handler(c, signal_name, *args, **kwargs) 65 elif getattr(c, 'model_%s' % signal_name): 66 getattr(c, 'model_%s' % signal_name)(*args, **kwargs) 67 else: 68 s = 'No model_%s in %r and no model_handler' % (signal_name, c) 69 raise NotImplementedError(s)70 7476 self._listeners.remove(obj)7779 if admin.managerId not in self._reconnectHandlerIds: 80 # the first time a manager is connected to, start listening 81 # for reconnections; intertwingled with removeManager() 82 ids = [] 83 ids.append(admin.connect('connected', 84 self._managerConnected)) 85 ids.append(admin.connect('disconnected', 86 self._managerDisconnected)) 87 self._reconnectHandlerIds[admin.managerId] = admin, ids 88 89 adminplanet = admin.planet 90 self.info('Connected to manager %s (planet %s)', 91 admin.managerId, adminplanet.get('name')) 92 assert admin.managerId not in self.admins 93 self.admins[admin.managerId] = admin 94 self.emit('addPlanet', admin, adminplanet)9597 if admin.managerId in self.admins: 98 self.emit('removePlanet', admin, admin.planet) 99 del self.admins[admin.managerId] 100 else: 101 self.warning('Could not find admin model %r', admin)102105 i = connectionInfo 106 managerId = str(i) 107 108 # This dance of deferreds is here so as to make sure that 109 # removeManager can cancel a pending connection. 110 111 # can raise errors.AlreadyConnectingError or 112 # errors.AlreadyConnectedError 113 try: 114 startD = self._startSet.createStart(managerId) 115 except Exception, e: 116 return defer.fail(e) 117 118 a = admin.AdminModel() 119 connectD = a.connectToManager(i, tenacious, 120 writeConnection=writeConnection) 121 assert a.managerId == managerId 122 123 def connect_callback(_): 124 self._startSet.avatarStarted(managerId)125 126 def connect_errback(failure): 127 self._startSet.avatarStopped(managerId, lambda _: failure)143 self.info('disconnecting from %s', managerId) 144 145 # Four cases: 146 # (1) We have no idea about this managerId, the caller is 147 # confused -- do nothing 148 # (2) We started connecting to this managerId, but never 149 # succeeded -- cancel pending connections 150 # (3) We connected at least once, and are connected now -- we 151 # have entries in the _reconnectHandlerIds and in self.admins -- 152 # disconnect from the signals, disconnect from the remote 153 # manager, and don't try to reconnect 154 # (4) We connected at least once, but are disconnected now -- we 155 # have an entry in _reconnectHandlerIds but not self.admins -- 156 # disconnect from the signals, and stop trying to reconnect 157 158 # stop listening to admin's signals, if the manager had actually 159 # connected at some point 160 if managerId in self._reconnectHandlerIds: 161 admin, handlerIds = self._reconnectHandlerIds.pop(managerId) 162 map(admin.disconnect, handlerIds) # (3) and (4) 163 if managerId not in self.admins: 164 admin.shutdown() # (4) 165 166 if managerId in self.admins: # (3) 167 admin = self.admins[managerId] 168 admin.shutdown() 169 self._managerDisconnected(admin) 170 171 # Firing this has the side effect of errbacking on any pending 172 # start, calling start_errback above if appropriate. (2) 173 self._startSet.avatarStopped( 174 managerId, lambda _: errors.ConnectionCancelledError()) 175 176 # always succeed, see (1) 177 return defer.succeed(managerId)178180 '''Call a procedure on each component that is a child of OBJECT''' 181 # ah, for multimethods... 182 if isinstance(object, planet.AdminPlanetState): 183 self.for_each_component(object.get('atmosphere'), proc) 184 for f in object.get('flows'): 185 self.for_each_component(f, proc) 186 elif (isinstance(object, planet.AdminAtmosphereState) or 187 isinstance(object, planet.AdminFlowState)): 188 for c in object.get('components'): 189 self.for_each_component(c, proc) 190 elif isinstance(object, planet.AdminComponentState): 191 proc(object)192194 '''Call a method on the remote component object associated with 195 a component state''' 196 admin = get_admin_for_object(object) 197 198 def do_op(object): 199 admin.callRemote('component'+op, object)200 self.for_each_component(object, do_op) 201
Trees | Indices | Help |
---|
Generated by Epydoc 3.0.1 on Fri Jun 17 07:44:35 2011 | http://epydoc.sourceforge.net |