1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 """worker-side objects to handle worker clients
23 """
24
25 import signal
26
27 from twisted.internet import defer, error, reactor
28 from zope.interface import implements
29
30 from flumotion.common import errors, interfaces, log
31 from flumotion.worker import medium, job, feedserver
32 from flumotion.twisted.defer import defer_call_later
33
34 __version__ = "$Rev$"
35
36
38 logCategory = "proxybouncer"
39
40 """
41 I am a bouncer that proxies authenticate calls to a remote FPB root
42 object.
43 """
44
46 """
47 @param remote: an object that has .callRemote()
48 """
49 self._remote = remote
50
52 """
53 Call me before asking me to authenticate, so I know what I can
54 authenticate.
55 """
56 return self._remote.callRemote('getKeycardClasses')
57
62
63
64
65
67 """
68 I am the main object in the worker process, managing jobs and everything
69 related.
70 I live in the main worker process.
71
72 @ivar authenticator: authenticator worker used to log in to manager
73 @type authenticator L{flumotion.twisted.pb.Authenticator}
74 @ivar medium:
75 @type medium: L{medium.WorkerMedium}
76 @ivar jobHeaven:
77 @type jobHeaven: L{job.ComponentJobHeaven}
78 @ivar checkHeaven:
79 @type checkHeaven: L{job.CheckJobHeaven}
80 @ivar workerClientFactory:
81 @type workerClientFactory: L{medium.WorkerClientFactory}
82 @ivar feedServerPort: TCP port the Feed Server is listening on
83 @type feedServerPort: int
84 """
85
86 implements(interfaces.IFeedServerParent)
87
88 logCategory = 'workerbrain'
89
91 """
92 @param options: the optparsed dictionary of command-line options
93 @type options: an object with attributes
94 """
95 self.options = options
96 self.workerName = options.name
97
98
99 if not self.options.randomFeederports:
100 self.ports = self.options.feederports[:-1]
101 else:
102 self.ports = []
103
104 self.medium = medium.WorkerMedium(self)
105
106
107 self.jobHeaven = job.ComponentJobHeaven(self)
108
109 self.checkHeaven = job.CheckJobHeaven(self)
110
111 self.managerConnectionInfo = None
112
113
114
115 self.feedServer = None
116
117 self.stopping = False
118 reactor.addSystemEventTrigger('before', 'shutdown',
119 self.shutdownHandler)
120 self._installHUPHandler()
121
123
124 def sighup(signum, frame):
125 if self._oldHUPHandler:
126 self.log('got SIGHUP, calling previous handler %r',
127 self._oldHUPHandler)
128 self._oldHUPHandler(signum, frame)
129 self.debug('telling kids about new log file descriptors')
130 self.jobHeaven.rotateChildLogFDs()
131
132 handler = signal.signal(signal.SIGHUP, sighup)
133 if handler == signal.SIG_DFL or handler == signal.SIG_IGN:
134 self._oldHUPHandler = None
135 else:
136 self._oldHUPHandler = handler
137
139 """
140 Start listening on FeedServer (incoming eater requests) and
141 JobServer (through which we communicate with our children) ports
142
143 @returns: True if we successfully listened on both ports
144 """
145
146 try:
147 self.feedServer = self._makeFeedServer()
148 except error.CannotListenError, e:
149 self.warning("Failed to listen on feed server port: %r", e)
150 return False
151
152 try:
153 self.jobHeaven.listen()
154 except error.CannotListenError, e:
155 self.warning("Failed to listen on job server port: %r", e)
156 return False
157
158 try:
159 self.checkHeaven.listen()
160 except error.CannotListenError, e:
161 self.warning("Failed to listen on check server port: %r", e)
162 return False
163
164 return True
165
167 """
168 @returns: L{flumotion.worker.feedserver.FeedServer}
169 """
170 port = None
171 if self.options.randomFeederports:
172 port = 0
173 elif not self.options.feederports:
174 self.info('Not starting feed server because no port is '
175 'configured')
176 return None
177 else:
178 port = self.options.feederports[-1]
179
180 return feedserver.FeedServer(self, ProxyBouncer(self), port)
181
182 - def login(self, managerConnectionInfo):
183 self.managerConnectionInfo = managerConnectionInfo
184 self.medium.startConnecting(managerConnectionInfo)
185
186 - def callRemote(self, methodName, *args, **kwargs):
188
190 if self.stopping:
191 self.warning("Already shutting down, ignoring shutdown request")
192 return
193
194 self.info("Reactor shutting down, stopping jobHeaven")
195 self.stopping = True
196
197 l = [self.jobHeaven.shutdown(), self.checkHeaven.shutdown()]
198 if self.feedServer:
199 l.append(self.feedServer.shutdown())
200
201 return defer_call_later(defer.DeferredList(l))
202
203
204
205 - def feedToFD(self, componentId, feedName, fd, eaterId):
206 """
207 Called from the FeedAvatar to pass a file descriptor on to
208 the job running the component for this feeder.
209
210 @returns: whether the fd was successfully handed off to the component.
211 """
212 if componentId not in self.jobHeaven.avatars:
213 self.warning("No such component %s running", componentId)
214 return False
215
216 avatar = self.jobHeaven.avatars[componentId]
217 return avatar.sendFeed(feedName, fd, eaterId)
218
219 - def eatFromFD(self, componentId, eaterAlias, fd, feedId):
220 """
221 Called from the FeedAvatar to pass a file descriptor on to
222 the job running the given component.
223
224 @returns: whether the fd was successfully handed off to the component.
225 """
226 if componentId not in self.jobHeaven.avatars:
227 self.warning("No such component %s running", componentId)
228 return False
229
230 avatar = self.jobHeaven.avatars[componentId]
231 return avatar.receiveFeed(eaterAlias, fd, feedId)
232
233
234
236 return self.ports, self.options.randomFeederports
237
239 if self.feedServer:
240 return self.feedServer.getPortNum()
241 else:
242 return None
243
244 - def create(self, avatarId, type, moduleName, methodName, nice,
245 conf):
257
258 def spawnJob(bundles):
259 return self.jobHeaven.spawn(avatarId, type, moduleName,
260 methodName, nice, bundles, conf)
261
262 def createError(failure):
263 failure.trap(errors.ComponentCreateError)
264 self.debug('create deferred for %s failed, forwarding error',
265 avatarId)
266 return failure
267
268 def success(res):
269 self.debug('create deferred for %s succeeded (%r)',
270 avatarId, res)
271 return res
272
273 self.info('Starting component "%s" of type "%s"', avatarId,
274 type)
275 d = getBundles()
276 d.addCallback(spawnJob)
277 d.addCallback(success)
278 d.addErrback(createError)
279 return d
280
281 - def runCheck(self, module, function, *args, **kwargs):
286
287 def runCheck(bundles):
288 return self.checkHeaven.runCheck(bundles, module, function,
289 *args, **kwargs)
290
291 d = getBundles()
292 d.addCallback(runCheck)
293 return d
294
297
298 - def killJob(self, avatarId, signum):
300