
Source Code for Module flumotion.component.misc.httpserver.ratecontrol

# -*- Mode: Python; test-case-name: -*-
# vi:si:et:sw=4:sts=4:ts=4
#
# Flumotion - a streaming media server
# Copyright (C) 2004,2005,2006,2007,2008 Fluendo, S.L. (www.fluendo.com).
# All rights reserved.

# This file may be distributed and/or modified under the terms of
# the GNU General Public License version 2 as published by
# the Free Software Foundation.
# This file is distributed without any warranty; without even the implied
# warranty of merchantability or fitness for a particular purpose.
# See "LICENSE.GPL" in the source distribution for more information.

# Licensees having purchased or holding a valid Flumotion Advanced
# Streaming Server license may use this file in accordance with the
# Flumotion Advanced Streaming Server Commercial License Agreement.
# See "LICENSE.Flumotion" in the source distribution for more information.

# Headers in this file shall remain intact.

__version__ = "$Rev$"

import time

from flumotion.common import log

from twisted.internet import reactor

from flumotion.component.plugs import base as plugbase


class RateControllerPlug(plugbase.ComponentPlug):

    # Create a producer-consumer proxy that sits between a FileTransfer object
    # and a request object.
    # You may return a Deferred here.

    def createProducerConsumerProxy(self, consumer, request):
        pass


class RateControllerFixedPlug(RateControllerPlug):

    def __init__(self, args):
        props = args['properties']
        self._rateBytesPerSec = int(props.get('rate', 128000) / 8)
        # Peak level is 10 seconds of data; this is chosen
        # entirely arbitrarily.
        self._maxLevel = int(props.get('max-level',
                                       self._rateBytesPerSec * 8 * 10) / 8)
        self._initialLevel = int(props.get('initial-level', 0) / 8)

    def createProducerConsumerProxy(self, consumer, request):
        return TokenBucketConsumer(consumer, self._maxLevel,
                                   self._rateBytesPerSec, self._initialLevel)


class TokenBucketConsumer(log.Loggable):
    """
    Use a token bucket to proxy between a producer (e.g. FileTransfer) and a
    consumer (TCP protocol, etc.), doing rate control.

    The bucket has a rate and a maximum level, so a small burst can be
    permitted. The initial level can be set to a non-zero value; this is
    useful to implement burst-on-connect behaviour.

    TODO: This almost certainly only works with producers that work like
    FileTransfer - i.e. they produce data directly in resumeProducing, and
    ignore pauseProducing. This is sufficient for our needs right now.
    """

    logCategory = 'token-bucket'

    # NOTE: Performance is strongly correlated with this value.
    # Low values (e.g. 0.2) give a 'smooth' transfer, but very high cpu usage
    # if you have several hundred clients.
    # Higher values (e.g. 1.0 or more) give bursty transfer, but nicely lower
    # cpu usage.
    _dripInterval = 1.0  # If we need to wait for more bits in our bucket, wait
                         # at least this long, to avoid overly frequent small
                         # writes

    def __init__(self, consumer, maxLevel, fillRate, fillLevel=0):
        self.maxLevel = maxLevel  # in bytes
        self.fillRate = fillRate  # in bytes per second
        self.fillLevel = fillLevel  # in bytes

        self._buffers = []  # List of (offset, buffer) tuples
        self._buffersSize = 0

        self._finishing = False  # If true, we'll stop once the current buffer
                                 # has been sent.

        self._unregister = False  # If true, we'll unregister from the consumer
                                  # once the data has been sent.

        self._lastDrip = time.time()
        self._dripDC = None
        self._paused = True

        self.producer = None  # we get this in registerProducer.
        self.consumer = consumer

        # We are implemented as a push producer. We forcibly push some
        # data every couple of seconds to maintain the requested
        # rate. If the consumer cannot keep up with that rate we want
        # to get a pauseProducing() call, so we will stop
        # writing. Otherwise the data would have been buffered on the
        # server side, leading to excessive memory consumption.
        self.consumer.registerProducer(self, 1)

        self.info("Created TokenBucketConsumer with rate %d, "
                  "initial level %d, maximum level %d",
                  fillRate, fillLevel, maxLevel)

    def _dripAndTryWrite(self):
        """
        Re-fill our token bucket based on how long it has been since we last
        refilled it.
        Then attempt to write some data.
        """
        self._dripDC = None

        now = time.time()
        elapsed = now - self._lastDrip
        self._lastDrip = now

        bytes = self.fillRate * elapsed
        # Note that this does introduce rounding errors - not particularly
        # important if the drip interval is reasonably high, though. These will
        # cause the actual rate to be lower than the nominal rate.
        self.fillLevel = int(min(self.fillLevel + bytes, self.maxLevel))

        self._tryWrite()

    def _tryWrite(self):
        if not self.consumer:
            return

        while self.fillLevel > 0 and self._buffersSize > 0:
            # If we're permitted to write at the moment, do so.
            offset, buf = self._buffers[0]
            sendbuf = buf[offset:offset+self.fillLevel]
            bytes = len(sendbuf)

            if bytes + offset == len(buf):
                self._buffers.pop(0)
            else:
                self._buffers[0] = (offset+bytes, buf)
            self._buffersSize -= bytes

            self.consumer.write(sendbuf)
            self.fillLevel -= bytes

        if self._buffersSize > 0:
            # If we have data (and we're not already waiting for our next drip
            # interval), wait... this is what actually performs the data
            # throttling.
            if not (self._dripDC or self._paused):
                self._dripDC = reactor.callLater(self._dripInterval,
                                                 self._dripAndTryWrite)
        else:
            # No buffer remaining; ask for more data or finish
            if self._finishing:
                if self._unregister:
                    self._doUnregister()
                self._doFinish()
            elif self.producer:
                self.producer.resumeProducing()
            elif self._unregister:
                self._doUnregister()

    def _doUnregister(self):
        self.consumer.unregisterProducer()
        self._unregister = False

    def _doFinish(self):
        self.debug('consumer <- finish()')
        self.consumer.finish()
        self._finishing = False

    def stopProducing(self):
        self.debug('stopProducing; buffered data: %d', self._buffersSize)
        if self.producer is not None:
            self.producer.stopProducing()

        if self._dripDC:
            # don't produce after stopProducing()!
            self._dripDC.cancel()
            self._dripDC = None

        # ...and then, we still may have pending things to do
        if self._unregister:
            self._doUnregister()

        if self._finishing:
            self._finishing = False
            self.consumer.finish()

        if self._buffersSize > 0:
            # make sure we release all the buffers, just in case
            self._buffers = []
            self._buffersSize = 0

        self.consumer = None

    def pauseProducing(self):
        self._paused = True

        # In case our producer is also 'push', we want it to stop.
        # FIXME: Pull producers don't even need to implement that
        # method, so we probably should remember what kind of producer
        # we are dealing with and not call pauseProducing when it's
        # 'pull'.
        # However, all our producers (e.g. FileProducer) just
        # ignore pauseProducing, so for now it works.
        #
        # FIXME: convert the following scenario into a unit test and remove it
        # from here. It's rather lengthy for a comment.
        #
        # The producer might be None at this point if the following happened:
        # 1) we resumeProducing()
        # 2) we find out we're not permitted to write more, so we set up the
        #    callLater to write after self._dripInterval
        # 3) the producer goes away, unregisterProducer() gets called
        # 4) the callLater fires and we _dripAndTryWrite()
        # 5) we try to push some data to the consumer
        # 6) but the consumer is not reading fast enough, so Twisted calls
        #    pauseProducing() on us
        # 7) at this point, if self.producer is None, we simply don't proxy
        #    the pauseProducing() call to it
        if self.producer:
            self.producer.pauseProducing()

        # We have to stop dripping, otherwise we will keep on filling
        # the buffers and eventually run out of memory.
        if self._dripDC:
            self._dripDC.cancel()
            self._dripDC = None

    def resumeProducing(self):
        self._paused = False
        self._tryWrite()

        if not self._buffers and self.producer:
            self.producer.resumeProducing()

    def write(self, data):
        self._buffers.append((0, data))
        self._buffersSize += len(data)

        self._tryWrite()

        if self._buffers and not self.fillLevel and self.producer:
            # FIXME: That's not completely correct. See the comment in
            # self.pauseProducing() about not calling pauseProducing
            # on 'pull' producers.
            self.producer.pauseProducing()

    def finish(self):
        if self._dripDC:
            self._finishing = True
        elif self.consumer:
            self._doFinish()

    def registerProducer(self, producer, streaming):
        self.debug("Producer registered: %r", producer)
        self.producer = producer

        self.resumeProducing()

    def unregisterProducer(self):
        self.debug('unregisterProducer; buffered data: %d', self._buffersSize)
        if self.producer is not None:
            self.producer = None

        if not self._dripDC:
            self._doUnregister()
        else:
            # we need to wait until we've written the data
            self._unregister = True
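
As a rough illustration of how the pieces above fit together, here is a minimal
sketch (not part of the module) that drives a TokenBucketConsumer by hand. The
FakeConsumer class is hypothetical and only implements the methods
TokenBucketConsumer actually calls; in the real server the consumer is the HTTP
request and the producer is a FileTransfer-style object. Note that
RateControllerFixedPlug reads its properties in bits, so a rate of 80000 bits/s
becomes 10000 bytes/s, and the default max-level is ten seconds of data.

from twisted.internet import reactor

from flumotion.component.misc.httpserver.ratecontrol import \
    RateControllerFixedPlug


class FakeConsumer:
    """Hypothetical stand-in for a request; it just collects written data."""

    def __init__(self):
        self.data = []

    def registerProducer(self, producer, streaming):
        self.producer = producer

    def unregisterProducer(self):
        self.producer = None

    def write(self, data):
        self.data.append(data)

    def finish(self):
        print 'received %d bytes in %d writes' % (
            sum(len(d) for d in self.data), len(self.data))
        reactor.stop()


# Properties are expressed in bits: rate 80000 bits/s -> 10000 bytes/s;
# max-level defaults to ten seconds of data (100000 bytes), initial-level to 0.
plug = RateControllerFixedPlug({'properties': {'rate': 80000}})

consumer = FakeConsumer()
bucket = plug.createProducerConsumerProxy(consumer, None)

bucket.resumeProducing()    # the transport asks us for data
bucket.write('x' * 50000)   # what a FileTransfer producer would hand us
bucket.finish()             # ...and tells us no more data is coming

reactor.run()

With the one-second drip interval, the 50000 buffered bytes should go out in
roughly 10000-byte chunks over about five seconds, after which finish()
propagates to FakeConsumer.finish() and stops the reactor.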