1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import time
23
24 import gst
25 from twisted.cred import credentials
26 from twisted.internet import reactor, error, defer
27 from twisted.web import server
28 from zope.interface import implements
29
30 from flumotion.common import gstreamer, errors
31 from flumotion.common import messages, netutils, interfaces
32 from flumotion.common.format import formatStorage, formatTime
33 from flumotion.common.i18n import N_, gettexter
34 from flumotion.component import feedcomponent
35 from flumotion.component.base import http
36 from flumotion.component.component import moods
37 from flumotion.component.consumers.httpstreamer import resources
38 from flumotion.component.misc.porter import porterclient
39 from flumotion.twisted import fdserver
40
41 __all__ = ['HTTPMedium', 'MultifdSinkStreamer']
42 __version__ = "$Rev$"
43 T_ = gettexter()
44 STATS_POLL_INTERVAL = 10
45 UI_UPDATE_THROTTLE_PERIOD = 2.0
46
47
48
49
50
51
53
55 self.sink = sink
56
57 self.no_clients = 0
58 self.clients_added_count = 0
59 self.clients_removed_count = 0
60 self.start_time = time.time()
61
62 self.peak_client_number = 0
63 self.peak_epoch = self.start_time
64 self.load_deltas = [0, 0]
65 self._load_deltas_period = 10
66 self._load_deltas_ongoing = [time.time(), 0, 0]
67 self._currentBitrate = -1
68 self._lastBytesReceived = -1
69
70
71 self.average_client_number = 0
72 self.average_time = self.start_time
73
74 self.hostname = "localhost"
75 self.port = 0
76 self.mountPoint = "/"
77
79
def _updateAverage(self):
    """
    Fold the client count held since the previous update into the
    time-weighted running average, then move the averaging mark to now.
    """
    now = time.time()

    # snapshot everything before the averaging mark is moved
    elapsed_before = self.average_time - self.start_time
    elapsed_since = now - self.average_time
    previous_average = self.average_client_number
    current_clients = self.no_clients

    self.average_time = now

    if elapsed_before == 0:
        # first time we're called: there is no history to weigh against
        self.average_client_number = 0
        return

    total = elapsed_before + elapsed_since
    weighted_before = (previous_average * elapsed_before) / total
    weighted_since = current_clients * elapsed_since / total
    self.average_client_number = weighted_before + weighted_since
95
97 self._updateAverage()
98
99 self.no_clients += 1
100 self.clients_added_count +=1
101
102
103 if self.no_clients >= self.peak_client_number:
104 self.peak_epoch = time.time()
105 self.peak_client_number = self.no_clients
106
def clientRemoved(self):
    """
    Account for one client having gone away.

    The running average is brought up to date first, so the interval that
    just ended is weighted with the client count that was valid during it.
    """
    self._updateAverage()

    self.clients_removed_count += 1
    self.no_clients -= 1
111
def _updateStats(self):
    """
    Periodically, update our statistics on load deltas, and update the
    UIState with new values for total bytes, bitrate, etc.

    Each run schedules the next one STATS_POLL_INTERVAL seconds later and
    keeps the resulting delayed call in self._updateCallLaterId.
    """
    oldtime, oldadd, oldremove = self._load_deltas_ongoing
    add, remove = self.clients_added_count, self.clients_removed_count
    now = time.time()
    diff = float(now - oldtime)

    if diff > 0:
        self.load_deltas = [(add - oldadd) / diff,
                            (remove - oldremove) / diff]
    # else: no time has elapsed (or the clock stepped back); keep the
    # previous deltas instead of raising, because an exception here would
    # also skip the callLater below and end the polling for good
    self._load_deltas_ongoing = [now, add, remove]

    bytesReceived = self.getBytesReceived()
    if self._lastBytesReceived >= 0:
        # a negative value means no previous sample has been taken yet
        self._currentBitrate = ((bytesReceived - self._lastBytesReceived) *
                                8 / STATS_POLL_INTERVAL)
    self._lastBytesReceived = bytesReceived

    self.update_ui_state()

    self._updateCallLaterId = reactor.callLater(STATS_POLL_INTERVAL,
                                                self._updateStats)
136
def getCurrentBitrate(self):
    """
    Return the bitrate of the received stream, in bits per second.

    The value measured by the last statistics poll is preferred. While
    that is still negative (no measurement yet), the average over the
    whole uptime is returned instead, or 0 if no time has elapsed.
    """
    if self._currentBitrate >= 0:
        return self._currentBitrate

    uptime = self.getUptime()
    if uptime <= 0:
        # nothing to average over yet; dividing would raise
        return 0
    return self.getBytesReceived() * 8 / uptime
142
def getBytesSent(self):
    """
    Return the value of the sink's 'bytes-served' property.
    """
    served = self.sink.get_property('bytes-served')
    return served
145
def getBytesReceived(self):
    """
    Return the value of the sink's 'bytes-to-serve' property.
    """
    to_serve = self.sink.get_property('bytes-to-serve')
    return to_serve
148
def getUptime(self):
    """
    Return the number of seconds elapsed since self.start_time.
    """
    now = time.time()
    return now - self.start_time
151
def getClients(self):
    """
    Return the current client count (self.no_clients).
    """
    current = self.no_clients
    return current
154
def getPeakClients(self):
    """
    Return the highest client count recorded (self.peak_client_number).
    """
    peak = self.peak_client_number
    return peak
157
def getPeakEpoch(self):
    """
    Return self.peak_epoch, the timestamp stored alongside the peak
    client count.
    """
    epoch = self.peak_epoch
    return epoch
160
def getAverageClients(self):
    """
    Return the running average client count
    (self.average_client_number).
    """
    average = self.average_client_number
    return average
163
def getUrl(self):
    """
    Return the stream URL, built from self.hostname, self.port and
    self.mountPoint.
    """
    parts = (self.hostname, self.port, self.mountPoint)
    return "http://%s:%d%s" % parts
166
def getLoadDeltas(self):
    """
    Return self.load_deltas, the pair of per-second client
    added/removed rates computed by the last statistics poll.
    """
    deltas = self.load_deltas
    return deltas
169
def updateState(self, set):
    """
    Publish the current stream, client and consumption statistics.

    @param set: callable taking (key, value); it is invoked once for
                every statistic, in a fixed order.
    """
    c = self

    bytes_sent = c.getBytesSent()
    bytes_received = c.getBytesReceived()
    uptime = c.getUptime()
    clients = c.getClients()
    currentbitrate = c.getCurrentBitrate()

    def average_bitrate(total_bytes):
        # bits per second over the whole uptime; 0.0 while no time has
        # elapsed, where the division would otherwise raise
        if uptime <= 0:
            return 0.0
        return total_bytes * 8 / uptime

    stream_bitspeed = average_bitrate(bytes_received)
    consumption_bitspeed = average_bitrate(bytes_sent)

    set('stream-mime', c.get_mime())
    set('stream-url', c.getUrl())
    set('stream-uptime', formatTime(uptime))
    set('stream-bitrate', formatStorage(stream_bitspeed) + 'bit/s')
    set('stream-current-bitrate',
        formatStorage(currentbitrate) + 'bit/s')
    set('stream-totalbytes', formatStorage(bytes_received) + 'Byte')
    set('stream-bitrate-raw', stream_bitspeed)
    set('stream-totalbytes-raw', bytes_received)

    set('clients-current', str(clients))
    set('clients-max', str(c.getMaxClients()))
    set('clients-peak', str(c.getPeakClients()))
    set('clients-peak-time', c.getPeakEpoch())
    set('clients-average', str(int(c.getAverageClients())))

    set('consumption-bitrate',
        formatStorage(consumption_bitspeed) + 'bit/s')
    # the current consumption is an estimate: the incoming bitrate
    # multiplied by the number of connected clients
    set('consumption-bitrate-current',
        formatStorage(currentbitrate * clients) + 'bit/s')
    set('consumption-totalbytes', formatStorage(bytes_sent) + 'Byte')
    set('consumption-bitrate-raw', consumption_bitspeed)
    set('consumption-totalbytes-raw', bytes_sent)
202
203
class HTTPMedium(feedcomponent.FeedComponentMedium):
    """
    Component medium used by the HTTP streamer.

    Each method below forwards a keycard-related request through
    callRemote under the same name and hands back the resulting deferred.
    """

    def authenticate(self, bouncerName, keycard):
        """
        @rtype: L{twisted.internet.defer.Deferred} firing a keycard or None.
        """
        return self.callRemote('authenticate', bouncerName, keycard)

    def keepAlive(self, bouncerName, issuerName, ttl):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        d = self.callRemote('keepAlive', bouncerName, issuerName, ttl)
        return d

    def removeKeycardId(self, bouncerName, keycardId):
        """
        @rtype: L{twisted.internet.defer.Deferred}
        """
        d = self.callRemote('removeKeycardId', bouncerName, keycardId)
        return d
230
231
232
235
238
241
244
247
250
253
256
257
258
259
261 implements(interfaces.IStreamingComponent)
262
263 checkOffset = True
264
265
266 logCategory = 'cons-http'
267
268 pipe_template = 'multifdsink name=sink ' + \
269 'sync=false ' + \
270 'recover-policy=3'
271
272 componentMediumClass = HTTPMedium
273
275 reactor.debug = True
276 self.debug("HTTP streamer initialising")
277
278 self.caps = None
279 self.resource = None
280 self.httpauth = None
281 self.mountPoint = None
282 self.burst_on_connect = False
283
284 self.description = None
285
286 self.type = None
287
288
289 self._pbclient = None
290 self._porterUsername = None
291 self._porterPassword = None
292 self._porterPath = None
293
294
295
296 self.port = None
297
298 self.iface = None
299
300 self._tport = None
301
302 self._updateCallLaterId = None
303 self._lastUpdate = 0
304 self._updateUI_DC = None
305
306 self._pending_removals = {}
307
308 for i in ('stream-mime', 'stream-uptime', 'stream-current-bitrate',
309 'stream-bitrate', 'stream-totalbytes', 'clients-current',
310 'clients-max', 'clients-peak', 'clients-peak-time',
311 'clients-average', 'consumption-bitrate',
312 'consumption-bitrate-current',
313 'consumption-totalbytes', 'stream-bitrate-raw',
314 'stream-totalbytes-raw', 'consumption-bitrate-raw',
315 'consumption-totalbytes-raw', 'stream-url'):
316 self.uiState.addKey(i, None)
317
320
323
325
326 if props.get('type', 'master') == 'slave':
327 for k in 'socket-path', 'username', 'password':
328 if not 'porter-' + k in props:
329 raise errors.ConfigError("slave mode, missing required"
330 " property 'porter-%s'" % k)
331
332 if 'burst-size' in props and 'burst-time' in props:
333 raise errors.ConfigError('both burst-size and burst-time '
334 'set, cannot satisfy')
335
336
337 version = gstreamer.get_plugin_version('tcp')
338 if version < (0, 10, 9, 1):
339 m = messages.Error(T_(N_(
340 "Version %s of the '%s' GStreamer plug-in is too old.\n"),
341 ".".join(map(str, version)), 'multifdsink'))
342 m.add(T_(N_("Please upgrade '%s' to version %s."),
343 'gst-plugins-base', '0.10.10'))
344 addMessage(m)
345
347 if self.burst_on_connect:
348 if self.burst_time and \
349 gstreamer.element_factory_has_property('multifdsink',
350 'units-max'):
351 self.debug("Configuring burst mode for %f second burst",
352 self.burst_time)
353
354
355 sink.set_property('sync-method', 4)
356 sink.set_property('burst-unit', 2)
357 sink.set_property('burst-value',
358 long(self.burst_time * gst.SECOND))
359
360
361
362
363 sink.set_property('time-min',
364 long((self.burst_time + 5) * gst.SECOND))
365
366 sink.set_property('unit-type', 2)
367 sink.set_property('units-soft-max',
368 long((self.burst_time + 8) * gst.SECOND))
369 sink.set_property('units-max',
370 long((self.burst_time + 10) * gst.SECOND))
371 elif self.burst_size:
372 self.debug("Configuring burst mode for %d kB burst",
373 self.burst_size)
374
375
376
377
378
379 sink.set_property('sync-method', 'burst-keyframe')
380 sink.set_property('burst-unit', 'bytes')
381 sink.set_property('burst-value', self.burst_size * 1024)
382
383
384
385
386 sink.set_property('bytes-min', (self.burst_size + 512) * 1024)
387
388
389
390
391
392
393
394 sink.set_property('buffers-soft-max',
395 (self.burst_size + 1024) / 4)
396 sink.set_property('buffers-max',
397 (self.burst_size + 2048) / 4)
398
399 else:
400
401 self.debug("simple burst-on-connect, setting sync-method 2")
402 sink.set_property('sync-method', 2)
403
404 sink.set_property('buffers-soft-max', 250)
405 sink.set_property('buffers-max', 500)
406 else:
407 self.debug("no burst-on-connect, setting sync-method 0")
408 sink.set_property('sync-method', 0)
409
410 sink.set_property('buffers-soft-max', 250)
411 sink.set_property('buffers-max', 500)
412
526
528 return '<MultifdSinkStreamer (%s)>' % self.name
529
def getMaxClients(self):
    """
    Return the client limit held by the HTTP resource
    (self.resource.maxclients).
    """
    resource = self.resource
    return resource.maxclients
532
def get_mime(self):
    """
    Return the name of the first structure of the stored caps, or None
    while no caps are stored.
    """
    if not self.caps:
        return None

    structure = self.caps.get_structure(0)
    return structure.get_name()
536
538 mime = self.get_mime()
539 if mime == 'multipart/x-mixed-replace':
540 mime += ";boundary=ThisRandomString"
541 return mime
542
def getUrl(self):
    """
    Return the URL the stream is served at, built from self.hostname,
    self.port and self.mountPoint.
    """
    location = (self.hostname, self.port, self.mountPoint)
    return "http://%s:%d%s" % location
545
547 socket = 'flumotion.component.plugs.streamdata.StreamDataProviderPlug'
548 if self.plugs[socket]:
549 plug = self.plugs[socket][-1]
550 return plug.getStreamData()
551 else:
552 return {'protocol': 'HTTP',
553 'description': self.description,
554 'url': self.getUrl()}
555
557 """Return a tuple (deltaadded, deltaremoved, bytes_transferred,
558 current_clients, current_load) of our current bandwidth and
559 user values.
560 The deltas are estimates of how much bitrate is added, removed
561 due to client connections, disconnections, per second.
562 """
563
564
565 deltaadded, deltaremoved = self.getLoadDeltas()
566
567 bytes_received = self.getBytesReceived()
568 uptime = self.getUptime()
569 bitrate = bytes_received * 8 / uptime
570
571 bytes_sent = self.getBytesSent()
572 clients_connected = self.getClients()
573 current_load = bitrate * clients_connected
574
575 return (deltaadded * bitrate, deltaremoved * bitrate, bytes_sent,
576 clients_connected, current_load)
577
581
585
def remove_all_clients(self):
    """Remove all the clients.

    Returns a deferred fired once all clients have been removed, or None
    when there is no resource to ask.
    """
    if not self.resource:
        return None

    self.debug("Asking for all clients to be removed")
    return self.resource.removeAllClients()
595
def update_ui_state(self):
    """Update the uiState object.
    Such updates (through this function) are throttled to a maximum rate,
    to avoid saturating admin clients with traffic when many clients are
    connecting/disconnecting.
    """

    def set_if_changed(key, value):
        # only touch keys whose value actually differs
        if self.uiState.get(key) != value:
            self.uiState.set(key, value)

    def run_deferred_update():
        # the delayed call has fired: forget it, then update for real
        self._updateUI_DC = None
        self.update_ui_state()

    now = time.time()

    if now - self._lastUpdate >= UI_UPDATE_THROTTLE_PERIOD:
        # the last update is old enough: update right away, dropping
        # any update that was still waiting to run
        pending = self._updateUI_DC
        if pending:
            pending.cancel()
            self._updateUI_DC = None

        self._lastUpdate = now
        self.updateState(set_if_changed)
        return

    if not self._updateUI_DC:
        # too soon after the last update and nothing queued yet:
        # schedule a single update for later
        self._updateUI_DC = reactor.callLater(UI_UPDATE_THROTTLE_PERIOD,
                                              run_deferred_update)
628
633
635 self.log('[fd %5d] client_removed_handler, reason %s', fd, reason)
636 if reason.value_name == 'GST_CLIENT_STATUS_ERROR':
637 self.warning('[fd %5d] Client removed because of write error' % fd)
638
639 self.resource.clientRemoved(sink, fd, reason, stats)
640 Stats.clientRemoved(self)
641 self.update_ui_state()
642
643
644
646 caps = pad.get_negotiated_caps()
647 if caps == None:
648 return
649
650 caps_str = gstreamer.caps_repr(caps)
651 self.debug('Got caps: %s' % caps_str)
652
653 if not self.caps == None:
654 self.warning('Already had caps: %s, replacing' % caps_str)
655
656 self.debug('Storing caps: %s' % caps_str)
657 self.caps = caps
658
659 reactor.callFromThread(self.update_ui_state)
660
661
662
663
664
665
666
667
669 stats = sink.emit('get-stats', fd)
670 self._pending_removals[fd] = (stats, reason)
671
672
673
679
680
681
683 if self._updateCallLaterId:
684 self._updateCallLaterId.cancel()
685 self._updateCallLaterId = None
686
687 if self.httpauth:
688 self.httpauth.stopKeepAlive()
689
690 if self._tport:
691 self._tport.stopListening()
692
693 l = []
694
695
696 clients = self.remove_all_clients()
697 if clients:
698 l.append(clients)
699
700 if self.type == 'slave' and self._pbclient:
701 l.append(self._pbclient.deregisterPath(self.mountPoint))
702 return defer.DeferredList(l)
703
705 """Provide a new set of porter login information, for when we're
706 in slave mode and the porter changes.
707 If we're currently connected, this won't disconnect - it'll just change
708 the information so that next time we try and connect we'll use the
709 new ones
710 """
711 if self.type == 'slave':
712 self._porterUsername = username
713 self._porterPassword = password
714
715 creds = credentials.UsernamePassword(self._porterUsername,
716 self._porterPassword)
717
718 self._pbclient.startLogin(creds, self._pbclient.medium)
719
720
721 if path != self._porterPath:
722 self.debug("Changing porter login to use \"%s\"", path)
723 self._porterPath = path
724 self._pbclient.stopTrying()
725
726 self._pbclient.resetDelay()
727 reactor.connectWith(
728 fdserver.FDConnector, self._porterPath,
729 self._pbclient, 10, checkPID=False)
730 else:
731 raise errors.WrongStateError(
732 "Can't specify porter details in master mode")
733
746
748 root = resources.HTTPRoot()
749
750 mount = self.mountPoint[1:]
751 root.putChild(mount, self.resource)
752 if self.type == 'slave':
753
754
755
756
757
758
759
760
761
762
763
764
765
766 self._porterDeferred = d = defer.Deferred()
767 mountpoints = [self.mountPoint]
768 self._pbclient = porterclient.HTTPPorterClientFactory(
769 server.Site(resource=root), mountpoints, d)
770
771 creds = credentials.UsernamePassword(self._porterUsername,
772 self._porterPassword)
773 self._pbclient.startLogin(creds, self._pbclient.medium)
774
775 self.info("Starting porter login at \"%s\"", self._porterPath)
776
777 reactor.connectWith(
778 fdserver.FDConnector, self._porterPath,
779 self._pbclient, 10, checkPID=False)
780 else:
781
782 try:
783 iface = self.iface or ""
784 self.info('Listening on port %d, interface=%r',
785 self.port, iface)
786 self._tport = reactor.listenTCP(
787 self.port, server.Site(resource=root),
788 interface=iface)
789 except error.CannotListenError:
790 t = 'Port %d is not available.' % self.port
791 self.warning(t)
792 m = messages.Error(T_(N_(
793 "Network error: TCP port %d is not available."),
794 self.port))
795 self.addMessage(m)
796 self.setMood(moods.sad)
797 return defer.fail(errors.ComponentSetupHandledError(t))
798