Package flumotion :: Package component :: Package decoders :: Package generic :: Module generic
[hide private]

Source Code for Module flumotion.component.decoders.generic.generic

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import gst 
 23  import gobject 
 24  import threading 
 25   
 26  from flumotion.component import decodercomponent as dc 
 27  from flumotion.common import messages 
 28  from flumotion.common.i18n import N_, gettexter 
 29   
 30  T_ = gettexter() 
 31   
 32  __version__ = "$Rev: 7162 $" 
 33   
 34  BASIC_AUDIO_CAPS = "audio/x-raw-int;audio/x-raw-float" 
 35  BASIC_VIDEO_CAPS = "video/x-raw-yuv;video/x-raw-rgb" 
 36   
 37  # FIXME: The GstAutoplugSelectResult enum has no bindings in gst-python. 
 38  # Replace this when the enum is exposed in the bindings. 
 39   
 40  GST_AUTOPLUG_SELECT_TRY = 0 
 41  GST_AUTOPLUG_SELECT_SKIP = 2 
 42   
 43   
44 -class FeederInfo(object):
45
46 - def __init__(self, name, caps, linked=False):
47 self.name = name 48 self.caps = caps
49 50
51 -class SyncKeeper(gst.Element):
52 __gstdetails__ = ('SyncKeeper', 'Generic', 53 'Retimestamp the output to be contiguous and maintain ' 54 'the sync', 'Xavier Queralt') 55 _audiosink = gst.PadTemplate("audio-in", 56 gst.PAD_SINK, 57 gst.PAD_ALWAYS, 58 gst.caps_from_string(BASIC_AUDIO_CAPS)) 59 _videosink = gst.PadTemplate("video-in", 60 gst.PAD_SINK, 61 gst.PAD_ALWAYS, 62 gst.caps_from_string(BASIC_VIDEO_CAPS)) 63 _audiosrc = gst.PadTemplate("audio-out", 64 gst.PAD_SRC, 65 gst.PAD_ALWAYS, 66 gst.caps_from_string(BASIC_AUDIO_CAPS)) 67 _videosrc = gst.PadTemplate("video-out", 68 gst.PAD_SRC, 69 gst.PAD_ALWAYS, 70 gst.caps_from_string(BASIC_VIDEO_CAPS)) 71
72 - def __init__(self):
73 gst.Element.__init__(self) 74 75 # create source pads 76 self.audiosrc = gst.Pad(self._audiosrc, "audio-out") 77 self.add_pad(self.audiosrc) 78 self.videosrc = gst.Pad(self._videosrc, "video-out") 79 self.add_pad(self.videosrc) 80 81 # create the sink pads and set the chain and event function 82 self.audiosink = gst.Pad(self._audiosink, "audio-in") 83 self.audiosink.set_chain_function(lambda pad, buffer: 84 self.chainfunc(pad, buffer, self.audiosrc)) 85 self.audiosink.set_event_function(lambda pad, buffer: 86 self.eventfunc(pad, buffer, self.audiosrc)) 87 self.add_pad(self.audiosink) 88 self.videosink = gst.Pad(self._videosink, "video-in") 89 self.videosink.set_chain_function(lambda pad, buffer: 90 self.chainfunc(pad, buffer, self.videosrc)) 91 self.videosink.set_event_function(lambda pad, buffer: 92 self.eventfunc(pad, buffer, self.videosrc)) 93 self.add_pad(self.videosink) 94 95 # all this variables need to be protected with a lock!!! 96 self._lock = threading.Lock() 97 self._totalTime = 0L 98 self._syncTimestamp = 0L 99 self._syncOffset = 0L 100 self._resetReceived = True 101 self._sendNewSegment = True
102
103 - def _send_new_segment(self):
104 for pad in [self.videosrc, self.audiosrc]: 105 pad.push_event( 106 gst.event_new_new_segment(True, 1.0, gst.FORMAT_TIME, 107 self._syncTimestamp, -1, 0)) 108 self._sendNewSegment = False
109
110 - def _update_sync_point(self, start, position):
111 # Only update the sync point if we haven't received any buffer 112 # (totalTime == 0) or we received a reset 113 if not self._totalTime and not self._resetReceived: 114 return 115 self._syncTimestamp = self._totalTime 116 self._syncOffset = start + (start - position) 117 self._resetReceived = False 118 self.info("Update sync point to % r, offset to %r" % 119 (gst.TIME_ARGS(self._syncTimestamp), 120 (gst.TIME_ARGS(self._syncOffset))))
121
122 - def chainfunc(self, pad, buf, srcpad):
123 self.log("Input %s timestamp: %s, %s" % 124 (srcpad is self.audiosrc and 'audio' or 'video', 125 gst.TIME_ARGS(buf.timestamp), 126 gst.TIME_ARGS(buf.duration))) 127 128 if not self._sendNewSegment: 129 self._send_new_segment() 130 131 try: 132 self._lock.acquire() 133 try: 134 buf.timestamp += self._syncTimestamp - self._syncOffset 135 except TypeError: 136 buf.timestamp = 0 137 dur = buf.duration != gst.CLOCK_TIME_NONE and buf.duration or 0 138 self._totalTime = max(buf.timestamp + dur, self._totalTime) 139 140 self.log("Output %s timestamp: %s, %s" % 141 (srcpad is self.audiosrc and 'audio' or 'video', 142 gst.TIME_ARGS(buf.timestamp), 143 gst.TIME_ARGS(buf.duration))) 144 finally: 145 self._lock.release() 146 147 srcpad.push(buf) 148 return gst.FLOW_OK
149
150 - def eventfunc(self, pad, event, srcpad):
151 self.debug("Received event %r from %s" % (event, event.src)) 152 try: 153 self._lock.acquire() 154 if event.type == gst.EVENT_NEWSEGMENT: 155 u, r, f, start, s, position = event.parse_new_segment() 156 self._update_sync_point(start, position) 157 if event.get_structure().get_name() == 'flumotion-reset': 158 self._resetReceived = True 159 self._send_new_segment = True 160 finally: 161 self._lock.release() 162 163 # forward all the events except the new segment events 164 if event.type != gst.EVENT_NEWSEGMENT: 165 return srcpad.push_event(event) 166 return True
167 168 gobject.type_register(SyncKeeper) 169 gst.element_register(SyncKeeper, "synckeeper", gst.RANK_MARGINAL) 170 171
172 -class GenericDecoder(dc.DecoderComponent):
173 """ 174 Generic decoder component using decodebin2. 175 176 It listen to the custom gstreamer event flumotion-reset, 177 and reset the decoding pipeline by removing the old one 178 and creating a new one. 179 180 Sub-classes must override _get_feeders_info() and return 181 a list of FeederInfo instances that describe the decoder 182 output. 183 184 When reset, if the new decoded pads do not match the 185 previously negotiated caps, feeder will not be connected, 186 and the decoder will go sad. 187 """ 188 189 logCategory = "gen-decoder" 190 feeder_tmpl = ("identity name=%(ename)s single-segment=true " 191 "silent=true ! %(caps)s ! @feeder:%(pad)s@") 192 193 ### Public Methods ### 194
195 - def init(self):
196 self._feeders_info = None # {FEEDER_NAME: FeederInfo}
197
198 - def get_pipeline_string(self, properties):
199 # Retrieve feeder info and build a dict out of it 200 finfo = self._get_feeders_info() 201 assert finfo, "No feeder info specified" 202 self._feeders_info = dict([(i.name, i) for i in finfo]) 203 204 pipeline_parts = [self._get_base_pipeline_string()] 205 206 for i in self._feeders_info.values(): 207 ename = self._get_output_element_name(i.name) 208 pipeline_parts.append( 209 self.feeder_tmpl % dict(ename=ename, caps=i.caps, pad=i.name)) 210 211 pipeline_str = " ".join(pipeline_parts) 212 self.log("Decoder pipeline: %s", pipeline_str) 213 214 self._blacklist = properties.get('blacklist', []) 215 216 return pipeline_str
217
218 - def configure_pipeline(self, pipeline, properties):
219 dc.DecoderComponent.configure_pipeline(self, pipeline, 220 properties) 221 222 decoder = self.pipeline.get_by_name("decoder") 223 decoder.connect('autoplug-select', self._autoplug_select_cb)
224 225 ### Protected Methods ## 226
228 return 'decodebin2 name=decoder'
229
230 - def _get_feeders_info(self):
231 """ 232 Must be overridden to returns a tuple of FeederInfo. 233 """ 234 return None
235 236 ### Private Methods ### 237
238 - def _get_output_element_name(self, feed_name):
239 return "%s-output" % feed_name
240 241 ### Callbacks ### 242
243 - def _autoplug_select_cb(self, decoder, pad, caps, factory):
244 if factory.get_name() in self._blacklist: 245 self.log("Skipping element %s because it's in the blacklist", 246 factory.get_name()) 247 return GST_AUTOPLUG_SELECT_SKIP 248 return GST_AUTOPLUG_SELECT_TRY
249 250
251 -class SingleGenericDecoder(GenericDecoder):
252 253 logCategory = "sgen-decoder" 254 255 _caps_lookup = {'audio': BASIC_AUDIO_CAPS, 256 'video': BASIC_VIDEO_CAPS} 257
258 - def init(self):
259 self._media_type = None
260
261 - def check_properties(self, properties, addMessage):
262 media_type = properties.get("media-type") 263 if media_type not in ["audio", "video"]: 264 msg = 'Property media-type can only be "audio" or "video"' 265 m = messages.Error(T_(N_(msg)), mid="error-decoder-media-type") 266 addMessage(m) 267 else: 268 self._media_type = media_type
269
270 - def _get_feeders_info(self):
271 caps = self._caps_lookup[self._media_type] 272 return FeederInfo('default', caps),
273 274
275 -class AVGenericDecoder(GenericDecoder):
276 277 logCategory = "avgen-decoder" 278 feeder_tmpl = ("identity name=%(ename)s silent=true ! %(caps)s ! " 279 "sync.%(pad)s-in sync.%(pad)s-out ! @feeder:%(pad)s@") 280
281 - def _get_feeders_info(self):
282 return (FeederInfo('audio', BASIC_AUDIO_CAPS), 283 FeederInfo('video', BASIC_VIDEO_CAPS))
284
286 return 'decodebin2 name=decoder synckeeper name=sync'
287