1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """
23 manager-side objects for components
24
25 API Stability: semi-stable
26 """
27
28 import time
29
30 from twisted.spread import pb
31 from twisted.internet import reactor, defer
32 from twisted.internet import error as terror
33 from twisted.python.failure import Failure
34 from zope.interface import implements
35
36 from flumotion.configure import configure
37 from flumotion.manager import base, config
38 from flumotion.common import errors, interfaces, keycards, log, planet
39 from flumotion.common import messages, common
40 from flumotion.common.i18n import N_, gettexter
41 from flumotion.common.planet import moods
42 from flumotion.twisted import flavors
43
44 __version__ = "$Rev$"
45 T_ = gettexter()
46
47
49 """
50 I am a Manager-side avatar for a component.
51 I live in the L{ComponentHeaven}.
52
53 Each component that logs in to the manager gets an avatar created for it
54 in the manager.
55
56 @cvar avatarId: the L{componentId<common.componentId>}
57 @type avatarId: str
58 @cvar jobState: job state of this avatar's component
59 @type jobState: L{flumotion.common.planet.ManagerJobState}
60 @cvar componentState: component state of this avatar's component
61 @type componentState: L{flumotion.common.planet.ManagerComponentState}
62 """
63
64 logCategory = 'comp-avatar'
65
66 - def __init__(self, heaven, avatarId, remoteIdentity, mind, conf,
67 jobState, clocking):
83
84
85
94
95
96
99
100 def gotStates(result):
101 (_s1, conf), (_s2, jobState), (_s3, clocking) = result
102 assert _s1 and _s2 and _s3
103 log.debug('component-avatar', 'got state information')
104 return (heaven, avatarId, remoteIdentity, mind,
105 conf, jobState, clocking)
106 log.debug('component-avatar', 'calling mind for state information')
107 d = defer.DeferredList([mind.callRemote('getConfig'),
108 mind.callRemote('getState'),
109 mind.callRemote('getMasterClockInfo')],
110 fireOnOneErrback=True)
111 d.addCallback(gotStates)
112 return d
113 makeAvatarInitArgs = classmethod(makeAvatarInitArgs)
114
141
142
143
144 - def addMessage(self, level, mid, format, *args, **kwargs):
145 """
146 Convenience message to construct a message and add it to the
147 component state. `format' should be marked as translatable in
148 the source with N_, and *args will be stored as format
149 arguments. Keyword arguments are passed on to the message
150 constructor. See L{flumotion.common.messages.Message} for the
151 meanings of the rest of the arguments.
152
153 For example::
154
155 self.addMessage(messages.WARNING, 'foo-warning',
156 N_('The answer is %d'), 42, debug='not really')
157 """
158 self.addMessageObject(messages.Message(level,
159 T_(format, *args),
160 mid=mid, **kwargs))
161
163 """
164 Add a message to the planet state.
165
166 @type message: L{flumotion.common.messages.Message}
167 """
168 self.componentState.append('messages', message)
169
186
188
189
190
191
192
193
194
195
196
197 def verifyExistingComponentState(conf, state):
198
199 state.setJobState(self.jobState)
200 self.componentState = state
201
202 self.upgradeConfig(state, conf)
203 if state.get('config') != conf:
204 diff = config.dictDiff(state.get('config'), conf)
205 diffMsg = config.dictDiffMessageString(diff,
206 'internal conf',
207 'running conf')
208 self.addMessage(messages.WARNING, 'stale-config',
209 N_("Component logged in with stale "
210 "configuration. To fix this, stop "
211 "this component and then restart "
212 "the manager."),
213 debug=("Updating internal conf from "
214 "running conf:\n" + diffMsg))
215 self.warning('updating internal component state for %r',
216 state)
217 self.debug('changes to conf: %s',
218 config.dictDiffMessageString(diff))
219 state.set('config', conf)
220
221 def makeNewComponentState(conf):
222
223 state = planet.ManagerComponentState()
224 state.setJobState(self.jobState)
225 self.componentState = state
226
227 self.upgradeConfig(state, conf)
228
229 flowName, compName = conf['parent'], conf['name']
230
231 state.set('name', compName)
232 state.set('type', conf['type'])
233 state.set('workerRequested', self.jobState.get('workerName'))
234 state.set('config', conf)
235 self.vishnu.addComponentToFlow(state, flowName)
236 return state
237
238 mState = self.vishnu.getManagerComponentState(self.avatarId)
239 if mState:
240 verifyExistingComponentState(conf, mState)
241 else:
242 makeNewComponentState(conf)
243
245 """
246 Tell the component to provide a master clock.
247
248 @rtype: L{twisted.internet.defer.Deferred}
249 """
250
251 def success(clocking):
252 self.clocking = clocking
253 self.heaven.masterClockAvailable(self)
254
255 def error(failure):
256 self.addMessage(messages.WARNING, 'provide-master-clock',
257 N_('Failed to provide the master clock'),
258 debug=log.getFailureMessage(failure))
259 self.vishnu.releasePortsOnWorker(self.getWorkerName(), [port])
260
261 if self.clocking:
262 self.heaven.masterClockAvailable(self)
263 else:
264 (port, ) = self.vishnu.reservePortsOnWorker(
265 self.getWorkerName(), 1)
266 self.debug('provideMasterClock on port %d', port)
267
268 d = self.mindCallRemote('provideMasterClock', port)
269 d.addCallbacks(success, error)
270
272 """
273 Returns the port on which a feed server for this component is
274 listening on.
275
276 @rtype: int
277 """
278 return self.vishnu.getWorkerFeedServerPort(self.getWorkerName())
279
281 """
282 Get the IP address of the manager as seen by the component.
283
284 @rtype: str
285 """
286 return self.jobState.get('manager-ip')
287
289 """
290 Return the name of the worker.
291
292 @rtype: str
293 """
294 return self.jobState.get('workerName')
295
297 """
298 Return the PID of the component.
299
300 @rtype: int
301 """
302 return self.jobState.get('pid')
303
305 """
306 Get the name of the component.
307
308 @rtype: str
309 """
310 return self.componentState.get('name')
311
313 """
314 Get the name of the component's parent.
315
316 @rtype: str
317 """
318 return self.componentState.get('parent').get('name')
319
321 """
322 Get the component type name of the component.
323
324 @rtype: str
325 """
326 return self.componentState.get('type')
327
329 """
330 Get the set of eaters that this component eats from.
331
332 @rtype: dict of eaterName -> [(feedId, eaterAlias)]
333 """
334 return self.componentState.get('config').get('eater', {})
335
337 """
338 Get the list of feeders that this component provides.
339
340 @rtype: list of feederName
341 """
342 return self.componentState.get('config').get('feed', [])
343
345 """
346 Get the feedId of a feed provided or consumed by this component.
347
348 @param feedName: The name of the feed (i.e., eater alias or
349 feeder name)
350 @rtype: L{flumotion.common.common.feedId}
351 """
352 return common.feedId(self.getName(), feedName)
353
355 """
356 Get the full feedId of a feed provided or consumed by this
357 component.
358
359 @param feedName: The name of the feed (i.e., eater alias or
360 feeder name)
361 @rtype: L{flumotion.common.common.fullFeedId}
362 """
363 return common.fullFeedId(self.getParentName(),
364 self.getName(), feedName)
365
367 """
368 Get the set of virtual feeds provided by this component.
369
370 @rtype: dict of fullFeedId -> (ComponentAvatar, feederName)
371 """
372 conf = self.componentState.get('config')
373 ret = {}
374 for feedId, feederName in conf.get('virtual-feeds', {}).items():
375 vComp, vFeed = common.parseFeedId(feedId)
376 ffid = common.fullFeedId(self.getParentName(), vComp, vFeed)
377 ret[ffid] = (self, feederName)
378 return ret
379
381 """
382 Get the worker that this component should run on.
383
384 @rtype: str
385 """
386 return self.componentState.get('workerRequested')
387
389 """
390 Get this component's clock master, if any.
391
392 @rtype: avatarId or None
393 """
394 return self.componentState.get('config')['clock-master']
395
397 """
398 Tell the remote component to shut down.
399 """
400 self._shutdownDeferred = defer.Deferred()
401
402 self.mindCallRemote('stop')
403
404 return self._shutdownDeferred
405
409
410 - def eatFrom(self, eaterAlias, fullFeedId, host, port):
414
415 - def feedTo(self, feederName, fullFeedId, host, port):
419
420
421
423 """
424 Authenticate the given keycard.
425 Gets proxied to L{flumotion.component.bouncers.bouncer.""" \
426 """BouncerMedium.remote_authenticate}
427 The component should be a subclass of
428 L{flumotion.component.bouncers.bouncer.Bouncer}
429
430 @type keycard: L{flumotion.common.keycards.Keycard}
431 """
432 return self.mindCallRemote('authenticate', keycard)
433
435 """
436 Remove a keycard managed by this bouncer because the requester
437 has gone.
438
439 @type keycardId: str
440 """
441 return self.mindCallRemote('removeKeycardId', keycardId)
442
444 """
445 Expire a keycard issued to this component because the bouncer decided
446 to.
447
448 @type keycardId: str
449 """
450 return self.mindCallRemote('expireKeycard', keycardId)
451
453 """
454 Expire keycards issued to this component because the bouncer
455 decided to.
456
457 @type keycardIds: sequence of str
458 """
459 return self.mindCallRemote('expireKeycards', keycardIds)
460
462 """
463 Resets the expiry timeout for keycards issued by issuerName.
464
465 @param issuerName: the issuer for which keycards should be kept
466 alive; that is to say, keycards with the
467 attribute 'issuerName' set to this value will
468 have their ttl values reset.
469 @type issuerName: str
470 @param ttl: the new expiry timeout
471 @type ttl: number
472 """
473 return self.mindCallRemote('keepAlive', issuerName, ttl)
474
475
476
478 """
479 Called by a component to tell the manager that it's shutting down
480 cleanly (and thus should go to sleeping, rather than lost or sad)
481 """
482 self.debug("shutdown is clean, shouldn't go to lost")
483 self._shutdown_requested = True
484
502
504 """
505 Expire a keycard (and thus the requester's connection)
506 issued to the given requester.
507
508 This is called by the bouncer component that authenticated the keycard.
509
510 @param requesterId: name (avatarId) of the component that originally
511 requested authentication for the given keycardId
512 @type requesterId: str
513 @param keycardId: id of keycard to expire
514 @type keycardId: str
515 """
516
517 if not self.heaven.hasAvatar(requesterId):
518 self.warning('asked to expire keycard %s for requester %s, '
519 'but no such component registered',
520 keycardId, requesterId)
521 raise errors.UnknownComponentError(requesterId)
522
523 return self.heaven.getAvatar(requesterId).expireKeycard(keycardId)
524
526 """
527 Expire multiple keycards (and thus the requester's connections)
528 issued to the given requester.
529
530 This is called by the bouncer component that authenticated
531 the keycards.
532
533 @param requesterId: name (avatarId) of the component that originally
534 requested authentication for the given keycardId
535 @type requesterId: str
536 @param keycardIds: sequence of id of keycards to expire
537 @type keycardIds: sequence of str
538 """
539 if not self.heaven.hasAvatar(requesterId):
540 self.warning('asked to expire %d keycards for requester %s, '
541 'but no such component registered',
542 len(keycardIds), requesterId)
543 raise errors.UnknownComponentError(requesterId)
544
545 return self.heaven.getAvatar(requesterId).expireKeycards(keycardIds)
546
547
549
550 - def add(self, key, value):
551 if key not in self:
552 self[key] = []
553 self[key].append(value)
554
555 - def remove(self, key, value):
556 self[key].remove(value)
557 if not self[key]:
558 del self[key]
559
560
561 -class FeedMap(object, log.Loggable):
562 logName = 'feed-map'
563
565
566 self.avatars = {}
567 self._ordered_avatars = []
568 self._dirty = True
569 self._recalc()
570
572 assert avatar.avatarId not in self.avatars
573 self.avatars[avatar.avatarId] = avatar
574 self._ordered_avatars.append(avatar)
575 self._dirty = True
576
578
579
580 del self.avatars[avatar.avatarId]
581 self._ordered_avatars.remove(avatar)
582 self._dirty = True
583
584
585 return [(a, f) for a, f in self.feedDeps.pop(avatar, [])
586 if a.avatarId in self.avatars]
587
601
603 if not self._dirty:
604 return
605 self.feedersForEaters = ffe = {}
606 self.eatersForFeeders = eff = dictlist()
607 self.feeds = dictlist()
608 self.feedDeps = dictlist()
609
610 for comp in self._ordered_avatars:
611 for feederName in comp.getFeeders():
612 self.feeds.add(comp.getFullFeedId(feederName),
613 (comp, feederName))
614 for ffid, pair in comp.getVirtualFeeds().items():
615 self.feeds.add(ffid, pair)
616
617 for eater in self.avatars.values():
618 for pairs in eater.getEaters().values():
619 for feedId, eName in pairs:
620 feeder, fName = self._getFeederAvatar(eater, feedId)
621 if feeder:
622 ffe[eater.getFullFeedId(eName)] = (
623 eName, feeder, fName)
624 eff.add(feeder.getFullFeedId(fName),
625 (fName, eater, eName))
626 else:
627 self.debug('eater %s waiting for feed %s to log in',
628 eater.getFeedId(eName), feedId)
629 self._dirty = False
630
632 """Get the set of feeds that this component is eating from,
633 keyed by eater alias.
634
635 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
636 @rtype: list of (str, ComponentAvatar, str)
637 """
638 self._recalc()
639 ret = []
640 for tups in avatar.getEaters().values():
641 for feedId, alias in tups:
642 ffid = avatar.getFullFeedId(alias)
643 if ffid in self.feedersForEaters:
644 ret.append(self.feedersForEaters[ffid])
645 return ret
646
648 """Get the set of feeds that this component is eating from
649 for the given feedId.
650
651 @param avatar: the eater component
652 @type avatar: L{ComponentAvatar}
653 @param ffid: full feed id for which to return feeders
654 @type ffid: str
655 @return: a list of (eaterAlias, feederAvatar, feedName) tuples
656 @rtype: list of (str, L{ComponentAvatar}, str)
657 """
658 self._recalc()
659 ret = []
660 for feeder, feedName in self.feeds.get(ffid, []):
661 rffid = feeder.getFullFeedId(feedName)
662 eff = self.eatersForFeeders.get(rffid, [])
663 for fName, eater, eaterName in eff:
664 if eater == avatar:
665 ret.append((eaterName, feeder, feedName))
666 return ret
667
669 """Get the set of eaters that this component feeds, keyed by
670 feeder name.
671
672 @return: a list of (feederName, eaterAvatar, eaterAlias) tuples
673 @rtype: list of (str, ComponentAvatar, str)
674 """
675 self._recalc()
676 ret = []
677 for feedName in avatar.getFeeders():
678 ffid = avatar.getFullFeedId(feedName)
679 if ffid in self.eatersForFeeders:
680 ret.extend(self.eatersForFeeders[ffid])
681 return ret
682
683
685 """
686 I handle all registered components and provide L{ComponentAvatar}s
687 for them.
688 """
689
690 implements(interfaces.IHeaven)
691 avatarClass = ComponentAvatar
692
693 logCategory = 'comp-heaven'
694
699
700
701
710
721
723 master = avatar.getClockMaster()
724 if master:
725 if master == avatar.avatarId:
726 self.debug('Need for %r to provide a clock master',
727 master)
728 avatar.provideMasterClock()
729 else:
730 self.debug('Need to synchronize with clock master %r',
731 master)
732
733
734
735
736 m = self.vishnu.getComponentMapper(master)
737 if m and m.avatar:
738 clocking = m.avatar.clocking
739 if clocking:
740 host, port, base_time = clocking
741 avatar.setClocking(host, port, base_time)
742 else:
743 self.warning('%r should provide a clock master '
744 'but is not doing so', master)
745
746 else:
747 self.debug('clock master not logged in yet, will '
748 'set clocking later')
749
756
758 assert avatar.avatarId not in self.avatars
759 compsNeedingReconnect = self.feedMap.componentDetached(avatar)
760 if self.vishnu.running:
761 self.debug('will reconnect: %r', compsNeedingReconnect)
762
763
764 for comp, ffid in compsNeedingReconnect:
765 self._connectEaters(comp, ffid)
766
768 """
769 @param fromAvatar: the avatar to connect from
770 @type fromAvatar: L{ComponentAvatar}
771 @param fromAvatar: the avatar to connect to
772 @type toAvatar: L{ComponentAvatar}
773
774 @returns: the host and port on which to make the connection to
775 toAvatar from fromAvatar
776 @rtype: tuple of (str, int or None)
777 """
778 toHost = toAvatar.getClientAddress()
779 toPort = toAvatar.getFeedServerPort()
780
781
782
783
784
785
786
787 fromHost = fromAvatar.mind.broker.transport.getPeer().host
788 if fromHost == toHost:
789 toHost = '127.0.0.1'
790
791 self.debug('mapNetFeed from %r to %r: %s:%r', fromAvatar, toAvatar,
792 toHost, toPort)
793 return toHost, toPort
794
805
807
808
809 def always(otherComp):
810 return True
811
812 def never(otherComp):
813 return False
814 directions = [(self.feedMap.getFeedersForEaters,
815 always, 'eatFrom', 'feedTo'),
816 (self.feedMap.getEatersForFeeders,
817 never, 'feedTo', 'eatFrom')]
818
819 myComp = avatar
820 for getPeers, initiate, directMethod, reversedMethod in directions:
821 for myFeedName, otherComp, otherFeedName in getPeers(myComp):
822 if initiate(otherComp):
823
824 self._connectFeederToEater(myComp, myFeedName, otherComp,
825 otherFeedName, directMethod)
826 else:
827
828 self._connectFeederToEater(otherComp, otherFeedName,
829 myComp, myFeedName,
830 reversedMethod)
831
833
834 ffe = self.feedMap.getFeedersForEater(avatar, ffid)
835 for myFeedName, otherComp, otherFeedName in ffe:
836 self._connectFeederToEater(avatar, myFeedName, otherComp,
837 otherFeedName, 'eatFrom')
838