Package flumotion :: Package manager :: Module manager
[hide private]

Source Code for Module flumotion.manager.manager

   1  # -*- Mode: Python; test-case-name: flumotion.test.test_manager_manager -*- 
   2  # vi:si:et:sw=4:sts=4:ts=4 
   3  # 
   4  # Flumotion - a streaming media server 
   5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
   6  # All rights reserved. 
   7   
   8  # This file may be distributed and/or modified under the terms of 
   9  # the GNU General Public License version 2 as published by 
  10  # the Free Software Foundation. 
  11  # This file is distributed without any warranty; without even the implied 
  12  # warranty of merchantability or fitness for a particular purpose. 
  13  # See "LICENSE.GPL" in the source distribution for more information. 
  14   
  15  # Licensees having purchased or holding a valid Flumotion Advanced 
  16  # Streaming Server license may use this file in accordance with the 
  17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
  18  # See "LICENSE.Flumotion" in the source distribution for more information. 
  19   
  20  # Headers in this file shall remain intact. 
  21   
  22  """ 
  23  manager implementation and related classes 
  24   
  25  API Stability: semi-stable 
  26   
  27  @var  LOCAL_IDENTITY: an identity for the manager itself; can be used 
  28                        to compare against to verify that the manager 
  29                        requested an action 
  30  @type LOCAL_IDENTITY: L{LocalIdentity} 
  31  """ 
  32   
  33  import os 
  34   
  35  from twisted.internet import reactor, defer 
  36  from twisted.python import components, failure 
  37  from twisted.spread import pb 
  38  from twisted.cred import portal 
  39  from zope.interface import implements 
  40   
  41  from flumotion.common import errors, interfaces, log, registry 
  42  from flumotion.common import planet, common, dag, messages, reflectcall, server 
  43  from flumotion.common.i18n import N_, gettexter 
  44  from flumotion.common.identity import RemoteIdentity, LocalIdentity 
  45  from flumotion.common.netutils import addressGetHost 
  46  from flumotion.common.planet import moods 
  47  from flumotion.configure import configure 
  48  from flumotion.manager import admin, component, worker, base, config 
  49  from flumotion.twisted import checkers 
  50  from flumotion.twisted import portal as fportal 
  51  from flumotion.project import project 
  52   
  53  __all__ = ['ManagerServerFactory', 'Vishnu'] 
  54  __version__ = "$Rev$" 
  55  T_ = gettexter() 
  56  LOCAL_IDENTITY = LocalIdentity('manager') 
  57   
  58   
  59  # an internal class 
  60   
  61   
class Dispatcher(log.Loggable):
    """
    I implement L{twisted.cred.portal.IRealm}.

    I hand out L{pb.Avatar}s on behalf of the registered heavens, and I
    make sure every Avatar I return knows about the mind (client) that
    requested it.
    """

    implements(portal.IRealm)

    logCategory = 'dispatcher'

    def __init__(self, computeIdentity):
        """
        @param computeIdentity: see L{Vishnu.computeIdentity}
        @type  computeIdentity: callable
        """
        # medium interface -> heaven that owns avatars for it
        self._interfaceHeavens = {}
        self._computeIdentity = computeIdentity
        self._bouncer = None
        # avatarId -> keycard the avatar logged in with
        self._avatarKeycards = {}

    def setBouncer(self, bouncer):
        """
        Set the bouncer from which keycards are removed when an avatar
        disconnects.

        @param bouncer: the bouncer to authenticate with
        @type  bouncer: L{flumotion.component.bouncers.bouncer}
        """
        self._bouncer = bouncer

    def registerHeaven(self, heaven, interface):
        """
        Register a heaven as the owner of avatars for the given medium
        interface.

        @param heaven:    the heaven that will receive the avatars
        @type  heaven:    L{base.ManagerHeaven}
        @param interface: a component interface to register the heaven with
        @type  interface: L{twisted.python.components.Interface}
        """
        assert isinstance(heaven, base.ManagerHeaven)
        self._interfaceHeavens[interface] = heaven

    ### IRealm methods

    def requestAvatar(self, avatarId, keycard, mind, *ifaces):
        """
        Build an avatar for C{avatarId}, connected through C{mind}.

        C{ifaces} must hold exactly two interfaces: L{pb.IPerspective}
        and the medium interface of a registered heaven.

        @raises errors.NoPerspectiveError: if C{ifaces} does not name a
            registered heaven
        @returns: a deferred firing (pb.IPerspective, avatar, logout)
        """
        if pb.IPerspective not in ifaces or len(ifaces) != 2:
            # we need IPerspective plus exactly one medium interface
            raise errors.NoPerspectiveError(avatarId)

        iface = [i for i in ifaces if i != pb.IPerspective][0]
        if iface not in self._interfaceHeavens:
            self.warning('unknown interface %r', iface)
            raise errors.NoPerspectiveError(avatarId)

        heaven = self._interfaceHeavens[iface]
        avatarClass = heaven.avatarClass
        peerHost = addressGetHost(mind.broker.transport.getPeer())

        def gotAvatar(avatar):
            if avatar.avatarId in heaven.avatars:
                raise errors.AlreadyConnectedError(avatar.avatarId)
            heaven.avatars[avatar.avatarId] = avatar
            self._avatarKeycards[avatar.avatarId] = keycard

            # OK so this is byzantine, but test_manager_manager actually
            # uses these kwargs to set its own info. so don't change
            # these args or their order or you will break your test
            # suite.

            def cleanup(avatarId=avatar.avatarId, avatar=avatar, mind=mind):
                self.info('lost connection to client %r', avatar)
                del heaven.avatars[avatar.avatarId]
                avatar.onShutdown()
                # release the stored keycard so it does not leak
                storedKeycard = self._avatarKeycards.pop(avatarId)
                if self._bouncer:
                    try:
                        self._bouncer.removeKeycard(storedKeycard)
                    except KeyError:
                        self.warning("bouncer forgot about keycard %r",
                                     storedKeycard)

            return (pb.IPerspective, avatar, cleanup)

        def gotError(reason):
            # Drop the connection on failure, but hand the failure back
            # to the client first: loseConnection() is scheduled for a
            # later reactor iteration, and does not drop the connection
            # until the current data has been sent.
            reactor.callLater(0, mind.broker.transport.loseConnection)
            return reason

        d = self._computeIdentity(keycard, peerHost)
        d.addCallback(lambda identity:
                      avatarClass.makeAvatar(heaven, avatarId, identity,
                                             mind))
        d.addCallbacks(gotAvatar, gotError)
        return d
class ComponentMapper:
    """
    I tie together the objects that belong to one component: its
    planet state, its avatarId, its avatar and its job state.

    The vishnu uses me as the value in its component lookup table.
    """

    def __init__(self):
        # state:    ManagerComponentState; created first
        # id:       avatarId of the eventual ComponentAvatar
        # avatar:   the ComponentAvatar
        # jobState: ManagerJobState of a running component
        self.state = self.id = self.avatar = self.jobState = None
174 -class Vishnu(log.Loggable):
175 """ 176 I am the toplevel manager object that knows about all 177 heavens and factories. 178 179 @cvar dispatcher: dispatcher to create avatars 180 @type dispatcher: L{Dispatcher} 181 @cvar workerHeaven: the worker heaven 182 @type workerHeaven: L{worker.WorkerHeaven} 183 @cvar componentHeaven: the component heaven 184 @type componentHeaven: L{component.ComponentHeaven} 185 @cvar adminHeaven: the admin heaven 186 @type adminHeaven: L{admin.AdminHeaven} 187 @cvar configDir: the configuration directory for 188 this Vishnu's manager 189 @type configDir: str 190 """ 191 192 implements(server.IServable) 193 194 logCategory = "vishnu" 195
def __init__(self, name, unsafeTracebacks=0, configDir=None):
    """
    @param name:             name of this manager
    @type  name:             str
    @param unsafeTracebacks: passed to the PB server factory; set it to
                             send full tracebacks to clients (debugging)
    @param configDir:        configuration directory; when None, it is
                             <configure.configdir>/managers/<name>
    @type  configDir:        str or None
    """
    # The dispatcher hands out avatars to connecting clients. It must
    # exist first, because _createHeaven registers each heaven with it.
    self.dispatcher = Dispatcher(self.computeIdentity)

    self.workerHeaven = self._createHeaven(
        interfaces.IWorkerMedium, worker.WorkerHeaven)
    self.componentHeaven = self._createHeaven(
        interfaces.IComponentMedium, component.ComponentHeaven)
    self.adminHeaven = self._createHeaven(
        interfaces.IAdminMedium, admin.AdminHeaven)

    self.running = True

    def markNotRunning():
        self.running = False
    reactor.addSystemEventTrigger('before', 'shutdown', markNotRunning)

    if configDir is None:
        configDir = os.path.join(configure.configdir, "managers", name)
    self.configDir = configDir

    # used by the manager to authenticate workers/components
    self.bouncer = None

    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()

    # any object -> ComponentMapper
    self._componentMappers = {}

    self.state = planet.ManagerPlanetState()
    self.state.set('name', name)
    self.state.set('version', configure.version)

    # socket -> list of plugs
    self.plugs = {}

    # The portal lets clients connect through the dispatcher (acting as
    # IRealm) and a bouncer; no bouncer is installed yet.
    self.portal = fportal.BouncerPortal(self.dispatcher, None)
    self.factory = pb.PBServerFactory(self.portal,
                                      unsafeTracebacks=unsafeTracebacks)
    self.connectionInfo = {}
    self.setConnectionInfo(None, None, None)
def shutdown(self):
    """
    Cancel any pending operations in preparation for shutdown.

    This is mostly useful for unit tests; it is currently not called
    during normal operation. The caller must stop listening on the port
    itself, since the manager holds no handle on the twisted port object.

    @returns: a deferred that fires when the manager has shut down
    """
    bouncer = self.bouncer
    if not bouncer:
        return defer.succeed(None)
    return bouncer.stop()
def setConnectionInfo(self, host, port, use_ssl):
    """
    Store the given connection details in self.connectionInfo under
    the keys 'host', 'port' and 'use_ssl'; other keys are kept.
    """
    self.connectionInfo.update({'host': host,
                                'port': port,
                                'use_ssl': use_ssl})
def getConfiguration(self):
    """
    Export the manager's planet state as a configuration string that
    can be imported again with loadConfiguration().

    @rtype: str
    """
    planetState = self.state
    return config.exportPlanetXml(planetState)
def getBundlerBasket(self):
    """
    Return a bundler basket to unbundle from.

    The basket is rebuilt first when the registry reports that a rebuild
    is needed (the registry is then force-verified as well), or when the
    basket is not up to date with the registry's mtime.

    @since: 0.2.2
    @rtype: L{flumotion.common.bundle.BundlerBasket}
    """
    if registry.getRegistry().rebuildNeeded():
        self.info("Registry changed, rebuilding")
        registry.getRegistry().verify(force=True)
    elif self.bundlerBasket.isUptodate(registry.getRegistry().mtime):
        # the basket is current; nothing to rebuild
        return self.bundlerBasket
    else:
        self.info("BundlerBasket is older than the Registry, rebuilding")
    self.bundlerBasket = registry.getRegistry().makeBundlerBasket()
    return self.bundlerBasket
def addMessage(self, level, mid, format, *args, **kwargs):
    """
    Convenience method to construct a message and add it to the planet
    state.

    C{format} should be marked as translatable in the source with N_;
    C{args} are stored as its format arguments. Keyword arguments are
    passed on to the message constructor. See
    L{flumotion.common.messages.Message} for the meaning of the other
    arguments.

    For example::

        self.addMessage(messages.WARNING, 'foo-warning',
                        N_('The answer is %d'), 42, debug='not really')
    """
    translatable = T_(format, *args)
    message = messages.Message(level, translatable, mid=mid, **kwargs)
    self.addMessageObject(message)
def addMessageObject(self, message):
    """
    Store C{message} in the planet state's 'messages', keyed by its id.

    @type message: L{flumotion.common.messages.Message}
    """
    key = message.id
    self.state.setitem('messages', key, message)
def clearMessage(self, mid):
    """
    Remove the message with id C{mid} from the planet state; do
    nothing if no such message is present.

    @type mid: message ID, normally a str
    """
    if mid not in self.state.get('messages'):
        return
    self.state.delitem('messages', mid)
def adminAction(self, identity, message, args, kw):
    """
    Report an admin action to every installed AdminActionPlug.

    @param identity: who is performing the action
    @type  identity: L{flumotion.common.identity.Identity}
    @param message:  name of the action (e.g. '_addComponent')
    @param args:     positional arguments of the action
    @param kw:       keyword arguments of the action
    """
    socket = 'flumotion.component.plugs.adminaction.AdminActionPlug'
    for plug in self.plugs.get(socket, []):
        plug.action(identity, message, args, kw)
def computeIdentity(self, keycard, remoteHost):
    """
    Compute a suitable identity for a remote host.

    First consults any installed
    L{flumotion.component.plugs.identity.IdentityProviderPlug} plugs, in
    order; the first one that returns a true value wins. Otherwise it
    falls back to the keycard's username at C{remoteHost}.

    The identity is only used in the adminaction interface. For example,
    an adminaction plug could check an admin's privileges before
    performing an action; the identity computed here could carry those
    privileges.

    @param keycard:    the keycard that the remote host used to log in
    @type  keycard:    L{flumotion.common.keycards.Keycard}
    @param remoteHost: the ip of the remote host
    @type  remoteHost: str

    @rtype: a deferred that will fire a
            L{flumotion.common.identity.RemoteIdentity}
    """
    socket = 'flumotion.component.plugs.identity.IdentityProviderPlug'
    for plug in self.plugs.get(socket, []):
        identity = plug.computeIdentity(keycard, remoteHost)
        if identity:
            if isinstance(identity, defer.Deferred):
                return identity
            # A plug answered synchronously. Callers (e.g. the
            # Dispatcher) chain callbacks on our result, so always hand
            # back a deferred, as documented.
            return defer.succeed(identity)
    username = getattr(keycard, 'username', None)
    return defer.succeed(RemoteIdentity(username, remoteHost))
def _addComponent(self, conf, parent, identity):
    """
    Add a component state for the given component config entry.

    The new state starts out sleeping and is appended to C{parent}'s
    components. Warning messages are attached when the component's
    project is not installed, or when its configured version is not
    compatible with the installed one. A L{ComponentMapper} is
    registered under both the new state and its avatarId.

    @param conf:     the component configuration entry
    @param parent:   the flow or atmosphere state to add the component to
    @param identity: the identity requesting the addition; admin-action
                     plugs are notified unless it is LOCAL_IDENTITY

    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('adding component %s to %s', conf.name, parent.get('name'))

    if identity != LOCAL_IDENTITY:
        self.adminAction(identity, '_addComponent', (conf, parent), {})

    # the config dict is needed several times below; fetch it only once
    configDict = conf.getConfigDict()

    state = planet.ManagerComponentState()
    state.set('name', conf.name)
    state.set('type', conf.getType())
    state.set('workerRequested', conf.worker)
    state.setMood(moods.sleeping.value)
    state.set('config', configDict)

    state.set('parent', parent)
    parent.append('components', state)

    avatarId = configDict['avatarId']

    self.clearMessage('loadComponent-%s' % avatarId)

    projectName = configDict['project']
    versionTuple = configDict['version']

    projectVersion = None
    try:
        projectVersion = project.get(projectName, 'version')
    except errors.NoProjectError:
        warning = messages.Warning(T_(N_(
            "This component is configured for Flumotion project '%s', "
            "but that project is not installed.\n"),
            projectName))
        state.append('messages', warning)

    if projectVersion:
        self.debug('project %s, version %r, project version %r',
                   projectName, versionTuple, projectVersion)
        if not common.checkVersionsCompat(
                versionTuple,
                common.versionStringToTuple(projectVersion)):
            warning = messages.Warning(T_(N_(
                "This component is configured for "
                "Flumotion '%s' version %s, "
                "but you are running version %s.\n"
                "Please update the configuration of the component.\n"),
                projectName, common.versionTupleToString(versionTuple),
                projectVersion))
            state.append('messages', warning)

    # register the mapper under both keys so it can be found either by
    # component state or by avatarId
    mapper = ComponentMapper()
    mapper.state = state
    mapper.id = avatarId
    self._componentMappers[state] = mapper
    self._componentMappers[avatarId] = mapper

    return state
def _updateStateFromConf(self, _, conf, identity):
    """
    Add a new config object into the planet state.

    For each component in the atmosphere and in every flow of C{conf}:
    a component that does not exist yet is added; an existing sleeping
    component with the same name is deleted and replaced; an existing
    non-sleeping component is left alone. For the last case, a warning
    message is put on the planet if the configurations differ. Flows
    that do not exist yet are created.

    @param _:        ignored (result of the previous callback)
    @param conf:     the parsed planet configuration
    @param identity: the identity requesting the change

    @returns: a list of all components added
    @rtype:   list of L{flumotion.common.planet.ManagerComponentState}
    """

    self.debug('syncing up planet state with config')
    added = [] # added components while parsing

    def checkNotRunning(comp, parentState):
        # Return True if comp may be added to parentState, False if a
        # running component with the same name is already there.
        name = comp.getName()

        comps = dict([(x.get('name'), x)
                      for x in parentState.get('components')])
        runningComps = dict([(x.get('name'), x)
                             for x in parentState.get('components')
                             if x.get('mood') != moods.sleeping.value])
        if name not in comps:
            # We don't have it at all; allow it
            return True
        elif name not in runningComps:
            # We have it, but it's not running. Allow it after deleting
            # the old one.
            oldComp = comps[name]
            self.deleteComponent(oldComp)
            return True

        # if we get here, the component is already running; warn if
        # the running configuration is different. Return False in
        # all cases.
        parent = comps[name].get('parent').get('name')
        # use our own argument, not the caller's loop variable
        newConf = comp.getConfigDict()
        oldConf = comps[name].get('config')

        if newConf == oldConf:
            self.debug('%s already has component %s running with '
                       'same configuration', parent, name)
            self.clearMessage('loadComponent-%s' % oldConf['avatarId'])
            return False

        self.info('%s already has component %s, but configuration '
                  'not the same -- notifying admin', parent, name)

        diff = config.dictDiff(oldConf, newConf)
        diffMsg = config.dictDiffMessageString(diff, 'existing', 'new')

        self.addMessage(messages.WARNING,
                        'loadComponent-%s' % oldConf['avatarId'],
                        N_('Could not load component %r into %r: '
                           'a component is already running with '
                           'this name, but has a different '
                           'configuration.'), name, parent,
                        debug=diffMsg)
        return False

    state = self.state
    atmosphere = state.get('atmosphere')
    for c in conf.atmosphere.components.values():
        if checkNotRunning(c, atmosphere):
            added.append(self._addComponent(c, atmosphere, identity))

    flows = dict([(x.get('name'), x) for x in state.get('flows')])
    for f in conf.flows:
        if f.name in flows:
            flow = flows[f.name]
        else:
            self.info('creating flow %r', f.name)
            flow = planet.ManagerFlowState(name=f.name, parent=state)
            state.append('flows', flow)

        for c in f.components.values():
            if checkNotRunning(c, flow):
                added.append(self._addComponent(c, flow, identity))

    return added
def _startComponents(self, components, identity):
    """
    Start the given components, grouped by their requested worker.

    @param components: component states to start
    @type  components: list of L{flumotion.common.planet.ManagerComponentState}
    @param identity:   the identity requesting the start (unused here)
    """
    # group into a temporary {workerId: [componentState, ...]} dict
    byWorker = {}
    for compState in components:
        byWorker.setdefault(compState.get('workerRequested'),
                            []).append(compState)
    self.debug('_startComponents: componentsToStart %r' % (byWorker, ))

    for workerId, compStates in byWorker.items():
        self._workerCreateComponents(workerId, compStates)
def _loadComponentConfiguration(self, conf, identity):
    """
    Merge C{conf} into the planet state, then start the components that
    were added.

    @returns: a deferred that fires with None after _startComponents
              has run on the added components
    """
    return (defer.succeed(None)
            .addCallback(self._updateStateFromConf, conf, identity)
            .addCallback(self._startComponents, identity))
526 - def loadComponentConfigurationXML(self, file, identity):
527 """ 528 Load the configuration from the given XML, merging it on top of 529 the currently running configuration. 530 531 @param file: file to parse, either as an open file object, 532 or as the name of a file to open 533 @type file: str or file 534 @param identity: The identity making this request.. This is used by the 535 adminaction logging mechanism in order to say who is 536 performing the action. 537 @type identity: L{flumotion.common.identity.Identity} 538 """ 539 self.debug('loading configuration') 540 mid = 'loadComponent-parse-error' 541 if isinstance(file, str): 542 mid += '-%s' % file 543 try: 544 self.clearMessage(mid) 545 conf = config.PlanetConfigParser(file) 546 conf.parse() 547 return self._loadComponentConfiguration(conf, identity) 548 except errors.ConfigError, e: 549 self.addMessage(messages.WARNING, mid, 550 N_('Invalid component configuration.'), 551 debug=e.args[0]) 552 return defer.fail(e) 553 except errors.UnknownComponentError, e: 554 if isinstance(file, str): 555 debug = 'Configuration loaded from file %r' % file 556 else: 557 debug = 'Configuration loaded remotely' 558 self.addMessage(messages.WARNING, mid, 559 N_('Unknown component in configuration: %s.'), 560 e.args[0], debug=debug) 561 return defer.fail(e) 562 except Exception, e: 563 self.addMessage(messages.WARNING, mid, 564 N_('Unknown error while loading configuration.'), 565 debug=log.getExceptionMessage(e)) 566 return defer.fail(e)
567
def _loadManagerPlugs(self, conf):
    """
    Instantiate every plug listed in C{conf.plugs} and append it to
    self.plugs under its socket. A socket entry is created even when
    no plugs are listed for it.
    """
    createPlug = reflectcall.reflectCallCatching
    for socket, plugArgsList in conf.plugs.items():
        socketPlugs = self.plugs.setdefault(socket, [])
        for args in plugArgsList:
            self.debug('loading plug type %s for socket %s'
                       % (args['type'], socket))
            entry = registry.getRegistry().getPlug(args['type']).getEntry()
            # errors.ConfigError is handed to reflectCallCatching --
            # presumably the error type failures get wrapped in; confirm
            plug = createPlug(errors.ConfigError, entry.getModuleName(),
                              entry.getFunction(), args)
            socketPlugs.append(plug)
def startManagerPlugs(self):
    """
    Start every loaded manager plug, passing it this manager.
    """
    for socket in self.plugs:
        socketPlugs = self.plugs[socket]
        for plug in socketPlugs:
            self.debug('starting plug %r for socket %s', plug, socket)
            plug.start(self)
def _loadManagerBouncer(self, conf):
    """
    Create the manager bouncer described in C{conf}. Once it is happy,
    install it with setBouncer().

    @returns: a deferred that fires with None once the bouncer is
              installed or has failed to start. A start failure is
              logged, not propagated. If C{conf} defines no bouncer, an
              already-fired deferred is returned.
    """
    if not conf.bouncer:
        self.warning('no bouncer defined, nothing can access the '
                     'manager')
        return defer.succeed(None)

    self.debug('going to start manager bouncer %s of type %s',
               conf.bouncer.name, conf.bouncer.type)

    defs = registry.getRegistry().getComponent(conf.bouncer.type)
    entry = defs.getEntryByType('component')
    # FIXME: use entry.getModuleName() (doesn't work atm?)
    moduleName = defs.getSource()
    methodName = entry.getFunction()
    bouncer = reflectcall.createComponent(moduleName, methodName,
                                          conf.bouncer.getConfigDict())
    d = bouncer.waitForHappy()

    def setupCallback(result):
        bouncer.debug('started')
        self.setBouncer(bouncer)

    def setupErrback(reason):
        # Deliberately best-effort: the failure is consumed here. Still,
        # log why the bouncer did not start so it can be diagnosed.
        self.warning('Error starting manager bouncer: %s',
                     reason.getErrorMessage())

    d.addCallbacks(setupCallback, setupErrback)
    return d
def loadManagerConfigurationXML(self, file):
    """
    Load manager configuration from the given XML. The manager
    configuration is currently used to load the manager's bouncer
    and plugs, and is only run once at startup.

    @param file: file to parse, either as an open file object,
                 or as the name of a file to open
    @type  file: str or file

    @returns: the deferred from starting the bouncer, which fires once
              the bouncer is installed or has failed to start
    @rtype:   L{twisted.internet.defer.Deferred}
    """
    self.debug('loading configuration')
    conf = config.ManagerConfigParser(file)
    try:
        conf.parseBouncerAndPlugs()
        self._loadManagerPlugs(conf)
        d = self._loadManagerBouncer(conf)
    finally:
        # unlink the parser even when parsing or plug loading fails
        conf.unlink()
    return d
634 635 __pychecker__ = 'maxargs=11' # hahaha 636
def loadComponent(self, identity, componentType, componentId,
                  componentLabel, properties, workerName,
                  plugs, eaters, isClockMaster, virtualFeeds):
    """
    Load a component into the manager configuration and start it.

    See L{flumotion.manager.admin.AdminAvatar.perspective_loadComponent}
    for a definition of the argument types.

    @raises NotImplementedError: for clock-master components, or for
        components that need synchronization
    @raises errors.ConfigError: if no worker name was given
    @raises errors.ComponentAlreadyExistsError: if the parent already
        has a component with this name
    @rtype: L{flumotion.common.planet.ManagerComponentState}
    """
    self.debug('loading %s component %s on %s',
               componentType, componentId, workerName)
    parentName, compName = common.parseComponentId(componentId)

    if isClockMaster:
        raise NotImplementedError("Clock master components are not "
                                  "yet supported")
    # NB: test the argument. The module-level name 'worker' is the
    # imported flumotion.manager.worker module and is never None.
    if workerName is None:
        raise errors.ConfigError("Component %r needs to specify the"
                                 " worker on which it should run"
                                 % componentId)

    state = self.state

    compConf = config.ConfigEntryComponent(compName, parentName,
                                           componentType,
                                           componentLabel,
                                           properties,
                                           plugs, workerName,
                                           eaters, isClockMaster,
                                           None, None, virtualFeeds)

    if compConf.defs.getNeedsSynchronization():
        raise NotImplementedError("Components that need "
                                  "synchronization are not yet "
                                  "supported")

    if parentName == 'atmosphere':
        parentState = state.get('atmosphere')
    else:
        flows = dict([(x.get('name'), x) for x in state.get('flows')])
        if parentName in flows:
            parentState = flows[parentName]
        else:
            self.info('creating flow %r', parentName)
            parentState = planet.ManagerFlowState(name=parentName,
                                                  parent=state)
            state.append('flows', parentState)

    components = [x.get('name') for x in parentState.get('components')]
    if compName in components:
        self.debug('%r already has component %r', parentName, compName)
        raise errors.ComponentAlreadyExistsError(compName)

    compState = self._addComponent(compConf, parentState, identity)

    self._startComponents([compState], identity)

    return compState
def _createHeaven(self, interface, klass):
    """
    Instantiate a heaven of type C{klass} for this manager, and register
    it with the dispatcher as the owner of avatars for clients that
    implement the medium interface C{interface}.

    @param interface: the medium interface to create a heaven for
    @type  interface: L{flumotion.common.interfaces.IMedium}
    @param klass:     the type of heaven to create
    @type  klass:     an implementor of L{flumotion.common.interfaces.IHeaven}
    @returns: the new heaven
    """
    assert issubclass(interface, interfaces.IMedium)
    newHeaven = klass(self)
    self.dispatcher.registerHeaven(newHeaven, interface)
    return newHeaven
def setBouncer(self, bouncer):
    """
    Install C{bouncer} on this manager, its portal and its dispatcher.
    If a bouncer was already set, warn and replace it anyway.

    @type bouncer: L{flumotion.component.bouncers.bouncer.Bouncer}
    """
    if self.bouncer:
        self.warning("manager already had a bouncer, setting anyway")

    self.bouncer = self.portal.bouncer = bouncer
    self.dispatcher.setBouncer(bouncer)
def getFactory(self):
    """
    @returns: the PB server factory created for this manager
    @rtype:   L{twisted.spread.pb.PBServerFactory}
    """
    factory = self.factory
    return factory
def componentCreate(self, componentState):
    """
    Create the given component. This currently also leads to a start
    once the component avatar attaches.

    The component must be sleeping, with no pending mood, and its worker
    must be logged in.

    @raises errors.ComponentMoodError: if the component is not sleeping
        or already has a pending mood
    @raises errors.ComponentNoWorkerError: if its worker is not logged in
    """
    mood = componentState.get('mood')
    if mood != moods.sleeping.value:
        raise errors.ComponentMoodError("%r not sleeping but %s" % (
            componentState, moods.get(mood).name))

    pending = componentState.get('moodPending')
    if pending is not None:
        raise errors.ComponentMoodError(
            "%r already has a pending mood %s" % (
                componentState, moods.get(pending).name))

    # use 'workerName' if set, otherwise fall back to 'workerRequested'
    workerId = (componentState.get('workerName')
                or componentState.get('workerRequested'))

    if workerId not in self.workerHeaven.avatars:
        raise errors.ComponentNoWorkerError(
            "worker %s is not logged in" % workerId)
    return self._workerCreateComponents(workerId, [componentState])
def _componentStopNoAvatar(self, componentState, avatarId):
    """
    Stop a component that currently has no avatar.

    - sad: its messages are removed and it is set to sleeping;
    - lost: it is set to sleeping, unless its requested worker is
      logged in and still lists C{avatarId} among its components, in
      which case the returned deferred fails with
      L{errors.ComponentMoodError};
    - any other mood: a failed deferred with L{errors.ComponentMoodError}.

    Whenever the component is set to sleeping, moodPending is reset to
    None.

    @param componentState: state of the component to stop
    @type  componentState: L{planet.ManagerComponentState}
    @param avatarId:       avatarId of the component
    @rtype: L{twisted.internet.defer.Deferred}
    """
    # NB: reset moodPending when asked to stop without an avatar, because
    # componentStop allows stopping even if moodPending is happy

    def stopSad():
        self.debug('asked to stop a sad component without avatar')
        # iterate over a copy, since entries are removed while looping
        for message in componentState.get('messages')[:]:
            self.debug("Deleting message %r", message)
            componentState.remove('messages', message)

        componentState.setMood(moods.sleeping.value)
        componentState.set('moodPending', None)
        return defer.succeed(None)

    def stopLost():

        def gotComponents(comps):
            return avatarId in comps

        def gotJobRunning(running):
            if running:
                self.warning('asked to stop lost component %r, but '
                             'it is still running', avatarId)
                # FIXME: put a message on the state to suggest a
                # kill?
                msg = "Cannot stop lost component which is still running."
                raise errors.ComponentMoodError(msg)
            else:
                # the format string has a %r placeholder; supply avatarId
                self.debug('component %r seems to be really lost, '
                           'setting to sleeping', avatarId)
                componentState.setMood(moods.sleeping.value)
                componentState.set('moodPending', None)
                return None

        self.debug('asked to stop a lost component without avatar')
        workerName = componentState.get('workerRequested')
        if workerName and self.workerHeaven.hasAvatar(workerName):
            self.debug('checking if component has job process running')
            d = self.workerHeaven.getAvatar(workerName).getComponents()
            d.addCallback(gotComponents)
            d.addCallback(gotJobRunning)
            return d
        else:
            self.debug('component lacks a worker, setting to sleeping')
            d = defer.maybeDeferred(gotJobRunning, False)
            return d

    def stopUnknown():
        msg = ('asked to stop a component without avatar in mood %s'
               % moods.get(mood))
        self.warning(msg)
        return defer.fail(errors.ComponentMoodError(msg))

    mood = componentState.get('mood')
    stoppers = {moods.sad.value: stopSad,
                moods.lost.value: stopLost}
    return stoppers.get(mood, stopUnknown)()
def _componentStopWithAvatar(self, componentState, componentAvatar):
    """
    Ask the component's avatar to stop.

    @returns: the deferred of the stop call on the avatar
    """
    # FIXME: this deferred only covers the remote call; nothing tracks
    # the actual completion of the shutdown.
    return componentAvatar.stop()
def componentStop(self, componentState):
    """
    Stop the given component.

    If the component was sad, its sad state is cleared as well, since
    the admin explicitly requested the stop.

    @type  componentState: L{planet.ManagerComponentState}
    @raises errors.BusyComponentError: if a mood other than happy is
        pending
    @raises errors.UnknownComponentError: if the state has no mapper
        (stale state of a deleted component)
    @rtype: L{twisted.internet.defer.Deferred}
    """
    self.debug('componentStop(%r)', componentState)
    pending = componentState.get('moodPending')
    # A pending happy mood does not block stopping, so a component that
    # never becomes happy can still be stopped.
    if pending is not None and pending != moods.happy.value:
        self.debug("Pending mood is %r", pending)

        raise errors.BusyComponentError(componentState)

    mapper = self.getComponentMapper(componentState)
    if not mapper:
        # stale componentState of an already-deleted component
        self.warning("Component mapper for component state %r doesn't "
                     "exist", componentState)
        raise errors.UnknownComponentError(componentState)
    if mapper.avatar:
        return self._componentStopWithAvatar(componentState, mapper.avatar)
    return self._componentStopNoAvatar(componentState, mapper.id)
def componentAddMessage(self, avatarId, message):
    """
    Append C{message} to the messages of the component mapped under
    C{avatarId}. A worker can use this, e.g., to report a crashed
    component.

    An error-level message also makes the component sad. Unmapped
    avatarIds are ignored with a warning.
    """
    mapper = self._componentMappers.get(avatarId)
    if mapper is None:
        self.warning('asked to set a message on non-mapped component %s' %
                     avatarId)
        return

    mapper.state.append('messages', message)
    if message.level == messages.ERROR:
        self.debug('Error message makes component sad')
        mapper.state.setMood(moods.sad.value)
868 869 # FIXME: unify naming of stuff like this 870
def workerAttached(self, workerAvatar):
    """
    Called when a worker logs in.

    Once the worker reports which components it runs, this creates
    the components assigned to this worker (or to no worker) that it
    does not run yet. It also restarts this worker's lost components
    that the worker does not list. The component heaven is told that
    the worker's feed server is available.

    @param workerAvatar: avatar of the worker that logged in
    """
    workerId = workerAvatar.avatarId
    self.debug('vishnu.workerAttached(): id %s' % workerId)

    # Creation order does not matter here; only the order of starting
    # does, and that is handled by different code.
    toCreate = [c for c in self._getComponentsToCreate()
                if c.get('workerRequested') in (workerId, None)]

    def gotWorkerComponents(runningIds):
        # runningIds: avatarId strings of components the worker runs.
        # list() is called to work around a pychecker bug. FIXME.
        lost = list([c for c in self.getComponentStates()
                     if c.get('workerRequested') == workerId and
                     c.get('mood') == moods.lost.value])

        # whatever the worker already runs needs neither creation nor
        # a restart
        for runningId in runningIds:
            mapper = self._componentMappers.get(runningId)
            if mapper is None:
                continue
            runningState = mapper.state
            if runningState in toCreate:
                toCreate.remove(runningState)
            if runningState in lost:
                lost.remove(runningState)

        for compState in lost:
            self.info(
                "Restarting previously lost component %s on worker %s",
                self._componentMappers[compState].id, workerId)
            # Set the mood to sleeping first, so a newly started
            # component can be told apart from a lost component logging
            # back in.
            compState.set('moodPending', None)
            compState.setMood(moods.sleeping.value)

        allComponents = toCreate + lost

        if not allComponents:
            self.debug(
                "vishnu.workerAttached(): no components for this worker")
            return

        self._workerCreateComponents(workerId, allComponents)

    d = workerAvatar.getComponents()
    d.addCallback(gotWorkerComponents)

    reactor.callLater(0, self.componentHeaven.feedServerAvailable,
                      workerId)
def _workerCreateComponents(self, workerId, components):
    """
    Create the given components on the given worker, in list order.

    One create request per component is chained on a single Deferred.
    That Deferred is fired before this method returns, so every request
    has been issued by the time the caller gets it back.

    @param workerId: avatarId of the worker
    @type  workerId: string
    @param components: components to create
    @type  components: list of
                       L{flumotion.common.planet.ManagerComponentState}
    @returns: the fired create chain, or an already-succeeded deferred
              (result None) when the worker is not logged in
    """
    self.debug("_workerCreateComponents: workerId %r, components %r" % (
        workerId, components))

    avatars = self.workerHeaven.avatars
    if workerId not in avatars:
        # nothing can be created until the worker logs in
        self.debug('worker %s not logged in yet, delaying '
                   'component start' % workerId)
        return defer.succeed(None)
    workerAvatar = avatars[workerId]

    chain = defer.Deferred()
    for state in components:
        kind = state.get('type')
        config = state.get('config')
        self.debug('scheduling create of %s on %s'
                   % (config['avatarId'], workerId))
        chain.addCallback(self._workerCreateComponentDelayed,
                          workerAvatar, state, kind, config)

    def chainDone(_):
        return self.debug(
            '_workerCreateComponents(): completed setting up create chain')
    chain.addCallback(chainDone)

    # fire the chain right away
    self.debug('_workerCreateComponents(): triggering create chain')
    chain.callback(None)
    return chain
def _workerCreateComponentDelayed(self, result, workerAvatar,
                                  componentState, componentType, conf):
    """
    Ask workerAvatar to create a single component. This is used as one
    link of the callback chain built by _workerCreateComponents.

    @param result: result of the previous link in the chain; ignored
    @param conf: the component config; 'avatarId' is required, and
                 'nice' is optional (defaulting to 0)
    """
    avatarId = conf['avatarId']
    niceLevel = conf.get('nice', 0)

    # moodPending goes to happy so this component only gets asked to
    # start once
    componentState.set('moodPending', moods.happy.value)

    creating = workerAvatar.createComponent(avatarId, componentType,
                                            niceLevel, conf)
    # FIXME: here we get the avatar Id of the component we wanted
    # started, so now attach it to the planetState's component state
    creating.addCallback(self._createCallback, componentState)
    creating.addErrback(self._createErrback, componentState)

    # FIXME: 'creating' is not returned, so the chain does not wait for
    # this component to be created before asking for the next one.
    # Should it be returned so components wait on each other?
985 - def _createCallback(self, result, componentState):
986 self.debug('got avatarId %s for state %s' % (result, componentState)) 987 m = self._componentMappers[componentState] 988 assert result == m.id, "received id %s is not the expected id %s" % ( 989 result, m.id)
990
def _createErrback(self, failure, state):
    """
    Handle a failed createComponent request for the component whose state
    is C{state}.

    Behavior depends on the failure:
      - ComponentAlreadyRunningError, with a jobState registered: the
        component is left alone.
      - ComponentAlreadyRunningError, without a jobState: the component
        is marked lost until it logs in.
      - Any other failure: the component is marked sad and gets an error
        message attached.

    @returns: None, so the failure does not propagate further
    """
    # FIXME: make ConfigError copyable so we can .check() it here
    # and print a nicer warning
    self.warning('failed to create component %s: %s',
                 state.get('name'), log.getFailureMessage(failure))

    if not failure.check(errors.ComponentAlreadyRunningError):
        errorMessage = messages.Error(T_(
            N_("The component could not be started.")),
            debug=log.getFailureMessage(failure))
        state.setMood(moods.sad.value)
        state.append('messages', errorMessage)
        return None

    if self._componentMappers[state].jobState:
        self.info('component appears to have logged in in the '
                  'meantime')
        return None

    self.info('component appears to be running already; '
              'treating it as lost until it logs in')
    state.setMood(moods.lost.value)
    return None
def workerDetached(self, workerAvatar):
    """
    Called when a worker logs out. Only logs the worker's avatarId.

    @param workerAvatar: avatar of the worker that logged out
    """
    self.debug('vishnu.workerDetached(): id %s' % workerAvatar.avatarId)
def addComponentToFlow(self, componentState, flowName):
    """
    Add componentState to the flow named flowName. If no such flow exists
    yet, it is created in the planet state first. The name 'atmosphere'
    selects the planet's atmosphere, which is handled like a flow here.

    @param componentState: component to add; its 'parent' is set to the
                           flow
    @param flowName: name of the flow, or 'atmosphere'
    """
    if flowName != 'atmosphere':
        flow = self._getFlowByName(flowName)
        if not flow:
            self.info('Creating flow "%s"' % flowName)
            flow = planet.ManagerFlowState()
            flow.set('name', flowName)
            flow.set('parent', self.state)
            self.state.append('flows', flow)
    else:
        # the atmosphere is not a flow, but is treated like one
        flow = self.state.get('atmosphere')

    componentState.set('parent', flow)
    flow.append('components', componentState)
def registerComponent(self, componentAvatar):
    """
    Record a logged-in component's avatar, avatarId, component state and
    jobState.

    The ComponentMapper already known for the avatarId is reused if there
    is one; otherwise a new one is made. The mapper is then registered
    under each of those four objects as a key.
    """
    mapper = self.getComponentMapper(componentAvatar.avatarId)
    if not mapper:
        mapper = ComponentMapper()

    mapper.state = componentAvatar.componentState
    mapper.jobState = componentAvatar.jobState
    mapper.id = componentAvatar.avatarId
    mapper.avatar = componentAvatar

    for key in (mapper.state, mapper.jobState, mapper.id, mapper.avatar):
        self._componentMappers[key] = mapper
def unregisterComponent(self, componentAvatar):
    """
    Called when a component logs out: drop its avatar and jobState.

    The mapper stays registered under its other keys. Its jobState and
    avatar are cleared, and the component state's pid, workerName and
    moodPending are reset to None. A jobState key that is already gone
    only produces a warning.
    """
    self.debug('unregisterComponent(%r): cleaning up state' %
               componentAvatar)

    mappers = self._componentMappers
    mapper = mappers[componentAvatar]

    try:
        del mappers[mapper.jobState]
    except KeyError:
        self.warning('Could not remove jobState for %r' % componentAvatar)
    mapper.jobState = None

    for key in ('pid', 'workerName', 'moodPending'):
        mapper.state.set(key, None)

    del mappers[mapper.avatar]
    mapper.avatar = None
def getComponentStates(self):
    """
    Return the component states of the planet. Logs each of them, and
    warns about any whose mood is None.

    @returns: what self.state.getComponents() returns (a sized iterable
              of component states)
    """
    cList = self.state.getComponents()
    self.debug('getComponentStates(): %d components' % len(cList))
    for c in cList:
        self.log(repr(c))
        # 'is None', not '== None': a mood of 0 must not be confused with
        # a missing mood, and identity is the correct test for None
        if c.get('mood') is None:
            self.warning('%s has mood None', c.get('name'))

    return cList
def deleteComponent(self, componentState):
    """
    Empty the planet of the given component.

    The component must be known, have no mood change pending, and be
    sleeping.

    @raises errors.UnknownComponentError: if the component is not mapped
    @raises errors.BusyComponentError: if the component has a mood change
            pending or is not sleeping
    @returns: a deferred that will fire when all listeners have been
              notified of the removal of the component.
    """
    self.debug('deleting component %r from state', componentState)
    c = componentState
    if c not in self._componentMappers:
        raise errors.UnknownComponentError(c)

    # Compare mood values with != rather than 'is not': identity of equal
    # values is not guaranteed by the language.
    if (c.get('moodPending') is not None
            or c.get('mood') != moods.sleeping.value):
        raise errors.BusyComponentError(c)

    flow = c.get('parent')
    mapper = self._componentMappers[c]
    del self._componentMappers[mapper.id]
    del self._componentMappers[c]
    return flow.remove('components', c)
1107 - def _getFlowByName(self, flowName):
1108 for flow in self.state.get('flows'): 1109 if flow.get('name') == flowName: 1110 return flow
1111
def deleteFlow(self, flowName):
    """
    Empty the planet of a flow.

    Every component of the flow must have no mood change pending and be
    sleeping. If any component is busy, nothing is removed.

    @raises ValueError: if there is no flow called flowName
    @raises errors.BusyComponentError: if a component of the flow is busy
    @returns: a deferred that will fire when the flow is removed.
    """

    flow = self._getFlowByName(flowName)
    if flow is None:
        raise ValueError("No flow called %s found" % (flowName, ))

    components = flow.get('components')
    # Check every component before touching any state, so a busy one
    # leaves the flow intact. Mood values are compared with !=, since
    # identity of equal values is not guaranteed.
    for c in components:
        if (c.get('moodPending') is not None
                or c.get('mood') != moods.sleeping.value):
            raise errors.BusyComponentError(c)
    for c in components:
        del self._componentMappers[self._componentMappers[c].id]
        del self._componentMappers[c]
    d = flow.empty()
    d.addCallback(lambda _: self.state.remove('flows', flow))
    return d
def emptyPlanet(self):
    """
    Empty the planet of all components, and flows. Also clears all
    messages.

    Components that are not sleeping and still have an avatar are stopped
    one after another. The rest of the planet state is then cleaned up by
    _emptyPlanetCallback. The chain starts on the next reactor iteration.

    @raises errors.BusyComponentError: if any component has a mood change
            pending
    @returns: a deferred that will fire when the planet is empty.
    """
    # Iterate over a copy of the ids: clearMessage presumably removes the
    # entry from this same dict.
    for mid in list(self.state.get('messages').keys()):
        self.clearMessage(mid)

    # if any component is already in a mood change/command, fail
    busy = [c for c in self.getComponentStates()
            if c.get('moodPending') is not None]
    if busy:
        state = busy[0]
        raise errors.BusyComponentError(
            state,
            "moodPending is %s" % moods.get(state.get('moodPending')))

    # filter out the ones that aren't sleeping and stop them; mood values
    # are compared with != since identity of equal values is not guaranteed
    awake = [c for c in self.getComponentStates()
             if c.get('mood') != moods.sleeping.value]

    # create a big deferred for stopping everything
    d = defer.Deferred()

    self.debug('need to stop %d components: %r' % (
        len(awake), awake))

    for c in awake:
        avatar = self._componentMappers[c].avatar
        # If this has logged out, but isn't sleeping (so is sad or lost),
        # we won't have an avatar. So, stop it if we can.
        if avatar:
            # the avatar is passed as a callback argument (not closed
            # over), so each callback stops its own component
            d.addCallback(lambda result, a: a.stop(), avatar)
        else:
            assert c.get('mood') in (moods.sad.value, moods.lost.value)

    d.addCallback(self._emptyPlanetCallback)

    # trigger the deferred after returning
    reactor.callLater(0, d.callback, None)

    return d
def _emptyPlanetCallback(self, result):
    """
    Second half of emptyPlanet, called after all components have been
    stopped.

    Forgets every component mapper, then empties the atmosphere and every
    flow and removes each flow from the planet state.

    @param result: result of the stop chain; unused
    @returns: a DeferredList over the emptying/removal deferreds
    """
    components = self.getComponentStates()
    self.debug('_emptyPlanetCallback: need to delete %d components' %
               len(components))

    for c in components:
        # compare by value; identity of equal values is not guaranteed
        if c.get('mood') != moods.sleeping.value:
            self.warning('Component %s is not sleeping', c.get('name'))
        # clear mapper; remove componentstate and id
        m = self._componentMappers[c]
        del self._componentMappers[m.id]
        del self._componentMappers[c]

    # if anything's left, we have a mistake somewhere
    leftover = list(self._componentMappers.keys())
    if leftover:
        # pass the list itself: formatting repr(l) with %r double-quoted it
        self.warning('mappers still has keys %r', leftover)

    dList = [self.state.get('atmosphere').empty()]

    # Iterate over a copy. state.remove('flows', f) presumably removes f
    # from the very list state.get('flows') returns, and removing from a
    # list while iterating over it skips the element after each removal,
    # which would leave every other flow behind.
    for f in list(self.state.get('flows')):
        self.debug('appending deferred for emptying flow %r' % f)
        dList.append(f.empty())
        self.debug('appending deferred for removing flow %r' % f)
        dList.append(self.state.remove('flows', f))
    self.debug('appended deferreds')

    return defer.DeferredList(dList)
def _getComponentsToCreate(self):
    """
    Return the planet's sleeping components, i.e. those that have no job
    yet and therefore need to be created.

    @rtype: list of L{flumotion.common.planet.ManagerComponentState}
    """
    # Sleeping means there is no existing job: once a job is created the
    # mood becomes waking, so moodPending does not need checking.
    return [c for c in self.state.getComponents()
            if c.get('mood') == moods.sleeping.value]
1234 - def _getWorker(self, workerName):
1235 # returns the WorkerAvatar with the given name 1236 if not workerName in self.workerHeaven.avatars: 1237 raise errors.ComponentNoWorkerError("Worker %s not logged in?" 1238 % workerName) 1239 1240 return self.workerHeaven.avatars[workerName]
1241
def getWorkerFeedServerPort(self, workerName):
    """
    Return the feed server port of the worker called workerName, or None
    if that worker is not logged in.
    """
    if workerName not in self.workerHeaven.avatars:
        return None
    return self._getWorker(workerName).feedServerPort
def reservePortsOnWorker(self, workerName, numPorts):
    """
    Reserve numPorts ports on the worker called workerName for the caller.
    The ports stay reserved until releasePortsOnWorker is called for them.

    @raises errors.ComponentNoWorkerError: if the worker is not logged in
    @returns: a list of ports as integers
    """
    worker = self._getWorker(workerName)
    return worker.reservePorts(numPorts)
1257 - def releasePortsOnWorker(self, workerName, ports):
1258 """ 1259 Tells the manager that the given ports are no longer being used, 1260 and may be returned to the allocation pool. 1261 """ 1262 try: 1263 return self._getWorker(workerName).releasePorts(ports) 1264 except errors.ComponentNoWorkerError, e: 1265 self.warning('could not release ports: %r' % e.args)
1266
def getComponentMapper(self, object):
    """
    Look up an object mapper given the object.

    @param object: any key a mapper is registered under (component state,
                   jobState, avatarId or avatar)
    @rtype: L{ComponentMapper} or None
    """
    # Use a direct dict lookup: on Python 2, 'x in d.keys()' builds a list
    # and scans it linearly.
    return self._componentMappers.get(object)
def getManagerComponentState(self, object):
    """
    Look up the component state of the mapper registered for the object.

    @param object: any key a mapper is registered under (component state,
                   jobState, avatarId or avatar)
    @rtype: L{flumotion.common.planet.ManagerComponentState} or None
    """
    # Use a direct dict lookup instead of a linear scan of keys().
    mapper = self._componentMappers.get(object)
    if mapper is None:
        return None
    return mapper.state
1289 - def invokeOnComponents(self, componentType, methodName, *args, **kwargs):
1290 """ 1291 Invokes method on all components of a certain type 1292 """ 1293 1294 def invokeOnOneComponent(component, methodName, *args, **kwargs): 1295 m = self.getComponentMapper(component) 1296 if not m: 1297 self.warning('Component %s not mapped. Maybe deleted.', 1298 component.get('name')) 1299 raise errors.UnknownComponentError(component) 1300 1301 avatar = m.avatar 1302 if not avatar: 1303 self.warning('No avatar for %s, cannot call remote', 1304 component.get('name')) 1305 raise errors.SleepingComponentError(component) 1306 1307 try: 1308 return avatar.mindCallRemote(methodName, *args, **kwargs) 1309 except Exception, e: 1310 log_message = log.getExceptionMessage(e) 1311 msg = "exception on remote call %s: %s" % (methodName, 1312 log_message) 1313 self.warning(msg) 1314 raise errors.RemoteMethodError(methodName, 1315 log_message)
1316 1317 # only do this on happy or hungry components of type componentType 1318 dl_array = [] 1319 for c in self.getComponentStates(): 1320 if c.get('type') == componentType and \ 1321 (c.get('mood') is moods.happy.value or 1322 c.get('mood') is moods.hungry.value): 1323 self.info("component %r to have %s run", c, methodName) 1324 d = invokeOnOneComponent(c, methodName, *args, **kwargs) 1325 dl_array.append(d) 1326 dl = defer.DeferredList(dl_array) 1327 return dl 1328