1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """a data structure to manage asynchronous avatar starts and shutdowns
23 """
24
25 from twisted.internet import defer
26
27 from flumotion.common import log
28
29 __version__ = "$Rev$"
30
31
32
33
34
35
36
37
39
40 - def __init__(self, avatarLoggedIn, alreadyStartingError,
41 alreadyRunningError):
42 """Create a StartSet, a data structure for managing starts and
43 stops of remote processes, for example jobs in a jobheaven.
44
45 @param avatarLoggedIn: a procedure of type avatarId->boolean;
46 should return True if the avatarId is logged in and "ready", and
47 False otherwise. An avatarId is ready if avatarStarted() could
48 have been called on it. This interface is made this way because
49 it is assumed that whatever code instantiates a StartSet keeps
50 track of "ready" remote processes, and this way we prevent data
51 duplication.
52 @param alreadyStartingError: An exception class to raise if
53 createStart() is called, but there is already a create deferred
54 registered for that avatarId.
55 @param alreadyRunningError: An exception class to raise if
56 createStart() is called, but there is already a "ready" process
57 with that avatarId.
58 """
59 self._avatarLoggedIn = avatarLoggedIn
60 self._alreadyStartingError = alreadyStartingError
61 self._alreadyRunningError = alreadyRunningError
62
63 self._createDeferreds = {}
64 self._shutdownDeferreds = {}
65
67 """
68 Create and register a deferred for starting a given process.
69 The deferred will be fired when the process is ready, as
70 triggered by a call to createSuccess().
71
72 @param avatarId: the id of the remote process, for example the
73 avatarId of the job
74
75 @rtype: L{twisted.internet.defer.Deferred}
76 """
77 self.debug('making create deferred for %s', avatarId)
78
79 d = defer.Deferred()
80
81
82
83
84
85 if avatarId in self._createDeferreds:
86
87
88 self.info('already have a create deferred for %s', avatarId)
89 raise self._alreadyStartingError(avatarId)
90 elif avatarId in self._shutdownDeferreds:
91
92
93 self.debug('waiting for previous %s to shut down like it '
94 'said it would', avatarId)
95
96
97 def ensureShutdown(res,
98 shutdown=self._shutdownDeferreds[avatarId]):
99 shutdown.addCallback(lambda _: res)
100 return shutdown
101 d.addCallback(ensureShutdown)
102 elif self._avatarLoggedIn(avatarId):
103
104 self.info('avatar named %s already running', avatarId)
105 raise self._alreadyRunningError(avatarId)
106 else:
107
108 pass
109
110 self.debug('registering deferredCreate for %s', avatarId)
111 self._createDeferreds[avatarId] = d
112 return d
113
115 """
116 Trigger a deferred start previously registerd via createStart().
117 For example, a JobHeaven might call this method when a job has
118 logged in and been told to start a component.
119
120 @param avatarId: the id of the remote process, for example the
121 avatarId of the job
122 """
123 self.debug('triggering create deferred for %s', avatarId)
124 if not avatarId in self._createDeferreds:
125 self.warning('No create deferred registered for %s', avatarId)
126 return
127
128 d = self._createDeferreds[avatarId]
129 del self._createDeferreds[avatarId]
130
131 d.callback(avatarId)
132
134 """
135 Notify the caller that a create has failed, and remove the create
136 from the list of pending creates.
137
138 @param avatarId: the id of the remote process, for example the
139 avatarId of the job
140 @param exception: either an exception or a failure describing
141 why the create failed.
142 """
143 self.debug('create deferred failed for %s', avatarId)
144 if not avatarId in self._createDeferreds:
145 self.warning('No create deferred registered for %s', avatarId)
146 return
147
148 d = self._createDeferreds[avatarId]
149 del self._createDeferreds[avatarId]
150 d.errback(exception)
151
153 """
154 Check if a deferred create has been registered for the given avatarId.
155
156 @param avatarId: the id of the remote process, for example the
157 avatarId of the job
158
159 @returns: The deferred create, if one has been registered.
160 Otherwise None.
161 """
162 return self._createDeferreds.get(avatarId, None)
163
165 """
166 Create and register a deferred that will be fired when a process
167 has shut down cleanly.
168
169 @param avatarId: the id of the remote process, for example the
170 avatarId of the job
171
172 @rtype: L{twisted.internet.defer.Deferred}
173 """
174 self.debug('making shutdown deferred for %s', avatarId)
175
176 if avatarId in self._shutdownDeferreds:
177 self.warning('already have a shutdown deferred for %s',
178 avatarId)
179 return self._shutdownDeferreds[avatarId]
180 else:
181 self.debug('registering shutdown for %s', avatarId)
182 d = defer.Deferred()
183 self._shutdownDeferreds[avatarId] = d
184 return d
185
187 """
188 Trigger a callback on a deferred previously registered via
189 shutdownStart(). For example, a JobHeaven would call this when a
190 job for which shutdownStart() was called is reaped.
191
192 @param avatarId: the id of the remote process, for example the
193 avatarId of the job
194 """
195 self.debug('triggering shutdown deferred for %s', avatarId)
196 if not avatarId in self._shutdownDeferreds:
197 self.warning('No shutdown deferred registered for %s', avatarId)
198 return
199
200 d = self._shutdownDeferreds.pop(avatarId)
201 d.callback(avatarId)
202
204 """
205 Check if a deferred shutdown has been registered for the given
206 avatarId.
207
208 @param avatarId: the id of the remote process, for example the
209 avatarId of the job
210
211 @returns: True if a deferred shutdown has been registered for
212 this object, False otherwise
213 """
214 return avatarId in self._shutdownDeferreds
215
217 """
218 Notify the startset that an avatar has started. If there was a
219 create deferred registered for this avatar, this will cause
220 createSuccess() to be called.
221
222 @param avatarId: the id of the remote process, for example the
223 avatarId of the job
224 """
225 if avatarId in self._createDeferreds:
226 self.createSuccess(avatarId)
227 else:
228 self.log('avatar %s started, but we were not waiting for'
229 ' it', avatarId)
230
232 """
233 Notify the startset that an avatar has stopped. If there was a
234 shutdown deferred registered for this avatar, this will cause
235 shutdownSuccess() to be called.
236
237 On the other hand, if there was a create deferred still pending,
238 we will call createFailed with the result of calling getFailure.
239
240 If no start or create was registered, we do nothing.
241
242 @param avatarId: the id of the remote process, for example the
243 avatarId of the job
244 @param getFailure: procedure of type avatarId -> Failure. The
245 returned failure should describe the reason that the job failed.
246 """
247 if avatarId in self._createDeferreds:
248 self.createFailed(avatarId, getFailure(avatarId))
249 elif avatarId in self._shutdownDeferreds:
250 self.shutdownSuccess(avatarId)
251 else:
252 self.debug('unknown avatar %s logged out', avatarId)
253