[damned-lies] Secured run_shell_command by limited usage of shell=True
- From: Claude Paroz <claudep src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [damned-lies] Secured run_shell_command by limited usage of shell=True
- Date: Sat, 20 Sep 2014 13:16:39 +0000 (UTC)
commit 38f6aae2b3a9d815c3500f626b852a73c1e863bd
Author: Claude Paroz <claude 2xlibre net>
Date: Sat Sep 20 12:46:48 2014 +0200
Secured run_shell_command by limited usage of shell=True
subprocess.Popen is much more secure with shell=False. Most
commands have been rewritten to not require a shell.
Many thanks to Tobias Mueller for pointing out this security weakness.
stats/management/commands/migrate-to-git.py | 7 +-
stats/models.py | 157 +++++++++------------------
stats/tests/tests.py | 9 +-
stats/utils.py | 49 +++++----
vertimus/models.py | 8 +-
5 files changed, 84 insertions(+), 146 deletions(-)
---
diff --git a/stats/management/commands/migrate-to-git.py b/stats/management/commands/migrate-to-git.py
index a5b6208..09326c1 100644
--- a/stats/management/commands/migrate-to-git.py
+++ b/stats/management/commands/migrate-to-git.py
@@ -33,12 +33,9 @@ class Command(BaseCommand):
for branch in module.branch_set.exclude(name='master'):
# Checkout branch (other than master)
- cmd = "cd \"%(localdir)s\" && git checkout --track -b %(branch)s origin/%(branch)s" % {
- "localdir" : branch.co_path(),
- "branch" : branch.name,
- }
+ cmd = ['git', 'checkout', '--track', '-b', branch.name, 'origin/%s' % branch.name]
try:
- utils.run_shell_command(cmd, raise_on_error=True)
+ utils.run_shell_command(cmd, raise_on_error=True, cwd=branch.co_path())
except Exception, e:
self.stderr.write("Unable to checkout branch '%s' of module '%s': %s" % (branch.name,
module.name, e))
continue
diff --git a/stats/models.py b/stats/models.py
index ac41f23..7cb110d 100644
--- a/stats/models.py
+++ b/stats/models.py
@@ -233,11 +233,9 @@ class Branch(models.Model):
if os.access(self.co_path(), os.W_OK):
shutil.rmtree(self.co_path())
else:
- cmd = "cd \"%(localdir)s\" && git checkout master && git branch -D %(branch)s" % {
- 'localdir': self.co_path(),
- 'branch': self.name,
- }
- utils.run_shell_command(cmd)
+ wd = self.co_path()
+ utils.run_shell_command(['git', 'checkout', 'master'], cwd=wd)
+ utils.run_shell_command(['git', 'branch', '-D', self.name], cwd=wd)
#To be implemented for hg/bzr
# Remove the pot/po generated files
@@ -484,7 +482,6 @@ class Branch(models.Model):
# 7. Update language po files and update DB
# *****************************************
- command = "msgmerge --previous -o %(outpo)s %(pofile)s %(potfile)s"
stats_with_ext_errors = Statistics.objects.filter(branch=self, domain=dom,
information__type__endswith='-ext')
langs_with_ext_errors = [stat.language.locale for stat in stats_with_ext_errors]
dom_langs = dom.get_lang_files(self.co_path())
@@ -496,12 +493,7 @@ class Branch(models.Model):
continue
# msgmerge po with recent pot
- realcmd = command % {
- 'outpo' : outpo,
- 'pofile' : pofile,
- 'potfile' : potfile,
- }
- utils.run_shell_command(realcmd)
+ utils.run_shell_command(['msgmerge', '--previous', '-o', outpo, pofile, potfile])
# Get Statistics object
try:
@@ -534,8 +526,8 @@ class Branch(models.Model):
def _exists(self):
""" Determine if branch (self) already exists (i.e. already checked out) on local FS """
if self.module.vcs_type == 'git':
- command = "cd %s && git branch | grep %s" % (self.co_path(), self.name)
- status, output, errs = utils.run_shell_command(command)
+ command = "git branch | grep %s" % self.name
+ status, output, errs = utils.run_shell_command(command, cwd=self.co_path())
return output != ""
elif self.module.vcs_type == 'hg':
return self.id != None and os.access(self.co_path(), os.X_OK | os.W_OK)
@@ -553,36 +545,25 @@ class Branch(models.Model):
try: os.makedirs(localroot)
except: pass
- commandList = []
+ command_list = []
if self._exists():
# Path exists, update repos
if vcs_type == "cvs":
- commandList.append("cd \"%(localdir)s\" && cvs -z4 up -Pd" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, ['cvs', '-z4', 'up', '-Pd']))
elif vcs_type == "svn":
- commandList.append("cd \"%(localdir)s\" && svn up --non-interactive" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, ['svn', 'up', '--non-interactive']))
elif vcs_type == "hg":
- commandList.append("cd \"%(localdir)s\" && hg revert --all" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, ['hg', 'revert', '--all']))
elif vcs_type == "git":
- # tester "cd \"%(localdir)s\" && git checkout %(branch)s && git clean -dfq && git pull origin/%(branch)s"
- commandList.append("cd \"%(localdir)s\" && git checkout -f %(branch)s && "
- "git fetch && git reset --hard origin/%(branch)s && git clean -dfq" % {
- "localdir" : modulepath,
- "branch" : self.name,
- })
+ # tester "git checkout %(branch)s && git clean -dfq && git pull origin/%(branch)s"
+ command_list.append((modulepath, ['git', 'checkout', '-f', self.name]))
+ command_list.append((modulepath, ['git', 'fetch']))
+ command_list.append((modulepath, ['git', 'reset', '--hard', 'origin/%s' % self.name]))
+ command_list.append((modulepath, ['git', 'clean', '-dfq']))
# check if there are any submodules and init & update them
- commandList.append("cd \"%(localdir)s\" && if [ -e .gitmodules ]; then git submodule update --init; fi" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, "if [ -e .gitmodules ]; then git submodule update --init; fi"))
elif vcs_type == "bzr":
- commandList.append("cd \"%(localdir)s\" && bzr up" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, ['bzr', 'up']))
else:
# Checkout
vcs_path = self.get_vcs_url()
@@ -592,64 +573,35 @@ class Branch(models.Model):
moduledir = self.module.name + "." + self.name
if vcs_type == "cvs":
- commandList.append("cd \"%(localroot)s\" && cvs -d%(cvsroot)s -z4 co -d%(dir)s -r%(branch)s %(module)s" % {
- "localroot" : localroot,
- "cvsroot" : scmroot,
- "dir" : moduledir,
- "branch" : self.name,
- "module" : module_name,
- })
+ command_list.append((localroot,
+ ['cvs', '-d%s' % scmroot, '-z4', 'co', '-d%s' % moduledir, '-r%s' % self.name,
+ module_name]))
elif vcs_type == "svn":
- commandList.append("cd \"%(localroot)s\" && svn co --non-interactive %(svnpath)s \"%(dir)s\"" % {
- "localroot" : localroot,
- "svnpath" : vcs_path,
- "dir" : moduledir,
- })
+ command_list.append((localroot, ['svn', 'co', '--non-interactive', vcs_path, moduledir]))
elif vcs_type == "hg":
- commandList.append("cd \"%(localroot)s\" && hg clone %(hgpath)s \"%(dir)s\"" % {
- "localroot" : localroot,
- "hgpath" : vcs_path,
- "dir" : moduledir,
- })
- commandList.append("cd \"%(localdir)s\" && hg update %(branch)s" % {
- "localdir" : modulepath,
- "branch" : self.name,
- })
+ command_list.append((localroot, ['hg', 'clone', vcs_path, moduledir]))
+ command_list.append((modulepath, ['hg', 'update', self.name]))
elif vcs_type == "git":
# We are assuming here that master is the first branch created
if self.name == "master":
- commandList.append("cd \"%(localroot)s\" && git clone %(gitpath)s \"%(dir)s\"" % {
- "localroot" : localroot,
- "gitpath" : vcs_path,
- "dir" : moduledir,
- })
- commandList.append("cd \"%(localdir)s\" && git remote update && git checkout %(branch)s" % {
- "localdir" : modulepath,
- "branch" : self.name,
- })
+ command_list.append((localroot, ['git', 'clone', vcs_path, moduledir]))
+ command_list.append((modulepath, ['git', 'remote', 'update']))
+ command_list.append((modulepath, ['git', 'checkout', self.name]))
else:
- commandList.append("cd \"%(localdir)s\" && git pull && git checkout --track -b %(branch)s origin/%(branch)s" % {
- "localdir" : modulepath,
- "branch" : self.name,
- })
+ command_list.append((modulepath, ['git', 'pull']))
+ command_list.append((modulepath, ['git', 'checkout', '--track', '-b', self.name, 'origin/%s' % self.name]))
# check if there are any submodules and init & update them
- commandList.append("cd \"%(localdir)s\" && if [ -e .gitmodules ]; then git submodule update --init; fi" % {
- "localdir" : modulepath,
- })
+ command_list.append((modulepath, "if [ -e .gitmodules ]; then git submodule update --init; fi"))
elif vcs_type == "bzr":
- commandList.append("cd \"%(localroot)s\" && bzr co --lightweight %(bzrpath)s \"%(dir)s\"" % {
- "localroot" : localroot,
- "bzrpath" : vcs_path,
- "dir" : moduledir,
- })
+ command_list.append((localroot, ['bzr', 'co', '--lightweight', vcs_path, moduledir]))
# Run command(s)
logging.debug("Checking '%s.%s' out to '%s'..." % (module_name, self.name, modulepath))
# Do not allow 2 checkouts to run in parallel on the same branch
self.checkout_lock.acquire()
try:
- for command in commandList:
- utils.run_shell_command(command, raise_on_error=True)
+ for working_dir, command in command_list:
+ utils.run_shell_command(command, raise_on_error=True, cwd=working_dir)
finally:
self.checkout_lock.release()
return 1
@@ -669,20 +621,11 @@ class Branch(models.Model):
dest_path = os.path.join(commit_dir, dest_filename)
if vcs_type == "git":
- var_dict = {
- 'dest': commit_dir,
- 'branch': self.name,
- 'po_file': dest_filename,
- 'lg_file': "LINGUAS",
- 'author': author,
- 'msg': "Updated %s translation" % language.name,
- }
-
with ModuleLock(self.module):
utils.run_shell_command(
- "cd \"%(dest)s\" && git checkout %(branch)s" % var_dict, raise_on_error=True)
+ ['git', 'checkout', self.name], raise_on_error=True, cwd=commit_dir)
utils.run_shell_command(
- "cd \"%(dest)s\" && git pull" % var_dict, raise_on_error=True)
+ ['git', 'pull'], raise_on_error=True, cwd=commit_dir)
already_exist = os.access(dest_path, os.F_OK)
if not already_exist and domain.dtype != 'ui':
@@ -693,28 +636,30 @@ class Branch(models.Model):
# git add file.po
utils.run_shell_command(
- "cd \"%(dest)s\" && git add %(po_file)s" % var_dict, raise_on_error=True)
+ ['git', 'add', dest_filename], raise_on_error=True, cwd=commit_dir)
if not already_exist:
# Add locale to LINGUAS
linguas_file = os.path.join(commit_dir, "LINGUAS")
if os.access(linguas_file, os.F_OK):
utils.insert_locale_in_linguas(linguas_file, locale)
utils.run_shell_command(
- "cd \"%(dest)s\" && git add %(lg_file)s" % var_dict, raise_on_error=True)
- var_dict['msg'] = "Added %s translation" % language.name
- # git commit -m "Updated %s translation."
- commit_cmd = "cd \"%(dest)s\" && git commit -m \"%(msg)s\""
+ ['git', 'add', 'LINGUAS'], raise_on_error=True, cwd=commit_dir)
+ msg = "Added %s translation" % language.name
+ else:
+ msg = "Updated %s translation" % language.name
+
+ commit_cmd = ['git', 'commit', '-m', msg]
if author:
- commit_cmd += " --author \"%(author)s\""
- utils.run_shell_command(commit_cmd % var_dict, raise_on_error=True)
+ commit_cmd.extend(['--author', author])
+ utils.run_shell_command(commit_cmd, raise_on_error=True, cwd=commit_dir)
# git push
try:
utils.run_shell_command(
- "cd \"%(dest)s\" && git push origin %(branch)s" % var_dict, raise_on_error=True)
+ ['git', 'push', 'origin', self.name], raise_on_error=True, cwd=commit_dir)
except OSError:
# Revert the commit
utils.run_shell_command(
- "cd \"%(dest)s\" && git reset --hard origin/%(branch)s" % var_dict)
+ ['git', 'reset', '--hard', 'origin/%s' % self.name], cwd=commit_dir)
raise
# Finish by updating stats
if already_exist:
@@ -803,7 +748,7 @@ class Domain(models.Model):
env = None
if not self.pot_method: # default is intltool
env = {"XGETTEXT_ARGS": "\"--msgid-bugs-address=%s\"" % self.module.get_bugs_enter_url()}
- pot_command = "intltool-update -g '%(domain)s' -p" % {'domain': self.potbase()}
+ pot_command = ['intltool-update', '-g', self.potbase(), '-p']
elif self.pot_method.startswith(('http://', 'https://')):
# Get POT from URL and save file locally
import urllib2
@@ -815,17 +760,13 @@ class Domain(models.Model):
potfile = os.path.join(vcs_path, self.potbase() + ".pot")
with open(potfile, 'w') as f:
f.write(handle.read())
- pot_command = ":" # noop
+ pot_command = ':' # noop
elif self.module.name == 'damned-lies':
# special case for d-l, pot file should be generated from running instance dir
podir = "."
vcs_path = "./po"
- command = "cd \"%(dir)s\" && %(pot_command)s" % {
- "dir" : podir,
- "pot_command" : pot_command,
- }
- (status, output, errs) = utils.run_shell_command(command, env=env)
+ (status, output, errs) = utils.run_shell_command(pot_command, env=env, cwd=podir)
potfile = os.path.join(vcs_path, self.potbase() + ".pot")
if not os.access(potfile, os.R_OK):
@@ -837,7 +778,7 @@ class Domain(models.Model):
if status != utils.STATUS_OK or not os.access(potfile, os.R_OK):
return "", (("error", ugettext_noop("Error regenerating POT file for %(file)s:\n<pre>%(cmd)s\n%(output)s</pre>")
% {'file': self.potbase(),
- 'cmd': pot_command,
+ 'cmd': ' '.join(pot_command) if isinstance(pot_command, list) else pot_command,
'output': errs.decode('utf-8')}),
)
else:
diff --git a/stats/tests/tests.py b/stats/tests/tests.py
index 20ea96e..50cf385 100644
--- a/stats/tests/tests.py
+++ b/stats/tests/tests.py
@@ -68,7 +68,7 @@ class patch_shell_command:
def __enter__(self):
self.cmds = []
def mocked_run_shell_command(cmd, *args, **kwargs):
- self.cmds.append(cmd)
+ self.cmds.append(" ".join(cmd) if isinstance(cmd, list) else cmd)
return 0, '', ''
self.saved_run_shell_command = utils.run_shell_command
utils.run_shell_command = mocked_run_shell_command
@@ -221,7 +221,8 @@ class ModuleTestCase(TestCase):
# User interface (existing language)
git_ops = ('git checkout master', 'git pull', 'git add fr.po',
- 'git commit -m "Updated French translation" --author "Author <someone example org>"',
+ # Quoting is done at the Popen level
+ 'git commit -m Updated French translation --author Author <someone example org>',
'git push origin master')
with patch_shell_command() as cmds:
branch.commit_po(po_file, domain, fr_lang, 'Author <someone example org>')
@@ -231,7 +232,7 @@ class ModuleTestCase(TestCase):
# User interface (new language)
bem_lang = Language.objects.get(locale='bem')
git_ops = ('git checkout master', 'git pull', 'git add bem.po', 'git add LINGUAS',
- 'git commit -m "Added Bemba translation" --author "Author <someone example org>"',
+ 'git commit -m Added Bemba translation --author Author <someone example org>',
'git push origin master')
with patch_shell_command() as cmds:
branch.commit_po(po_file, domain, bem_lang, 'Author <someone example org>')
@@ -246,7 +247,7 @@ class ModuleTestCase(TestCase):
# Documentation
domain = self.mod.domain_set.get(name='help')
git_ops = ('git checkout master', 'git pull', 'git add fr/fr.po',
- 'git commit -m "Updated French translation" --author "Author <someone example org>"',
+ 'git commit -m Updated French translation --author Author <someone example org>',
'git push origin master')
with patch_shell_command() as cmds:
branch.commit_po(po_file, domain, fr_lang, 'Author <someone example org>')
diff --git a/stats/utils.py b/stats/utils.py
index 7fd04bc..178a1fb 100644
--- a/stats/utils.py
+++ b/stats/utils.py
@@ -57,14 +57,15 @@ class DocFormat(object):
self.format = is_mallard and "mallard" or "docbook"
self.tool = is_itstool and "itstool" or "xml2po"
- @property
- def command(self):
+ def command(self, potfile, files):
if self.tool == "itstool":
- return "cd \"%%(dir)s\" && %sitstool -o %%(potfile)s %%(files)s" % ITSTOOL_PATH
+ cmd = ['%sitstool' % ITSTOOL_PATH, '-o', potfile]
elif self.format == "mallard":
- return "cd \"%(dir)s\" && xml2po -m mallard -o %(potfile)s -e %(files)s"
+ cmd = ['xml2po', '-m', 'mallard', '-o', potfile, '-e']
else:
- return "cd \"%(dir)s\" && xml2po -o %(potfile)s -e %(files)s"
+ cmd = ['xml2po', '-o', potfile, '-e']
+ cmd.extend(files)
+ return cmd
@property
def module_var(self):
@@ -120,18 +121,21 @@ def ellipsize(val, length):
val = "%s..." % val[:length]
return val
-def run_shell_command(cmd, env=None, input_data=None, raise_on_error=False):
+def run_shell_command(cmd, input_data=None, raise_on_error=False, **popen_kwargs):
logging.debug(cmd)
stdin = None
if input_data:
stdin = PIPE
- if env:
- os.environ.update(env)
- env = os.environ
+ if 'env' in popen_kwargs:
+ os.environ.update(popen_kwargs['env'])
+ popen_kwargs['env'] = os.environ
+ shell = not isinstance(cmd, list)
if isinstance(cmd, unicode):
cmd = cmd.encode('utf-8')
- pipe = Popen(cmd, shell=True, env=env, stdin=stdin, stdout=PIPE, stderr=PIPE)
+ elif isinstance(cmd, list):
+ cmd = [c.encode('utf-8') for c in cmd]
+ pipe = Popen(cmd, shell=shell, stdin=stdin, stdout=PIPE, stderr=PIPE, **popen_kwargs)
if input_data:
try:
pipe.stdin.write(input_data)
@@ -148,7 +152,7 @@ def run_shell_command(cmd, env=None, input_data=None, raise_on_error=False):
def check_program_presence(prog_name):
""" Test if prog_name is an available command on the system """
- status, output, err = run_shell_command("which %s" % prog_name)
+ status, output, err = run_shell_command(['which', prog_name])
return status == 0
def po_grep(in_file, out_file, filter_):
@@ -178,8 +182,8 @@ def check_potfiles(po_path):
Return a list of errors """
errors = []
- command = "cd \"%(dir)s\" && rm -f missing notexist && intltool-update -m" % { "dir" : po_path, }
- (status, output, errs) = run_shell_command(command)
+ run_shell_command(['rm', '-f', 'missing', 'notexist'], cwd=po_path)
+ (status, output, errs) = run_shell_command(['intltool-update', '-m'], cwd=po_path)
if status != STATUS_OK:
errors.append( ("error", ugettext_noop("Errors while running 'intltool-update -m' check.")) )
@@ -238,16 +242,15 @@ def generate_doc_pot_file(vcs_path, potbase, moduleid):
# fallback to directory listing
files.extend([f for f in all_C_files if f.endswith(".page")])
- files = " ".join([os.path.join("C", f) for f in files])
potfile = os.path.join(vcs_path, "C", potbase + ".pot")
- command = doc_format.command % {'dir': vcs_path, 'potfile': potfile, 'files': files}
- (status, output, errs) = run_shell_command(command)
+ command = doc_format.command(potfile, [os.path.join("C", f) for f in files])
+ (status, output, errs) = run_shell_command(command, cwd=vcs_path)
if status != STATUS_OK:
errors.append(("error",
ugettext_noop("Error regenerating POT file for document %(file)s:\n<pre>%(cmd)s\n%(output)s</pre>")
% {'file': potbase,
- 'cmd': command.replace(settings.SCRATCHDIR, "<scratchdir>"),
+ 'cmd': " ".join([c.replace(settings.SCRATCHDIR, "<scratchdir>") for c in command]),
'output': errs})
)
potfile = ""
@@ -302,7 +305,7 @@ def search_variable_in_file(file_path, variable):
return found
def pot_diff_status(pota, potb):
- (status, output, errs) = run_shell_command("diff %s %s|wc -l" % (pota, potb))
+ (status, output, errs) = run_shell_command('diff "%s" "%s"|wc -l' % (pota, potb))
# POT generation date always change and produce a 4 line diff
if int(output) <= 4:
return NOT_CHANGED, ""
@@ -352,9 +355,9 @@ def po_file_stats(pofile, msgfmt_checks=True):
raise ValueError("pofile type not recognized")
if msgfmt_checks:
- command = "msgfmt -cv -o /dev/null %s" % input_file
+ command = ['msgfmt', '-cv', '-o', '/dev/null', input_file]
else:
- command = "msgfmt --statistics -o /dev/null %s" % input_file
+ command = ['msgfmt', '--statistics', '-o', '/dev/null', input_file]
(status, output, errs) = run_shell_command(command, env=c_env, input_data=input_data)
@@ -388,8 +391,8 @@ def po_file_stats(pofile, msgfmt_checks=True):
except:
status = STATUS_OK+1
else:
- command = ("msgconv -t UTF-8 %s | diff -i -I '^#~' -u %s - >/dev/null") % (pofile,
- pofile)
+ command = ("msgconv -t UTF-8 \"%s\" | diff -i -I '^#~' -u \"%s\" - >/dev/null") % (
+ pofile, pofile)
(status, output, errs) = run_shell_command(command, env=c_env)
if status != STATUS_OK:
res['errors'].append(("warn",
@@ -513,7 +516,7 @@ def add_custom_header(po_path, header, value):
(stat, out, err) = run_shell_command(cmd)
def is_po_reduced(file_path):
- status, output, errs = run_shell_command("""grep "X-DamnedLies-Scope: partial" %s""" % file_path)
+ status, output, errs = run_shell_command(['grep', 'X-DamnedLies-Scope: partial', file_path])
return (status == 0)
def copy_file(file1, file2):
diff --git a/vertimus/models.py b/vertimus/models.py
index 0816431..f352541 100644
--- a/vertimus/models.py
+++ b/vertimus/models.py
@@ -483,14 +483,10 @@ class Action(ActionAbstract):
self.save()
return # post_save will call merge_file_with_pot again
merged_path = self.merged_file.path
- command = "msgmerge --previous -o %(out_po)s %(po_file)s %(pot_file)s" % {
- 'out_po': merged_path,
- 'po_file': self.file.path,
- 'pot_file': pot_file
- }
+ command = ['msgmerge', '--previous', '-o', merged_path, self.file.path, pot_file]
run_shell_command(command)
# If uploaded file is reduced, run po_grep *after* merge
- if is_po_reduced(self.file):
+ if is_po_reduced(self.file.path):
temp_path = "%s.temp.po" % self.file.path[:-3]
shutil.copy(merged_path, temp_path)
po_grep(temp_path, merged_path, self.state_db.domain.red_filter)
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]