Package flumotion :: Package component :: Package base :: Module watcher
[hide private]

Source Code for Module flumotion.component.base.watcher

  1  # -*- Mode: Python -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import time 
 24   
 25  from twisted.internet import reactor 
 26   
 27  from flumotion.common import log 
 28   
 29  __version__ = "$Rev$" 
 30   
 31   
32 -class BaseWatcher(log.Loggable):
33 """I watch for file changes. 34 35 I am a base class for a file watcher. I can be specialized to watch 36 any set of files. 37 """ 38
39 - def __init__(self, timeout):
40 """Make a file watcher object. 41 42 @param timeout: timeout between checks, in seconds 43 @type timeout: int 44 """ 45 self.timeout = timeout 46 self._reset() 47 self._subscribeId = 0 48 self.subscribers = {}
49
50 - def _reset(self):
51 self._stableData = {} 52 self._changingData = {} 53 self._delayedCall = None
54
55 - def _subscribe(self, **events):
56 """Subscribe to events. 57 58 @param events: The events to subscribe to. Subclasses are 59 expected to formalize this dict, specifying which events they 60 support via declaring their kwargs explicitly. 61 62 @returns: A subscription ID that can later be passed to 63 unsubscribe(). 64 """ 65 sid = self._subscribeId 66 self._subscribeId += 1 67 self.subscribers[sid] = events 68 return sid
69
70 - def subscribe(self, fileChanged=None, fileDeleted=None):
71 """Subscribe to events. 72 73 @param fileChanged: A function to call when a file changes. This 74 function will only be called if the file's details (size, mtime) 75 do not change during the timeout period. 76 @type fileChanged: filename -> None 77 @param fileDeleted: A function to call when a file is deleted. 78 @type fileDeleted: filename -> None 79 80 @returns: A subscription ID that can later be passed to 81 unsubscribe(). 82 """ 83 return self._subscribe(fileChanged=fileChanged, 84 fileDeleted=fileDeleted)
85
86 - def unsubscribe(self, id):
87 """Unsubscribe from file change notifications. 88 89 @param id: Subscription ID received from subscribe() 90 """ 91 del self.subscribers[id]
92
93 - def event(self, event, *args, **kwargs):
94 """Fire an event. 95 96 This method is intended for use by object implementations. 97 """ 98 for s in self.subscribers.values(): 99 if s[event]: 100 # Exceptions raised by subscribers need to be catched to 101 # continue polling for changes 102 try: 103 s[event](*args, **kwargs) 104 except Exception, e: 105 self.warning("A callback for event %s raised an error: %s" 106 % (event, log.getExceptionMessage(e)))
107 108 # FIXME: this API has tripped up two people thus far, including its 109 # author. make subscribe() call start() if necessary? 110
111 - def start(self):
112 """Start checking for file changes. 113 114 Subscribers will be notified asynchronously of changes to the 115 watched files. 116 """ 117 118 def checkFiles(): 119 self.log("checking for file changes") 120 new = self.getFileData() 121 changing = self._changingData 122 stable = self._stableData 123 for f in new: 124 if f not in changing: 125 if not f in stable and self.isNewFileStable(f, new[f]): 126 self.debug('file %s stable when noted', f) 127 stable[f] = new[f] 128 self.event('fileChanged', f) 129 elif f in stable and new[f] == stable[f]: 130 # no change 131 pass 132 else: 133 self.debug('change start noted for %s', f) 134 changing[f] = new[f] 135 else: 136 if new[f] == changing[f]: 137 self.debug('change finished for %s', f) 138 del changing[f] 139 stable[f] = new[f] 140 self.event('fileChanged', f) 141 else: 142 self.log('change continues for %s', f) 143 changing[f] = new[f] 144 for f in stable.keys(): 145 if f not in new: 146 # deletion 147 del stable[f] 148 self.debug('file %s has been deleted', f) 149 self.event('fileDeleted', f) 150 for f in changing.keys(): 151 if f not in new: 152 self.debug('file %s has been deleted', f) 153 del changing[f] 154 self._delayedCall = reactor.callLater(self.timeout, 155 checkFiles)
156 157 assert self._delayedCall is None 158 checkFiles()
159
160 - def stop(self):
161 """Stop checking for file changes. 162 """ 163 self._delayedCall.cancel() 164 self._reset()
165
166 - def getFileData(self):
167 """ 168 @returns: a dict, {filename => DATA} 169 DATA can be anything. In the default implementation it is a pair 170 of (mtime, size). 171 """ 172 ret = {} 173 for f in self.getFilesToStat(): 174 try: 175 stat = os.stat(f) 176 ret[f] = (stat.st_mtime, stat.st_size) 177 except OSError, e: 178 self.debug('could not read file %s: %s', f, 179 log.getExceptionMessage(e)) 180 return ret
181
182 - def isNewFileStable(self, fName, fData):
183 """ 184 Check if the file is already stable when being added to the 185 set of watched files. 186 187 @param fName: filename 188 @type fName: str 189 @param fData: DATA, as returned by L{getFileData} method. In 190 the default implementation it is a pair of 191 (mtime, size). 192 193 @rtype: bool 194 """ 195 __pychecker__ = 'unusednames=fName' 196 197 ret = fData[0] + self.timeout < time.time() 198 return ret
199
200 - def getFilesToStat(self):
201 """ 202 @returns: sequence of filename 203 """ 204 raise NotImplementedError
205 206
207 -class DirectoryWatcher(BaseWatcher):
208 """ 209 Directory Watcher 210 Watches a directory for new files. 211 """ 212
213 - def __init__(self, path, ignorefiles=(), timeout=30):
214 BaseWatcher.__init__(self, timeout) 215 self.path = path 216 self._ignorefiles = ignorefiles
217
218 - def getFilesToStat(self):
219 return [os.path.join(self.path, f) 220 for f in os.listdir(self.path) 221 if f not in self._ignorefiles]
222 223
224 -class FilesWatcher(BaseWatcher):
225 """ 226 Watches a collection of files for modifications. 227 """ 228
229 - def __init__(self, files, timeout=30):
230 BaseWatcher.__init__(self, timeout) 231 self._files = files
232
233 - def getFilesToStat(self):
234 return self._files
235