1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22 import gobject
23 import gst
24 import gst.interfaces
25 from twisted.internet.threads import deferToThread
26 from twisted.internet import defer
27
28 from flumotion.common import gstreamer, errors, log, messages
29 from flumotion.common.i18n import N_, gettexter
30 from flumotion.twisted import defer as fdefer
31 from flumotion.worker.checks import check
32
33 __version__ = "$Rev$"
34 T_ = gettexter()
35
36
49
50
51 -def do_element_check(pipeline_str, element_name, check_proc, state=None,
52 set_state_deferred=False):
53 """
54 Parse the given pipeline and set it to the given state.
55 When the bin reaches that state, perform the given check function on the
56 element with the given name.
57
58 @param pipeline_str: description of the pipeline used to test
59 @param element_name: name of the element being checked
60 @param check_proc: a function to call with the GstElement as argument.
61 @param state: an unused keyword parameter that will be removed when
62 support for GStreamer 0.8 is dropped.
63 @param set_state_deferred: a flag to say whether the set_state is run in
64 a deferToThread
65 @type set_state_deferred: bool
66 @returns: a deferred that will fire with the result of check_proc, or
67 fail.
68 @rtype: L{twisted.internet.defer.Deferred}
69 """
70
71 def run_check(pipeline, resolution):
72 element = pipeline.get_by_name(element_name)
73 try:
74 retval = check_proc(element)
75 resolution.callback(retval)
76 except check.CheckProcError, e:
77 log.debug('check', 'CheckProcError when running %r: %r',
78 check_proc, e.data)
79 resolution.errback(errors.RemoteRunError(e.data))
80 except Exception, e:
81 log.debug('check', 'Unhandled exception while running %r: %r',
82 check_proc, e)
83 resolution.errback(errors.RemoteRunError(
84 log.getExceptionMessage(e)))
85
86
87 pipeline.set_state(gst.STATE_NULL)
88
89 def message_rcvd(bus, message, pipeline, resolution):
90 t = message.type
91 if t == gst.MESSAGE_STATE_CHANGED:
92 if message.src == pipeline:
93 old, new, pending = message.parse_state_changed()
94 if new == gst.STATE_PLAYING:
95 run_check(pipeline, resolution)
96 elif t == gst.MESSAGE_ERROR:
97 gerror, debug = message.parse_error()
98
99
100 pipeline.set_state(gst.STATE_NULL)
101 resolution.errback(errors.GStreamerGstError(
102 message.src, gerror, debug))
103 elif t == gst.MESSAGE_EOS:
104 resolution.errback(errors.GStreamerError(
105 "Unexpected end of stream"))
106 else:
107 log.debug('check', 'message: %s: %s:' % (
108 message.src.get_path_string(),
109 message.type.value_nicks[1]))
110 if message.structure:
111 log.debug('check', 'message: %s' %
112 message.structure.to_string())
113 else:
114 log.debug('check', 'message: (no structure)')
115 return True
116
117 resolution = BusResolution()
118
119 log.debug('check', 'parsing pipeline %s' % pipeline_str)
120 try:
121 pipeline = gst.parse_launch(pipeline_str)
122 log.debug('check', 'parsed pipeline %s' % pipeline_str)
123 except gobject.GError, e:
124 resolution.errback(errors.GStreamerError(e.message))
125 return resolution.d
126
127 bus = pipeline.get_bus()
128 bus.add_signal_watch()
129 signal_id = bus.connect('message', message_rcvd, pipeline, resolution)
130
131 resolution.signal_id = signal_id
132 resolution.pipeline = pipeline
133 log.debug('check', 'setting state to playing')
134 if set_state_deferred:
135 d = deferToThread(pipeline.set_state, gst.STATE_PLAYING)
136
137 def stateChanged(res):
138 return resolution.d
139 d.addCallback(stateChanged)
140 return d
141 else:
142 pipeline.set_state(gst.STATE_PLAYING)
143 return resolution.d
144
145
147 """
148 Probe the firewire device.
149
150 Return a deferred firing a result.
151
152 The result is either:
153 - successful, with a None value: no device found
154 - successful, with a dictionary of width, height, and par as a num/den pair
155 - failed
156
157 @param mid: the id to set on the message.
158 @param guid: the id of the selected device.
159
160 @rtype: L{twisted.internet.defer.Deferred} of
161 L{flumotion.common.messages.Result}
162 """
163 result = messages.Result()
164
165 def do_check(demux):
166 pad = demux.get_pad('video')
167
168 if not pad or pad.get_negotiated_caps() == None:
169 raise errors.GStreamerError('Pipeline failed to negotiate?')
170
171 caps = pad.get_negotiated_caps()
172 s = caps.get_structure(0)
173 w = s['width']
174 h = s['height']
175 par = s['pixel-aspect-ratio']
176 result = dict(width=w, height=h, par=(par.num, par.denom))
177 log.debug('check', 'returning dict %r' % result)
178 return result
179
180 pipeline = \
181 'dv1394src guid=%s ! dvdemux name=demux .video ! fakesink' % guid
182
183 d = do_element_check(pipeline, 'demux', do_check)
184
185 def errbackResult(failure):
186 log.debug('check', 'returning failed Result, %r' % failure)
187 m = None
188 if failure.check(errors.GStreamerGstError):
189 source, gerror, debug = failure.value.args
190 log.debug('check', 'GStreamer GError: %s (debug: %s)' % (
191 gerror.message, debug))
192 if gerror.domain == "gst-resource-error-quark":
193 if gerror.code == int(gst.RESOURCE_ERROR_NOT_FOUND):
194
195
196 version = gstreamer.get_plugin_version('1394')
197 if version >= (0, 10, 0, 0) and version <= (0, 10, 2, 0):
198 m = messages.Error(T_(
199 N_("Could not find or open the Firewire device. "
200 "Check the device node and its permissions.")))
201 else:
202 m = messages.Error(T_(
203 N_("No Firewire device found.")))
204 elif gerror.code == int(gst.RESOURCE_ERROR_OPEN_READ):
205 m = messages.Error(T_(
206 N_("Could not open Firewire device for reading. "
207 "Check permissions on the device.")))
208
209 if not m:
210 m = check.handleGStreamerDeviceError(failure, 'Firewire',
211 mid=mid)
212
213 if not m:
214 m = messages.Error(T_(N_("Could not probe Firewire device.")),
215 debug=check.debugFailure(failure))
216
217 m.id = mid
218 result.add(m)
219 return result
220 d.addCallback(check.callbackResult, result)
221 d.addErrback(errbackResult)
222
223 return d
224