# -*- Mode: Python; test-case-name:flumotion.test.test_worker_worker -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

"""
worker-side objects to handle worker clients
"""

import os
import sys
import signal

from twisted.cred import portal
from twisted.internet import defer, reactor
from twisted.spread import pb
from zope.interface import implements

from flumotion.common import errors, log
from flumotion.common import worker, startset
from flumotion.common.process import signalPid
from flumotion.twisted import checkers, fdserver
from flumotion.twisted import pb as fpb

__version__ = "$Rev$"

JOB_SHUTDOWN_TIMEOUT = 5


def _getSocketPath():
    # FIXME: there is mkstemp for sockets, so we have a small window
    # here in which the socket could be created by something else
    # I didn't succeed in preparing a socket file with that name either

    # caller needs to delete name before using
    import tempfile
    fd, name = tempfile.mkstemp('.%d' % os.getpid(), 'flumotion.worker.')
    os.close(fd)

    return name


class JobInfo(object):
    """
    I hold information about a job.

    @cvar pid: PID of the child process
    @type pid: int
    @cvar avatarId: avatar identification string
    @type avatarId: str
    @cvar type: type of the component to create
    @type type: str
    @cvar moduleName: name of the module to create the component from
    @type moduleName: str
    @cvar methodName: the factory method to use to create the component
    @type methodName: str
    @cvar nice: the nice level to run the job as
    @type nice: int
    @cvar bundles: ordered list of (bundleName, bundlePath) needed to
                   create the component
    @type bundles: list of (str, str)
    """
    __slots__ = ('pid', 'avatarId', 'type', 'moduleName', 'methodName',
                 'nice', 'bundles')

    def __init__(self, pid, avatarId, type, moduleName, methodName, nice,
                 bundles):
        self.pid = pid
        self.avatarId = avatarId
        self.type = type
        self.moduleName = moduleName
        self.methodName = methodName
        self.nice = nice
        self.bundles = bundles


class JobProcessProtocol(worker.ProcessProtocol):

    def __init__(self, heaven, avatarId, startSet):
        self._startSet = startSet
        self._deferredStart = startSet.createRegistered(avatarId)
        worker.ProcessProtocol.__init__(self, heaven, avatarId,
                                        'component',
                                        heaven.getWorkerName())

    def sendMessage(self, message):
        heaven = self.loggable
        heaven.brain.callRemote('componentAddMessage', self.avatarId,
                                message)

    def processEnded(self, status):
        heaven = self.loggable
        dstarts = self._startSet
        signum = status.value.signal

        # we need to trigger a failure on the create deferred
        # if the job failed before logging in to the worker;
        # otherwise the manager still thinks it's starting up when it's
        # dead. If the job already attached to the worker however,
        # the create deferred will already have callbacked.
        deferred = dstarts.createRegistered(self.avatarId)
        if deferred is self._deferredStart:
            if signum:
                reason = "received signal %d" % signum
            else:
                reason = "unknown reason"
            text = ("Component '%s' has exited early (%s)." %
                    (self.avatarId, reason))
            dstarts.createFailed(self.avatarId,
                                 errors.ComponentCreateError(text))

        if dstarts.shutdownRegistered(self.avatarId):
            dstarts.shutdownSuccess(self.avatarId)

        heaven.jobStopped(self.pid)

        # chain up
        worker.ProcessProtocol.processEnded(self, status)


class BaseJobHeaven(pb.Root, log.Loggable):
    """
    I am similar to but not quite the same as a manager-side Heaven.
    I manage avatars inside the worker for job processes spawned by the worker.

    @ivar avatars: dict of avatarId -> avatar
    @type avatars: dict of str -> L{base.BaseJobAvatar}
    @ivar brain: the worker brain
    @type brain: L{worker.WorkerBrain}
    """

    logCategory = "job-heaven"
    implements(portal.IRealm)

    avatarClass = None

    def __init__(self, brain):
        """
        @param brain: a reference to the worker brain
        @type brain: L{worker.WorkerBrain}
        """
        self.avatars = {}  # componentId -> avatar
        self.brain = brain
        self._socketPath = _getSocketPath()
        self._port = None
        self._onShutdown = None  # If set, a deferred to fire when
                                 # our last child process exits

        self._jobInfos = {}  # processid -> JobInfo

        self._startSet = startset.StartSet(
            lambda x: x in self.avatars,
            errors.ComponentAlreadyStartingError,
            errors.ComponentAlreadyRunningError)

    def listen(self):
        assert self._port is None
        assert self.avatarClass is not None
        # FIXME: we should hand a username and password to log in with to
        # the job process instead of allowing anonymous
        checker = checkers.FlexibleCredentialsChecker()
        checker.allowPasswordless(True)
        p = portal.Portal(self, [checker])
        f = pb.PBServerFactory(p)
        try:
            os.unlink(self._socketPath)
        except OSError:
            pass

        # Rather than a listenUNIX(), we use listenWith so that we can specify
        # our particular Port, which creates Transports that we know how to
        # pass FDs over.
        self.debug("Listening for FD's on unix socket %s", self._socketPath)
        port = reactor.listenWith(fdserver.FDPort, self._socketPath, f)
        self._port = port

    ### portal.IRealm method

    def requestAvatar(self, avatarId, mind, *interfaces):
        if pb.IPerspective in interfaces:
            avatar = self.avatarClass(self, avatarId, mind)
            assert avatarId not in self.avatars
            self.avatars[avatarId] = avatar
            return pb.IPerspective, avatar, avatar.logout
        else:
            raise NotImplementedError("no interface")

    def removeAvatar(self, avatarId):
        if avatarId in self.avatars:
            del self.avatars[avatarId]
        else:
            self.warning("some programmer is telling me about an avatar "
                         "I have no idea about: %s", avatarId)

    def getWorkerName(self):
        """
        Gets the name of the worker that spawns the process.

        @rtype: str
        """
        return self.brain.workerName

    def addJobInfo(self, processPid, jobInfo):
        self._jobInfos[processPid] = jobInfo

    def getJobInfo(self, processPid):
        return self._jobInfos[processPid]

    def getJobInfos(self):
        return self._jobInfos.values()

    def getJobPids(self):
        return self._jobInfos.keys()

    def rotateChildLogFDs(self):
        self.debug('telling kids about new log file descriptors')
        for avatar in self.avatars.values():
            avatar.logTo(sys.stdout.fileno(), sys.stderr.fileno())

    def jobStopped(self, pid):
        if pid in self._jobInfos:
            self.debug('Removing job info for %d', pid)
            del self._jobInfos[pid]

            if not self._jobInfos and self._onShutdown:
                self.debug("Last child exited")
                self._onShutdown.callback(None)
        else:
            self.warning("some programmer is telling me about a pid "
                         "I have no idea about: %d", pid)

    def shutdown(self):
        self.debug('Shutting down JobHeaven')
        self.debug('Stopping all jobs')
        for avatar in self.avatars.values():
            avatar.stop()

        if self.avatars:
            # If our jobs fail to shut down nicely within some period of
            # time, shut them down less nicely
            dc = reactor.callLater(JOB_SHUTDOWN_TIMEOUT, self.kill)

            def cancelDelayedCall(res, dc):
                # be nice to unit tests
                if dc.active():
                    dc.cancel()
                return res

            self._onShutdown = defer.Deferred()
            self._onShutdown.addCallback(cancelDelayedCall, dc)
            ret = self._onShutdown
        else:
            # everything's gone already, return success
            ret = defer.succeed(None)

        def stopListening(_):
            # possible for it to be None, if we haven't been told to
            # listen yet, as in some test cases
            if self._port:
                port = self._port
                self._port = None
                return port.stopListening()

        ret.addCallback(stopListening)
        return ret

    def kill(self, signum=signal.SIGKILL):
        self.warning("Killing all children immediately")
        for pid in self.getJobPids():
            self.killJobByPid(pid, signum)

    def killJobByPid(self, pid, signum):
        if pid not in self._jobInfos:
            raise errors.UnknownComponentError(pid)

        jobInfo = self._jobInfos[pid]
        self.debug("Sending signal %d to job %s at pid %d", signum,
                   jobInfo.avatarId, jobInfo.pid)
        signalPid(jobInfo.pid, signum)

    def killJob(self, avatarId, signum):
        for job in self._jobInfos.values():
            if job.avatarId == avatarId:
                self.killJobByPid(job.pid, signum)


class BaseJobAvatar(fpb.Avatar, log.Loggable):
    """
    I am an avatar for the job living in the worker.
    """
    logCategory = 'job-avatar'

    def __init__(self, heaven, avatarId, mind):
        """
        @type heaven: L{flumotion.worker.base.BaseJobHeaven}
        @type avatarId: str
        """
        fpb.Avatar.__init__(self, avatarId)
        self._heaven = heaven
        self.setMind(mind)
        self.pid = None

    def setMind(self, mind):
        """
        @param mind: reference to the job's JobMedium on which we can call
        @type mind: L{twisted.spread.pb.RemoteReference}
        """
        fpb.Avatar.setMind(self, mind)
        self.haveMind()

    def haveMind(self):
        # implement me in subclasses
        pass

    def logout(self):
        self.log('logout called, %s disconnected', self.avatarId)

        self._heaven.removeAvatar(self.avatarId)

    def stop(self):
        """
        returns: a deferred marking completed stop.
        """
        raise NotImplementedError

    def _sendFileDescriptor(self, fd, message):
        try:
            # FIXME: pay attention to the return value of
            # sendFileDescriptor; it is the same as the return value of
            # sendmsg(2)
            self.mind.broker.transport.sendFileDescriptor(fd, message)
            return True
        except OSError, e:
            # OSError is what is thrown by the C code doing this
            # when there are issues
            self.warning("Error %s sending file descriptors",
                         log.getExceptionMessage(e))
            return False

    def logTo(self, stdout, stderr):
        """
        Tell the job to log to the given file descriptors.
        """
        self.debug('Giving job new stdout and stderr')
        if self.mind:
            self._sendFileDescriptor(stdout, "redirectStdout")
            self._sendFileDescriptor(stderr, "redirectStderr")
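

# The sketch below is an editorial illustration and not part of the original
# flumotion module: it shows, under stated assumptions, how a concrete
# heaven/avatar pair built on the base classes above could be wired up and
# torn down. The names _ExampleJobAvatar, _ExampleJobHeaven and _exampleRun
# are hypothetical; the real subclasses live elsewhere in the worker package.


class _ExampleJobAvatar(BaseJobAvatar):
    """
    A minimal avatar: stop() must be provided by subclasses, since the
    base class raises NotImplementedError.
    """

    def stop(self):
        # nothing to tear down in this sketch; report immediate success
        return defer.succeed(None)


class _ExampleJobHeaven(BaseJobHeaven):
    # requestAvatar() instantiates avatarClass for every job process that
    # logs in over the unix socket, so a subclass has to point it at a
    # concrete avatar class before listen() is called
    avatarClass = _ExampleJobAvatar


def _exampleRun(brain):
    # listen() opens the FD-passing unix socket; shutdown() asks every
    # avatar to stop, falls back to SIGKILL after JOB_SHUTDOWN_TIMEOUT
    # seconds, and returns a deferred that fires once the last child has
    # exited and the socket has stopped listening
    heaven = _ExampleJobHeaven(brain)
    heaven.listen()
    return heaven.shutdown()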