[kupfer: 2/67] Parse .desktop files and re-implement application launching



commit b2a9c554f26949f16b78d5df0df37400ff051895
Author: Ulrik Sverdrup <ulrik sverdrup gmail com>
Date:   Tue Mar 15 22:52:25 2011 +0100

    Parse .desktop files and re-implement application launching
    
    Launch applications like Gio (GLib) does it,
    reimplement parsing of the Exec= key of .desktop files,
    use GdkAppLaunchContext to manage the startup notification,
    allow launching any kind of terminal.
    
    This adds a bit more control for launching applications which we can
    use later.

 kupfer/desktop_launch.py |  355 ++++++++++++++++++++++++++++++++++++++++++++++
 kupfer/desktop_parse.py  |  177 +++++++++++++++++++++++
 2 files changed, 532 insertions(+), 0 deletions(-)
---
diff --git a/kupfer/desktop_launch.py b/kupfer/desktop_launch.py
new file mode 100644
index 0000000..b5d6a5a
--- /dev/null
+++ b/kupfer/desktop_launch.py
@@ -0,0 +1,355 @@
+import os
+
+import pygtk
+pygtk.require('2.0')
+import glib
+import gio
+import gtk
+
+import xdg
+import xdg.DesktopEntry
+import xdg.Exceptions
+
+from kupfer import desktop_parse
+
+__all__ = ['launch_app_info']
+
+TERMINAL = ["gnome-terminal", "-x"]
+TERM_STARTUPNOTIFY = True
+STARTUP_ENV = "DESKTOP_STARTUP_ID"
+
+# TODO: Broadcast Gio's launched message on dbus
+# NOTE: GDK's startup notification things that we use
+#       are really only sending xmessages. (roughly).
+
+def debug_log(*args):  # print all @args on one line, each converted with str()
+	print " ".join(str(s) for s in args)
+warning_log = error_log = debug_log  # warnings and errors go through the same printer
+
+class ResourceLookupError (Exception):  # raised by find_desktop_file()
+	"Unable to find resource"
+
+class ResourceReadError (Exception):  # raised by read_desktop_info()
+	"Unable to open resource"
+
+def gtk_to_unicode(gtkstring):
+	"""Return unicode for a GTK/GLib string (bytestring or unicode)"""
+	if isinstance(gtkstring, unicode):
+		return gtkstring
+	return gtkstring.decode("UTF-8", "ignore")  # bytes that are not valid UTF-8 are dropped
+
+def find_desktop_file(desk_id):
+	"""Find file for @desk_id or raise ResourceLookupError"""
+	if not desk_id:
+		raise ResourceLookupError("Empty id")
+	try:  # NOTE(review): xdg.BaseDirectory is not imported explicitly in this file -- confirm it is loaded
+		return next(xdg.BaseDirectory.load_data_paths("applications", desk_id))  # first match only
+	except StopIteration:
+		raise ResourceLookupError("Cannot locate '%s'" % (desk_id,))
+
+def read_desktop_info(desktop_file):
+	"""
+	Get the keys StartupNotify, Terminal, Exec, Path, Icon, Name
+	Return dict with bool and unicode values; raise ResourceReadError on xdg errors
+	"""
+	# Return values in unicode
+	try:
+		de = xdg.DesktopEntry.DesktopEntry(desktop_file)
+	except xdg.Exceptions.Error:
+		raise ResourceReadError  # raised without a message: the xdg error text is not passed on
+	return {
+		"Terminal": de.getTerminal(),
+		"StartupNotify": de.getStartupNotify(),
+		"Exec": gtk_to_unicode(de.getExec()),
+		"Path": gtk_to_unicode(de.getPath()),
+		"Icon": gtk_to_unicode(de.getIcon()),
+		"Name": gtk_to_unicode(de.getName()),
+	}
+
+def create_desktop_info(commandline, name, icon, work_dir, in_terminal, startup_notify):
+	return {  # same keys as the dict returned by read_desktop_info(); values are stored as given
+		"Terminal": in_terminal,
+		"StartupNotify": startup_notify,
+		"Exec": commandline,
+		"Path": work_dir,
+		"Icon": icon,
+		"Name": name,
+	}
+
+
+def replace_format_specs(argv, location, desktop_info, gfilelist):
+	"""
+	http://standards.freedesktop.org/desktop-entry-spec/latest/ar01s06.html
+
+	Replace format specifiers
+
+	%% literal %
+	%f file
+	%F list of files
+	%u URL
+	%U list of URLs
+	%i --icon <Icon key>
+	%c Translated name
+	%k location of .desktop file
+
+	deprecated are removed:
+	%d %D %n %N %v %m
+
+	apart from those, all other codes are left in place untouched
+	Like other implementations, we do actually insert
+	a local path for %u and %U if it exists.
+
+	Return (supports_single, added_at_end, argv)
+
+	supports_single: Launcher only supports a single file
+	                 caller has to re-call for each file
+	added_at_end:    No format found for the file, it was added
+	                 at the end
+	"""
+	supports_single_file = False
+	files_added_at_end = False
+	class Flags(object):  # mutable state shared with the nested functions below
+		did_see_small_f = False
+		did_see_large_f = False
+
+	fileiter = iter(gfilelist)
+
+	def get_file_path(gfile):
+		if not gfile:
+			return ""
+		return gfile.get_path() or gfile.get_uri()  # prefer the local path, else the URI
+
+	def get_next_file_path():
+		try:
+			f = next(fileiter)
+		except StopIteration:
+			return ""
+		return get_file_path(f)
+
+	def replace_single_code(key):
+		"Handle all embedded format codes, including those to be removed"
+		keys = set(["%%", "%f", "%u", "%c", "%k"])  # NOTE: never read; lists the codes handled below
+		deprecated = set(['%d', '%D', '%n', '%N', '%v', '%m'])
+		if key in deprecated:
+			return ""
+		if key == "%%":
+			return "%"
+		if key == "%f" or key == "%u":
+			if Flags.did_see_large_f or Flags.did_see_small_f:
+				warning_log("Warning, multiple file format specs!")
+				return ""
+			Flags.did_see_small_f = True
+			return get_next_file_path()
+
+		if key == "%c":
+			return gtk_to_unicode(desktop_info["Name"] or location)
+		if key == "%k":
+			return location
+		else:
+			return None  # not a known code: the caller keeps the character as it is
+
+	def replace_array_format(elem):
+		"""
+		Handle array format codes -- only recognized as single arguments
+		
+		Return  flag, arglist
+		where flag is true if something was replaced
+		"""
+		if elem == "%U" or elem == "%F":
+			if Flags.did_see_large_f or Flags.did_see_small_f:
+				warning_log("Warning, multiple file format specs!")
+				return True, []
+			Flags.did_see_large_f = True
+			return True, filter(bool,[get_file_path(f) for f in gfilelist])
+		if elem == "%i":
+			if desktop_info["Icon"]:
+				return True, ["--icon", desktop_info["Icon"]]
+			return True, []
+		else:
+			return False, elem  # not an array code; the caller ignores the second value
+
+	def two_part_unescaper(s, repfunc):
+		"""
+		Handle embedded format codes
+
+		Scan @s two characters at a time and replace using @repfunc
+		"""
+		if not s:
+			return s
+		def _inner():
+			it = iter(zip(s, s[1:]))
+			for cur, nex in it:
+				key = cur+nex
+				rep = repfunc(key)
+				if rep is not None:
+					yield rep
+					# skip a step in the iter
+					try:
+						it.next()
+					except StopIteration:
+						return  # the replaced pair ended the string, nothing left to emit
+				else:
+					yield cur
+			yield s[-1]  # the last char is never 'cur' in the pair walk above
+		return ''.join(_inner())
+
+	new_argv = []
+	for x in argv:
+		succ, newargs = replace_array_format(x)
+		if succ:
+			new_argv.extend(newargs)
+		else:
+			arg = two_part_unescaper(x, replace_single_code)
+			if arg:  # arguments that became empty are dropped
+				new_argv.append(arg)
+	
+	if len(gfilelist) > 1 and not Flags.did_see_large_f:  # several files but no %F/%U to take them
+		supports_single_file = True
+	if not Flags.did_see_small_f and not Flags.did_see_large_f and len(gfilelist):
+		files_added_at_end = True
+		new_argv.append(get_next_file_path())  # only the first file is appended
+
+	return supports_single_file, files_added_at_end, new_argv
+
+def _file_and_info_for_app_info(app_info):  # -> (desktop_file, desktop_info), each None if unavailable
+	desktop_info = None
+	try:
+		desktop_file = find_desktop_file(app_info.get_id())
+	except ResourceLookupError as exc:
+		error_log("Error:", exc)  # lookup failures are logged, not raised
+		desktop_file = None
+	else:
+		try:
+			desktop_info = read_desktop_info(desktop_file)
+		except ResourceReadError as exc:
+			error_log("Read error:", exc)  # desktop_info stays None
+	return desktop_file, desktop_info
+
+def launch_app_info(app_info, gfiles=[]):
+	"""
+	Launch @app_info with the files @gfiles, one process per file if the
+	Exec= line only accepts a single file. Return True if all spawns succeeded.
+	"""
+	desktop_file, desktop_info = _file_and_info_for_app_info(app_info)
+	if not desktop_file or not desktop_info:
+		# Allow in-memory app_info creations (without id or desktop file)
+		desktop_file = ""
+		desktop_info = create_desktop_info(app_info.get_commandline() or "",
+		                                   app_info.get_name(),
+		                                   "", "", False, False)  # icon, work_dir, terminal, notify
+		# in this case, the command line is already primarily escaped
+		argv = desktop_parse.parse_argv(desktop_info["Exec"])
+	else:
+		# In the normal case, we must first unescape one round
+		argv = desktop_parse.parse_unesc_argv(desktop_info["Exec"])
+	assert argv and argv[0]
+
+	# Now Resolve the %f etc format codes
+	multiple_needed, missing_format, launch_argv = \
+			replace_format_specs(argv, desktop_file, desktop_info, gfiles)
+
+	if not multiple_needed:
+		# Launch 1 process
+		launch_records = [(launch_argv, gfiles)]
+	else:
+		# Launch one process per file; the first argv only holds the first file
+		launch_records = [(launch_argv, [gfiles[0]])]
+		for f in gfiles[1:]:
+			_ignore1, _ignore2, launch_argv = \
+				replace_format_specs(argv, desktop_file, desktop_info, [f])
+			launch_records.append((launch_argv, [f]))
+
+	notify = desktop_info["StartupNotify"]
+	workdir = desktop_info["Path"] or None
+
+	if desktop_info["Terminal"]:
+		in_terminal = True
+		notify = notify or TERM_STARTUPNOTIFY
+	else:
+		in_terminal = False
+
+	for argv, gfiles in launch_records:
+		if in_terminal:
+			argv = TERMINAL + argv
+		ret = spawn_app(app_info, argv, gfiles, workdir, notify)
+		if not ret:
+			return False
+	return True
+
+def spawn_app_id(app_id, argv, workdir=None, startup_notify=True):
+	"""
+	Spawn @argv trying to notify it as if it is app_id
+	"""
+	try:
+		app_info = get_info_for_id(app_id)
+	except RuntimeError:  # presumably raised for an unknown app id -- TODO confirm
+		app_info = None
+		startup_notify = False  # no app_info to notify for, so spawn without notification
+	return spawn_app(app_info, argv, [], workdir, startup_notify)
+
+def spawn_app(app_info, argv, filelist, workdir=None, startup_notify=True):
+	"""
+	Spawn app. Return True if the child was spawned, False on glib.GError.
+
+	@argv: argument list including files
+	@workdir: where to set workdir if not cwd
+	@app_info: Used for startup notification, if @startup_notify is True
+	@filelist: Used for startup notification
+	@startup_notify: Use startup notification
+	"""
+	notify_id = None
+	if startup_notify:
+		ctx = gtk.gdk.AppLaunchContext()
+		ctx.set_timestamp(gtk.get_current_event_time())
+		# This not only returns the string ID but
+		# it actually starts the startup notification!
+		notify_id = ctx.get_startup_notify_id(app_info, filelist)
+		child_env_add = {STARTUP_ENV: notify_id}  # handed to the child through child_setup()
+	else:
+		child_env_add = {}
+
+	if not workdir or not os.path.exists(workdir):
+		workdir = "."  # missing or nonexistent workdir: use the current directory
+
+	argv = list(encode_argv(argv, "UTF-8"))
+
+	try:
+		debug_log("Launching", argv)
+		debug_log("Startup Notify ID:", notify_id)
+		pid = glib.spawn_async(argv,
+		                       working_directory=workdir,
+		                       flags=glib.SPAWN_SEARCH_PATH,
+		                       child_setup=child_setup,
+		                       user_data=child_env_add)
+		debug_log("Launched child with PID", pid)
+	except glib.GError as exc:
+		error_log("Error Launching ", argv, exc)
+		if notify_id:  # a notification was started above; presumably this call ends it
+			gtk.gdk.notify_startup_complete_with_id(notify_id)
+		return False
+	return True
+
+def child_setup(add_environ):  # passed as child_setup= to glib.spawn_async() in spawn_app()
+	"""Called to setup the child process before exec()
+	@add_environ is a dict for extra env variables
+	"""
+	for key in add_environ:
+		os.putenv(key, add_environ[key])
+
+def encode_argv(argv, encoding):  # generator over @argv yielding byte strings
+	for x in argv:
+		if isinstance(x, unicode):
+			yield x.encode(encoding, "ignore")  # characters @encoding cannot represent are dropped
+		else:
+			yield x  # non-unicode items pass through unchanged
+
+def get_info_for_id(id_):  # spawn_app_id() expects this to raise RuntimeError on failure
+	return gio.unix.DesktopAppInfo(id_)
+
+if __name__ == '__main__':
+
+	while True:  # interactive manual test; the loop has no exit condition of its own
+		id_ = raw_input("Give me an App ID > ")
+		launch_app_info(get_info_for_id(id_ + ".desktop"), [])
+		#launch_app_info(gio.AppInfo("gvim"), [gio.File(".")])
+
diff --git a/kupfer/desktop_parse.py b/kupfer/desktop_parse.py
new file mode 100644
index 0000000..472d458
--- /dev/null
+++ b/kupfer/desktop_parse.py
@@ -0,0 +1,177 @@
+
+"""
+Implementation of unescaping and unquoting of the Exec= key in
+the Desktop Entry Specification (As of March 2011, version 1.1-draft)
+http://standards.freedesktop.org/desktop-entry-spec/latest/ar01s06.html
+http://standards.freedesktop.org/desktop-entry-spec/desktop-entry-spec-1.1.html#exec-variables
+
+The unescaping we are doing is only one way.. so we unescape according to the
+rules, but we accept everything, if validly quoted or not.
+"""
+
+
+# Escape sequences of the "string" value type.
+# unescape() applies these; parse_unesc_argv() runs it before any quote handling.
+escape_table = {
+	r'\s': ' ',
+	r'\n': '\n',
+	r'\t': '\t',
+	r'\r': '\r',
+	'\\\\': '\\',
+}
+
+# quoted: the chars that need a backslash in front
+# (inside a double-quoted section, that is); quoted_table maps each escaped form back
+quoted = r""" " ` $ \ """.split()
+quoted_table = {
+	r'\"': '"',
+	r'\`': '`',
+	r'\$': '$',
+	'\\\\': '\\',
+}
+
+'''
+# reserved are those that need to be inside quotes
+# note that all the quoted are also reserved, of course
+
+We don't use these at all
+reserved = r""" " ' \ > < ~ | & ; $ * ? # ( ) ` """.split()
+reserved.extend([' ', '\t', '\n'])
+'''
+
+def rmquotes(s):
+	"strip one pair of matching surrounding quotes (single or double), if present"
+	if len(s) > 1 and s[0] == s[-1] and s[0] in '"\'':
+		return s[1:-1]
+	return s
+
+def two_part_unescaper(s, reptable):
+	"Scan @s two characters at a time and replace using @reptable"
+	if not s:
+		return s
+	def _inner():
+		it = iter(zip(s, s[1:]))  # overlapping pairs (s[i], s[i+1])
+		for cur, nex in it:
+			key = cur+nex
+			if key in reptable:
+				yield reptable[key]
+				try:
+					it.next()  # skip the pair that starts with the char just consumed
+				except StopIteration:
+					return  # the replaced pair ended the string, nothing left to emit
+			else:
+				yield cur
+		yield s[-1]  # the last char is never 'cur' in the pair walk above
+	return ''.join(_inner())
+
+def quote_scanner(s, reptable):
+	"Split @s into an argument list; double-quoted sections are unescaped using @reptable"
+	qstr = r'"'
+	eqstr = '\\' + qstr  # an escaped quote: backslash followed by a double quote
+
+	parts = []  # A list of arguments
+
+	if not s:
+		return parts
+
+	def add_part(is_quoted, part):
+		_ps = "".join(part)
+		if is_quoted:  # a quoted section becomes exactly one argument
+			parts.append(two_part_unescaper(rmquotes(_ps), reptable))
+		else:  # unquoted text is split on whitespace into zero or more arguments
+			parts.extend(_ps.split())
+
+
+	is_quoted = False
+	it = iter(zip(s, s[1:]))
+	part = []
+	for cur, nex in it:
+		part.append(cur)
+		if cur+nex == eqstr:
+			# Skip along if we see an escaped quote (\")
+			part.append(nex)
+			try:
+				it.next()
+			except StopIteration:
+				break
+		elif cur == qstr:
+			if is_quoted:  # closing quote: emit the quoted section
+				add_part(is_quoted, part)
+				part = []
+				is_quoted = not is_quoted
+			else:  # opening quote: emit the unquoted text before it, keep the quote char
+				head = part[:-1]
+				if head:
+					add_part(is_quoted, head)
+					part = [part[-1]]
+				is_quoted = not is_quoted
+		else:
+			pass
+	else:
+		# This is a for-else: we did not 'break'
+		# Emit the last if it wasn't already
+		part.append(s[-1])
+	add_part(is_quoted, part)
+	return parts
+
+
+def unescape(s):
+	"Primary unescape of control sequences"
+	return two_part_unescaper(s, escape_table)  # replaces the two-char sequences in escape_table
+
+def unquote_inside(s):
+	"unquote reserved chars inside a quoted string"
+	t = {}  # ends up with the same mapping as the module-level quoted_table
+	slash = '\\'
+	for rep in quoted:
+		t[slash+rep] = rep
+	return two_part_unescaper(s, t)
+
+def test_unescape():  # doctest holder, run by doctest.testmod() under __main__
+	r"""
+	>>> t = r'"This \\$ \\\\ \s\\\\"'
+	>>> unescape(t)
+	'"This \\$ \\\\  \\\\"'
+	>>> unescape(r'\t\s\\\\')
+	'\t \\\\'
+	"""
+	pass
+
+def test_unquote_inside():  # doctest holder, run by doctest.testmod() under __main__
+	r"""
+	>>> unquote_inside(r'\$ \\ \" \`')
+	'$ \\ " `'
+	>>> unquote_inside(r'abc \q')
+	'abc \\q'
+	"""
+	pass
+
+def parse_argv(instr):
+	"Parse quoted @instr into an argv"
+	return quote_scanner(instr, quoted_table)  # like parse_unesc_argv() but without the unescape() pass
+
+def parse_unesc_argv(instr):
+	"Parse quoted @instr into an argv after unescaping it"
+	return quote_scanner(unescape(instr), quoted_table)  # unescape() first, then split and unquote
+
+'''
+print escaped
+print reserved
+
+t = r'"This \\$ \\\\ \s\\\\"'
+print repr(t)
+print t
+print unescape(t)
+print unquote_inside(rmquotes(unescape(t)))
+
+print two_part_unescaper(t, escape_table)
+
+print quote_scanner(r'"hi \"there" I am you\"', inside_table)
+print quote_scanner(r'Now "\"this\" will be interesting"""', inside_table)
+print quote_scanner(unescape(r'"\\$"'), inside_table)
+
+'''
+
+if __name__ == "__main__":
+	import doctest
+	doctest.testmod()  # runs the doctests held by test_unescape() and test_unquote_inside()



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]