import gnome
import gobject
import re
import sys
import cgi
import os.path
import deskbar
import deskbar.core.Utils
import deskbar.interfaces.Module
import deskbar.interfaces.Match
import deskbar.interfaces.Action
from deskbar.core.Utils import is_program_in_path, spawn_async
from deskbar.handlers.actions.OpenWithApplicationAction import \
    OpenWithApplicationAction
from deskbar.handlers.actions.OpenDesktopFileAction import \
    OpenDesktopFileAction
from deskbar.handlers.actions.ShowUrlAction import \
    ShowUrlAction
from gettext import gettext as _

# Upper bound on the number of matches emitted per result type.
MAX_RESULTS = 20

# Names of the handler classes exported by this module.
HANDLERS = ['TrackerStaticSearchHandler', 'TrackerLiveSearchHandler']


class TrackerStaticSearchMatch(deskbar.interfaces.Match):
    """Match whose single action opens Tracker Search Tool for the query."""

    def __init__(self, **kwargs):
        """Initialise the match; ``kwargs['name']`` is the search text."""
        deskbar.interfaces.Match.__init__(self, **kwargs)
        self.add_action(TrackerStaticSearchAction(kwargs['name']))


class TrackerStaticSearchAction(deskbar.interfaces.Action):
    """Action that spawns ``tracker-search-tool`` with the stored text."""

    def __init__(self, name):
        deskbar.interfaces.Action.__init__(self, name)
        self.name = name

    def activate(self, text=None):
        """Launch tracker-search-tool (looked up on PATH) with the query."""
        gobject.spawn_async(['tracker-search-tool', self.name],
                            flags=gobject.SPAWN_SEARCH_PATH)

    def get_verb(self):
        """Return the display template; ``%(name)s`` is left unexpanded."""
        return _('Search for %s with Tracker Search Tool') % '%(name)s'

    def get_hash(self):
        """Return an identifier derived from the query text."""
        return 't-s-t:' + self.name


class TrackerStaticSearchHandler(deskbar.interfaces.Module):
    """Module that offers one 'search with Tracker Search Tool' match."""

    INFOS = {
        'icon': deskbar.core.Utils.load_icon('tracker'),
        'name': _('Tracker Search'),
        'description': _('Search with Tracker Search Tool'),
    }

    def __init__(self):
        deskbar.interfaces.Module.__init__(self)

    def query(self, query):
        """Emit a single static match for ``query``."""
        match = TrackerStaticSearchMatch(name=query,
                                         category='actions',
                                         priority=self.get_priority())
        self._emit_query_ready(query, [match])

    @staticmethod
    def has_requirements():
        """Return whether ``tracker-search-tool`` is found on PATH."""
        return is_program_in_path('tracker-search-tool')


# Per result type display data, keyed by the type name stored in a hit.
#   'description': verb template; %(key)s placeholders are left unexpanded
#   'category':    category name reported for the match
#   'icon':        optional icon name used when the hit carries none
#   'action':      optional command template used to open the hit
# For now the description param is not used.
TYPES = {
    'Applications': {
        'description': (_('Launch %s (%s)')
                        % ('%(name)s', '%(app_name)s')),
        'category': 'actions',
    },
    'GaimConversations': {
        'description': (_('See %s conversation\n%s %s\nfrom %s')
                        % ('%(proto)s', '%(channel)s',
                           '%(conv_to)s', '%(time)s')),
        'category': 'conversations',
    },
    'Email': {
        'description': ((_('Email from %s') % '%(publisher)s')
                        + '\n%(title)s'),
        'category': 'emails',
        'action': 'evolution %(uri)s',
    },
    'Music': {
        'description': (_('Listen to music %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'music',
        'icon': 'audio-x-generic',
    },
    'Documents': {
        'description': (_('See document %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'documents',
    },
    'Development Files': {
        'description': (_('Open file %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'develop',
    },
    'Images': {
        'description': (_('View image %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'images',
        'icon': 'image-x-generic',
    },
    'Videos': {
        'description': (_('Watch video %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'videos',
        'icon': 'video-x-generic',
    },
    'Other Files': {
        'description': (_('Open file %s\nin %s')
                        % ('%(base)s', '%(dir)s')),
        'category': 'files',
    },
}


class TrackerMoreMatch(deskbar.interfaces.Match):
    """Match that opens Tracker Search Tool to show further results."""

    def __init__(self, backend, qstring, category='files', **args):
        deskbar.interfaces.Match.__init__(self, backend, **args)
        self._icon = deskbar.core.Utils.load_icon('tracker')
        self.qstring = qstring
        self.category = category

    def get_verb(self):
        """Return the verb template for this match."""
        try:
            return TYPES['Extra']['description']
        except KeyError:
            # TYPES has no 'Extra' entry, so the lookup above used to raise
            # KeyError on every call.  Fall back to the verb of the static
            # search action, which launches the same tool.
            return _('Search for %s with Tracker Search Tool') % '%(name)s'

    def get_category(self):
        """Return the category for ``self.category``, or None if unknown."""
        try:
            return TYPES[self.category]['category']
        except KeyError:
            return None

    def action(self, text=None):
        """Launch tracker-search-tool (looked up on PATH) with the query."""
        gobject.spawn_async(['tracker-search-tool', self.qstring],
                            flags=gobject.SPAWN_SEARCH_PATH)


class TrackerLiveFileMatch(deskbar.interfaces.Match):
    """Match built from one result dictionary produced by recieve_hits."""

    def __init__(self, result=None, **args):
        """Build the match from ``result``.

        ``result`` must be a dict with at least 'type' and 'name' (and the
        keys TrackerLiveSearchAction reads for non-application hits).
        Extra keyword arguments are accepted but not forwarded to the base
        class.
        """
        deskbar.interfaces.Match.__init__(self)
        print(result)

        # Set the match icon: prefer the hit's own non-empty icon, else the
        # per-type default when one is defined.
        if 'icon' in result:
            result['icon'] = result['icon'].strip()
            if result['icon'] == '':
                del result['icon']
            else:
                self._icon = result['icon']
        elif 'icon' in TYPES[result['type']]:
            self._icon = TYPES[result['type']]['icon']

        self.result = result

        if result['type'] == 'Applications':
            # Applications are only launchable through their desktop file.
            if 'desktop' in result:
                self.add_action(OpenDesktopFileAction(result['name'],
                                                      result['desktop']))
        else:
            self.add_action(TrackerLiveSearchAction(result))

    def get_name(self, text=None):
        """Return the mapping used to fill the verb's ``%(name)s``."""
        return {'name': self.result['name']}

    def get_verb(self):
        """Return the verb template for the hit type (generic if unknown)."""
        try:
            return TYPES[self.result['type']]['description']
        except KeyError:
            return _('Open file %s\nin %s') % ('%(base)s', '%(dir)s')

    def get_hash(self, text=None):
        """Return the hit URI as the match identifier."""
        return self.result['uri']

    def get_category(self):
        """Return the category for the hit type, 'files' if unknown."""
        try:
            return TYPES[self.result['type']]['category']
        except KeyError:
            return 'files'


class TrackerLiveSearchAction(deskbar.interfaces.Action):
    """Action that opens a Tracker hit with its type's command or by URL."""

    def __init__(self, result):
        """Store ``result`` and add 'base' and 'dir' entries to it."""
        # The base initialiser must be invoked on this instance; calling
        # the class itself would only build and discard another object.
        deskbar.interfaces.Action.__init__(self, result['name'])
        self.name = result['name']
        self.result = result
        self.fullpath = result['uri']
        self.init_names()
        self.result['base'] = self.base
        self.result['dir'] = self.dir

    def get_name(self, text=None):
        """Return the result dict; its keys fill the verb placeholders."""
        return self.result

    def get_hash(self, text=None):
        """Return an identifier for the hit, or None if keys are missing."""
        try:
            if self.result['type'] == 'Applications':
                # return a name that matches the one returned by the
                # Program handler of deskbar
                return 'generic_' + self.result['app_basename']
            return self.result['uri']
        except KeyError:
            return None

    def get_verb(self):
        """Return the verb template for the hit type (generic if unknown)."""
        try:
            return TYPES[self.result['type']]['description']
        except KeyError:
            return _('Open file %s\nin %s') % ('%(base)s', '%(dir)s')

    def activate(self, text=None):
        """Open the hit.

        Types with an 'action' template are opened by spawning that
        command; everything else is shown as a ``file://`` URL.
        """
        type_info = TYPES[self.result['type']]
        if 'action' in type_info:
            # Split the template first and substitute per argument, so that
            # spaces inside substituted values stay within one argument.
            cmd = [arg % self.result for arg in type_info['action'].split()]
            print('Opening Tracker hit with command: %s' % (cmd,))
            deskbar.core.Utils.spawn_async(cmd)
        else:
            deskbar.core.Utils.url_show(
                'file://' + dummy_escape(self.result['uri']))
            print('Opening Tracker hit: %s' % (self.result['uri'],))

    def init_names(self):
        """Split ``self.fullpath`` into ``self.dir`` and ``self.base``."""
        dirname, filename = os.path.split(self.fullpath)
        if filename == '':
            # We had a trailing slash
            dirname, filename = os.path.split(dirname)

        # Reverse-tilde-expansion: show the home directory prefix as '~'
        home = os.path.normpath(os.path.expanduser('~'))
        regexp = re.compile(r'^%s(/|$)' % re.escape(home))
        dirname = re.sub(regexp, r'~\1', dirname)

        self.dir = dirname
        self.base = filename


class TrackerLiveSearchHandler(deskbar.interfaces.Module):
    """Module that queries Tracker over D-Bus while the user types."""

    INFOS = {
        'icon': deskbar.core.Utils.load_icon('tracker'),
        'name': _('Tracker Live Search'),
        'description': _('Search with Tracker, as you type'),
        'categories': {
            'develop': {
                'name': _('Development Files'),
            },
            'music': {
                'name': _('Music'),
            },
            'images': {
                'name': _('Images'),
            },
            'videos': {
                'name': _('Videos'),
            },
            'applications': {
                'name': _('Applications'),
            },
        },
    }

    @staticmethod
    def has_prerequisites():
        """Report whether the D-Bus bindings and Tracker are usable.

        Returns a ``(status, message, None)`` tuple built from the
        ``deskbar.Handler.HANDLER_IS_*`` constants.
        """
        try:
            import dbus
            try:
                if getattr(dbus, 'version', (0, 0, 0)) >= (0, 41, 0):
                    import dbus.glib

                # Check that Tracker can be started via dbus activation, we
                # will have trouble if it's not
                bus = dbus.SessionBus()
                proxy_obj = bus.get_object('org.freedesktop.DBus',
                                           '/org/freedesktop/DBus')
                dbus_iface = dbus.Interface(proxy_obj,
                                            'org.freedesktop.DBus')
                activatables = dbus_iface.ListActivatableNames()
                if 'org.freedesktop.Tracker' not in activatables:
                    return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE,
                            'Tracker is not activatable via dbus', None)
            except Exception:
                # Any failure in the block above, including bus errors, is
                # reported with this message.
                return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE,
                        'Python dbus.glib bindings not found.', None)
            return (deskbar.Handler.HANDLER_IS_HAPPY, None, None)
        except Exception:
            return (deskbar.Handler.HANDLER_IS_NOT_APPLICABLE,
                    'Python dbus bindings not found.', None)

    def __init__(self):
        deskbar.interfaces.Module.__init__(self)
        # initing on search request, see self.query
        self.tracker = None
        self.search_iface = None
        self.keywords_iface = None
        self.files_iface = None
        # Groups: proto, account, to-whom, time.  The extension group is
        # non-capturing; it was previously misspelled '(:?txt|html)'.
        self.conv_re = re.compile(
            r'^.*?/logs/([^/]+)/([^/]+)/([^/]+)/(.+?)\.(?:txt|html)$')

    def handle_email_hits(self, info, output):
        """Copy title (``info[3]``) and publisher (``info[4]``) to output."""
        output['title'] = dummy_escape(info[3])
        output['publisher'] = dummy_escape(info[4])

    def handle_conversation_hits(self, info, output):
        """Fill conversation fields of ``output`` from the log path."""
        output['uri'] = info[0]
        m = self.conv_re.match(output['uri'])
        output['channel'] = _('with')
        # XXX, never happened during tests
        output['proto'] = output['conv_from'] = ''
        output['conv_to'] = output['time'] = ''
        if m:
            output['proto'] = m.group(1)
            output['conv_from'] = m.group(2)
            output['conv_to'] = m.group(3)
            output['time'] = m.group(4)
        if output['conv_to'].endswith('.chat'):
            output['channel'] = _('in channel')
            output['conv_to'] = output['conv_to'].replace('.chat', '')
        if output['proto'] == 'irc':
            nick_server = output['conv_from'].split('@')
            if len(nick_server) > 1:
                output['conv_to'] = '%s on %s' % (output['conv_to'],
                                                  nick_server[1])
        # escape those entities, purple uses this to escape / on jabber
        # channel/user conversations
        output['uri'] = output['uri'].replace('%', '%25')
        # escape irc channel prefixes, else the path name parsing stops at
        # '#' (this holds also for the icon search)
        output['uri'] = output['uri'].replace('#', '%23')

    def handle_application_hits(self, info, output):
        """Fill application fields of ``output`` from a desktop-file hit.

        Example of ``info``:

            dbus.Array(
              [
                dbus.String(u'/usr/share/applications/gksu.desktop'),
                                                      # TrackerUri  0
                dbus.String(u'Applications'),         # TrackerType 1
                dbus.String(u'Application'),          # DesktopType 2
                dbus.String(u'Root Terminal'),        # DesktopName 3
                dbus.String(u'gksu /usr/bin/x-terminal-emulator'),
                                                      # DesktopExec 4
                dbus.String(u'gksu-root-terminal')    # DesktopIcon 5
              ],
              signature=dbus.Signature('s'))
        """
        # Strip %U or whatever arguments in Exec field
        output['app_name'] = re.sub(r'%\w+', '', info[4]).strip()
        output['app_basename'] = dummy_escape(
            os.path.basename(output['app_name']))
        output['app_name'] = dummy_escape(output['app_name'])
        if output['app_basename'] == '':
            # strange // in app_name, e.g. nautilus burn:///
            output['app_basename'] = output['app_name']
        output['name'] = dummy_escape(info[3])
        output['icon'] = dummy_escape(info[5])

        desktop = parse_desktop_file(output['uri'])
        if desktop:
            output['desktop'] = desktop

    def recieve_hits(self, qstring, hits, max):
        """Turn Tracker ``hits`` into matches and emit them for ``qstring``.

        At most MAX_RESULTS matches are emitted per result type.  The
        ``max`` argument is accepted but not used.
        """
        results = {}
        for info in hits:
            output = {}
            output['name'] = os.path.basename(info[0])
            # NOTE(review): str() presumably fails on non-ASCII unicode
            # URIs under Python 2 -- confirm what Tracker returns here.
            output['uri'] = str(dummy_escape(info[0]))
            output['type'] = info[1]
            # Unknown types are relabelled first, so a hit typed
            # 'Conversations' (absent from TYPES) never reaches the
            # conversation branch below.
            if output['type'] not in TYPES:
                output['type'] = 'Other Files'

            if output['type'] == 'Email':
                self.handle_email_hits(info, output)
            elif output['type'] in ('GaimConversations', 'Conversations'):
                self.handle_conversation_hits(info, output)
            elif output['type'] == 'Applications':
                self.handle_application_hits(info, output)

            results.setdefault(output['type'], []).append(output)

        matches = []
        for typed_results in results.values():
            for res in typed_results[:MAX_RESULTS]:
                matches.append(TrackerLiveFileMatch(res))

        self._emit_query_ready(qstring, matches)
        print('Tracker response for %s; %d hits returned, %d shown'
              % (qstring, len(hits), len(matches)))

    def recieve_error(self, error):
        """Report a D-Bus error from an asynchronous call on stderr."""
        sys.stderr.write('*** Tracker dbus error: %s\n' % (error,))

    def query(self, qstring):
        """Start asynchronous Tracker searches for ``qstring``.

        Text without 'tag:' is searched in several services; otherwise a
        tag query is sent.  Replies arrive in recieve_hits and errors in
        recieve_error.
        """
        limit = MAX_RESULTS

        if not self.tracker:
            try:
                import dbus
                bus = dbus.SessionBus()
                tracker = bus.get_object('org.freedesktop.Tracker',
                                         '/org/freedesktop/tracker')
                search_iface = dbus.Interface(
                    tracker, 'org.freedesktop.Tracker.Search')
                keywords_iface = dbus.Interface(
                    tracker, 'org.freedesktop.Tracker.Keywords')
                files_iface = dbus.Interface(
                    tracker, 'org.freedesktop.Tracker.Files')
            except Exception:
                sys.stderr.write('DBus connection to tracker failed, '
                                 'check your settings.\n')
                return
            # Commit only once every proxy exists, so a partial failure is
            # retried on the next query instead of leaving interfaces None.
            self.tracker = tracker
            self.search_iface = search_iface
            self.keywords_iface = keywords_iface
            self.files_iface = files_iface

        def on_hits(hits):
            self.recieve_hits(qstring, hits, limit)

        if 'tag:' not in qstring:
            for service in ('Files', 'Emails', 'Conversations',
                            'Applications'):
                print('Searching %s' % service)
                self.search_iface.TextDetailed(
                    -1, service, qstring, 0, 10,
                    reply_handler=on_hits,
                    error_handler=self.recieve_error)
            print('Tracker query: %s' % (qstring,))
        else:
            tag_query = qstring.replace('tag:', '')
            # The requested field name depends on the reported version;
            # other versions send no query.
            version = self.tracker.GetVersion()
            if version == 502:
                field = 'File.Format'
            elif version == 503:
                field = 'File:Mime'
            else:
                field = None
            if field is not None:
                self.search_iface.Query(
                    -1, 'Files', [field], '', tag_query, '', False, 0, 100,
                    reply_handler=on_hits,
                    error_handler=self.recieve_error)
            print('Tracker tag query: %s' % (tag_query,))


def dummy_escape(s):
    """Return ``s`` unchanged; escaping via cgi.escape is disabled."""
    return s


def parse_desktop_file(desktop, only_if_visible=False):
    """Load the desktop file at path ``desktop``.

    Returns the loaded item, or None when it cannot be read or is not an
    application entry.  ``only_if_visible`` is accepted but not used.
    """
    try:
        item = deskbar.core.gnomedesktop.item_new_from_file(
            desktop, deskbar.core.gnomedesktop.LOAD_ONLY_IF_EXISTS)
    except Exception:
        # sys.exc_info() avoids version-specific 'except ... as' syntax.
        print('Couldn\'t read desktop file:%s:%s'
              % (desktop, sys.exc_info()[1]))
        return None
    if (item is None or
            item.get_entry_type()
            != deskbar.core.gnomedesktop.TYPE_APPLICATION):
        return None
    return item