xed/plugins/snippets/snippets/Placeholder.py

701 lines
26 KiB
Python
Executable File

# Pluma snippets plugin
# Copyright (C) 2005-2006 Jesse van den Kieboom <jesse@icecrew.nl>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
import traceback
import re
import os
import sys
import signal
import select
import locale
import subprocess
from SubstitutionParser import SubstitutionParser
import gobject
from Helper import *
# These are places in a view where the cursor can go and do things
class Placeholder:
    """A span of buffer text, delimited by two marks, that the cursor can visit.

    ``begin`` and ``end`` are text marks in ``buf``; either may be None
    (``begin`` when no start iterator was supplied, ``end`` until
    ``run_last`` has been called).  Other placeholders can register
    themselves as mirrors and are notified through their ``update`` method.
    """

    def __init__(self, view, tabstop, defaults, begin):
        """Create the placeholder.

        view     -- the view whose buffer holds the placeholder
        tabstop  -- the tabstop number stored on the placeholder
        defaults -- candidate default texts, or a false value for none
        begin    -- iterator at which the begin mark is created, or a false
                    value to leave ``self.begin`` as None
        """
        self.ok = True
        self.done = False
        self.buf = view.get_buffer()
        self.view = view
        self.has_references = False
        self.mirrors = []
        self.leave_mirrors = []
        self.tabstop = tabstop
        self.set_default(defaults)
        self.prev_contents = self.default
        # Must run before the begin mark is created: it supplies the
        # gravity value used for that mark.
        self.set_mark_gravity()

        if begin:
            self.begin = self.buf.create_mark(None, begin, self.mark_gravity[0])
        else:
            self.begin = None

        self.end = None

    def __str__(self):
        return '%s (%s)' % (str(self.__class__), str(self.default))

    def set_mark_gravity(self):
        """Set the gravity values used for the [begin, end] marks."""
        self.mark_gravity = [True, False]

    def set_default(self, defaults):
        """Populate ``self.defaults`` and ``self.default`` from *defaults*.

        Each candidate is environment-expanded; empty results are skipped.
        The first non-empty result becomes ``self.default``.  Collection
        stops after the first candidate that was changed by expansion.
        """
        self.default = None
        self.defaults = []

        if not defaults:
            return

        for d in defaults:
            dm = self.expand_environment(d)

            if not dm:
                continue

            self.defaults.append(dm)

            if not self.default:
                self.default = dm

            if dm != d:
                break

    def literal(self, s):
        """Return *s* formatted as a literal (here: its ``repr``)."""
        return repr(s)

    def format_environment(self, s):
        """Format an environment variable value before substitution."""
        return s

    def re_environment(self, m):
        """Regex callback for ``expand_environment``.

        An escaped (``\\$NAME``) or unset variable is emitted as ``$NAME``;
        otherwise the formatted environment value is returned.
        """
        name = m.group(2)

        if m.group(1) or name not in os.environ:
            return '$' + name

        return self.format_environment(os.environ[name])

    def expand_environment(self, text):
        """Substitute ``$NAME`` references (uppercase letters and
        underscores) in *text*; false values are returned unchanged."""
        if not text:
            return text

        return re.sub(r'(\\)?\$([A-Z_]+)', self.re_environment, text)

    def get_iter(self, mark):
        """Return the buffer iterator at *mark*, or None if the mark is
        missing or deleted."""
        if mark and not mark.get_deleted():
            return self.buf.get_iter_at_mark(mark)

        return None

    def begin_iter(self):
        return self.get_iter(self.begin)

    def end_iter(self):
        return self.get_iter(self.end)

    def run_last(self, placeholders):
        """Create the end mark at the begin position and insert the default
        text, if any."""
        begin = self.begin_iter()
        self.end = self.buf.create_mark(None, begin, self.mark_gravity[1])

        if self.default:
            insert_with_indent(self.view, begin, self.default, False, self)

    def remove(self, force=False):
        """Delete the begin and end marks that still exist.

        *force* is not used here; it is part of the signature shared with
        subclasses.
        """
        if self.begin and not self.begin.get_deleted():
            self.buf.delete_mark(self.begin)

        if self.end and not self.end.get_deleted():
            self.buf.delete_mark(self.end)

    # Do something on beginning this placeholder
    def enter(self):
        """Move the insert mark to the beginning and the selection bound to
        the end (or to the beginning when there is no end mark)."""
        if not self.begin or self.begin.get_deleted():
            return

        self.buf.move_mark(self.buf.get_insert(), self.begin_iter())

        if self.end:
            self.buf.move_mark(self.buf.get_selection_bound(), self.end_iter())
        else:
            self.buf.move_mark(self.buf.get_selection_bound(), self.begin_iter())

    def get_text(self):
        """Return the text between the marks, or '' when either mark is
        missing or deleted."""
        if not (self.begin and self.end):
            return ''

        biter = self.begin_iter()
        eiter = self.end_iter()

        if biter and eiter:
            return self.buf.get_text(biter, eiter)

        return ''

    def add_mirror(self, mirror, onleave=False):
        """Register *mirror* for updates: on every content change, or only
        when the placeholder is left if *onleave* is true."""
        mirror.has_references = True

        if onleave:
            self.leave_mirrors.append(mirror)
        else:
            self.mirrors.append(mirror)

    def set_text(self, text):
        """Replace the text between the marks with *text*.

        Does nothing when either mark is missing or deleted.
        """
        # The marks may legitimately be None (no begin iterator was given,
        # or run_last has not created the end mark yet).
        if self.begin is None or self.end is None:
            return

        if self.begin.get_deleted() or self.end.get_deleted():
            return

        # Group the delete and the insert into one user action
        self.buf.begin_user_action()

        try:
            # Remove everything between self.begin and self.end
            begin = self.begin_iter()
            self.buf.delete(begin, self.end_iter())

            # Insert the new text
            insert_with_indent(self.view, begin, text, True, self)
        finally:
            # Keep begin/end_user_action balanced even if insertion fails
            self.buf.end_user_action()

        self.update_contents()

    def update_contents(self):
        """Record the current text and, if it changed, notify the mirrors.

        Notification stops at the first mirror whose ``update`` returns a
        false value.
        """
        previous = self.prev_contents
        current = self.get_text()
        self.prev_contents = current

        if previous != current:
            for mirror in self.mirrors:
                if not mirror.update(self):
                    return

    def update_leave_mirrors(self):
        """Notify the on-leave mirrors, stopping at the first one whose
        ``update`` returns a false value."""
        for mirror in self.leave_mirrors:
            if not mirror.update(self):
                return

    # Do something on ending this placeholder
    def leave(self):
        self.update_leave_mirrors()

    def find_mirrors(self, text, placeholders):
        """Return the distinct tabstops referenced as ``$N`` or ``${N}`` in
        *text*, in order of first appearance.

        Escaped references (``\\$N``) are skipped.  If a referenced tabstop
        is not a key of *placeholders*, ``self.ok`` is set to False and None
        is returned.
        """
        mirrors = []

        while True:
            m = re.search(r'(\\)?\$(?:{([0-9]+)}|([0-9]+))', text)

            if not m:
                break

            # Skip escaped mirrors
            if m.group(1):
                text = text[m.end():]
                continue

            tabstop = int(m.group(2) or m.group(3))

            if tabstop not in placeholders:
                self.ok = False
                return None

            if tabstop not in mirrors:
                mirrors.append(tabstop)

            text = text[m.end():]

        return mirrors
# This is a placeholder which inserts a mirror of another Placeholder
class PlaceholderMirror(Placeholder):
    """Placeholder whose text follows the placeholder at ``mirror_stop``."""

    def __init__(self, view, tabstop, begin):
        # A mirror has tabstop -1 and no defaults of its own; the tabstop
        # it follows is kept separately.
        Placeholder.__init__(self, view, -1, None, begin)
        self.mirror_stop = tabstop

    def update(self, mirror):
        """Copy the current text of *mirror* into this placeholder."""
        self.set_text(mirror.get_text())
        return True

    def run_last(self, placeholders):
        """Register with the mirrored placeholder and adopt its default.

        Sets ``self.ok`` to False when ``mirror_stop`` is not a key of
        *placeholders*.
        """
        Placeholder.run_last(self, placeholders)

        if self.mirror_stop not in placeholders:
            self.ok = False
            return

        source = placeholders[self.mirror_stop]
        source.add_mirror(self)

        if source.default:
            self.set_text(source.default)
# This placeholder indicates the end of a snippet
class PlaceholderEnd(Placeholder):
    """Placeholder with tabstop 0, marking the end of a snippet."""

    def __init__(self, view, begin, default):
        Placeholder.__init__(self, view, 0, default, begin)

    def run_last(self, placeholders):
        Placeholder.run_last(self, placeholders)

        if self.default:
            return

        # Without default text the end placeholder must not contain any
        # text: recreate the begin mark at the end mark's position, using
        # the changed begin gravity.
        self.mark_gravity[0] = False
        self.buf.delete_mark(self.begin)
        self.begin = self.buf.create_mark(None, self.end_iter(), self.mark_gravity[0])

    def enter(self):
        """Move the insert mark to the begin mark and the selection bound to
        the end mark, each only if that mark still exists."""
        begin_alive = self.begin and not self.begin.get_deleted()
        if begin_alive:
            self.buf.move_mark(self.buf.get_insert(), self.begin_iter())

        end_alive = self.end and not self.end.get_deleted()
        if end_alive:
            self.buf.move_mark(self.buf.get_selection_bound(), self.end_iter())

    def leave(self):
        # Leaving the end placeholder re-applies the same cursor placement
        self.enter()
# This placeholder is used to expand a command with embedded mirrors
class PlaceholderExpand(Placeholder):
    """Base class for placeholders that expand a command with embedded
    mirrors.

    ``mirror_text`` maps a tabstop to the last known text of the placeholder
    referenced in ``cmd``.  Subclasses implement ``expand``.
    """

    def __init__(self, view, tabstop, begin, s):
        Placeholder.__init__(self, view, tabstop, None, begin)

        self.mirror_text = {0: ''}
        self.timeout_id = None
        self.cmd = s
        self.instant_update = False

    def __str__(self):
        return Placeholder.__str__(self) + ' ' + self.cmd

    def get_mirrors(self, placeholders):
        """Return the tabstops referenced in the command (see
        ``find_mirrors``)."""
        return self.find_mirrors(self.cmd, placeholders)

    # Check if all substitution placeholders are accounted for
    def run_last(self, placeholders):
        Placeholder.run_last(self, placeholders)

        self.ok = True
        mirrors = self.get_mirrors(placeholders)

        if not mirrors:
            # Nothing to wait for: expand right away
            self.update(None)
            self.default = self.get_text() or None

            if self.tabstop == -1:
                self.done = True

            return

        all_default = True

        for stop in mirrors:
            source = placeholders[stop]
            source.add_mirror(self, not self.instant_update)
            self.mirror_text[source.tabstop] = source.default

            # Other expand placeholders are not required to have a default
            if not source.default and not isinstance(source, PlaceholderExpand):
                all_default = False

        if all_default:
            self.update(None)
            self.default = self.get_text() or None

    def re_placeholder(self, m, formatter):
        """Regex callback for ``substitute``: an escaped reference becomes a
        quoted ``"$..."``; otherwise the formatted mirror text is returned."""
        if m.group(1):
            return '"$' + m.group(2) + '"'

        # Group 3 holds the ${N} form, group 4 the $N form
        number = m.group(3) if m.group(3) else m.group(4)
        return formatter(self.mirror_text[int(number)])

    def remove_timeout(self):
        if self.timeout_id is None:
            return

        gobject.source_remove(self.timeout_id)
        self.timeout_id = None

    def install_timeout(self):
        """(Re)start the 1000 ms timeout that calls ``timeout_cb``."""
        self.remove_timeout()
        self.timeout_id = gobject.timeout_add(1000, self.timeout_cb)

    def timeout_cb(self):
        self.timeout_id = None
        return False

    def format_environment(self, text):
        return self.literal(text)

    def substitute(self, text, formatter=None):
        """Substitute all mirrors, and also environment variables, in
        *text*.  Mirror text is passed through *formatter* (default:
        ``self.literal``)."""
        if not formatter:
            formatter = self.literal

        def replace(m):
            return self.re_placeholder(m, formatter)

        substituted = re.sub('(\\\\)?\\$({([0-9]+)}|([0-9]+))', replace, text)
        return self.expand_environment(substituted)

    def run_update(self):
        """Expand the substituted command and, when that succeeds, notify
        the on-leave mirrors.  An empty command counts as success."""
        text = self.substitute(self.cmd)

        if not text:
            return True

        ret = self.expand(text)

        if ret:
            self.update_leave_mirrors()

        return ret

    def update(self, mirror):
        """Record the text of *mirror* (if given) and re-run the expansion.

        Returns False without expanding while some referenced tabstop other
        than 0 still has no text.
        """
        if mirror:
            self.mirror_text[mirror.tabstop] = mirror.get_text()

            # Check if all substitutions have been made
            for tabstop in self.mirror_text:
                if tabstop != 0 and self.mirror_text[tabstop] is None:
                    return False

        return self.run_update()

    def expand(self, text):
        """Hook for subclasses; the base implementation does nothing."""
        return True
# The shell placeholder executes commands in a subshell
class PlaceholderShell(PlaceholderExpand):
    """Placeholder whose text is the output of a command run in a subshell.

    The command's stdout is read through an IO watch.  ``self.shell`` is
    the running process, or None when no command is running.
    """

    def __init__(self, view, tabstop, begin, s):
        PlaceholderExpand.__init__(self, view, tabstop, begin, s)

        self.shell = None
        # Set when remove() was requested while the command still runs
        self.remove_me = False

    def close_shell(self):
        """Close the command's stdout pipe and forget the process."""
        self.shell.stdout.close()
        self.shell = None

    def timeout_cb(self):
        """Abort a command that produced no output in time and report it."""
        PlaceholderExpand.timeout_cb(self)
        self.remove_timeout()

        if not self.shell:
            return False

        gobject.source_remove(self.watch_id)
        self.close_shell()

        if self.remove_me:
            PlaceholderExpand.remove(self)

        message_dialog(None, gtk.MESSAGE_ERROR, 'Execution of the shell ' \
                'command (%s) exceeded the maximum time; ' \
                'execution aborted.' % self.command)

        return False

    def process_close(self):
        """Finish the command: insert the collected output without its
        trailing newlines and perform a deferred removal if requested."""
        self.close_shell()
        self.remove_timeout()

        self.set_text(self.shell_output.rstrip('\n'))

        if self.default is None:
            self.default = self.get_text()
            self.leave()

        if self.remove_me:
            PlaceholderExpand.remove(self, True)

    def process_cb(self, source, condition):
        """IO watch callback: collect one line of output.

        Returns True to keep the watch while lines are being read, and
        False after the command has been closed.
        """
        if condition & gobject.IO_IN:
            line = source.readline()

            if len(line) > 0:
                try:
                    line = unicode(line, 'utf-8')
                except UnicodeDecodeError:
                    # Not valid UTF-8: fall back to the locale's encoding.
                    # getdefaultlocale() may report no encoding at all.
                    encoding = locale.getdefaultlocale()[1] or 'utf-8'
                    line = unicode(line, encoding, 'replace')

                self.shell_output += line
                # Each line read restarts the timeout
                self.install_timeout()

                return True

        self.process_close()
        return False

    def literal_replace(self, match):
        return "\\%s" % (match.group(0))

    def literal(self, text):
        """Return *text* in double quotes, with backslashes and double
        quotes escaped by a backslash."""
        return '"' + re.sub(r'([\\"])', self.literal_replace, text) + '"'

    def expand(self, text):
        """Run *text* as a shell command, replacing any command that is
        still running, and start watching its output."""
        self.remove_timeout()

        if self.shell:
            gobject.source_remove(self.watch_id)
            self.close_shell()

        popen_args = {
            'cwd': None,
            'shell': True,
            'env': os.environ,
            'stdout': subprocess.PIPE,
        }

        self.command = text
        self.shell = subprocess.Popen(text, **popen_args)
        self.shell_output = ''
        self.watch_id = gobject.io_add_watch(self.shell.stdout,
                                             gobject.IO_IN | gobject.IO_HUP,
                                             self.process_cb)
        self.install_timeout()

        return True

    def remove(self, force=False):
        """Remove the placeholder.

        Without *force*, removal is deferred while a command is running.
        With *force*, the running command is abandoned first.
        """
        if not force and self.shell:
            # Still executing shell command
            self.remove_me = True
            return

        if force:
            self.remove_timeout()

            if self.shell:
                # Drop the IO watch before closing the pipe it watches,
                # as expand() and timeout_cb() do.
                gobject.source_remove(self.watch_id)
                self.close_shell()

        PlaceholderExpand.remove(self, force)
class TimeoutError(Exception):
    """Exception carrying a value; its string form is the value's repr."""

    def __init__(self, value):
        self.value = value

    def __str__(self):
        return '%r' % (self.value,)
# The python placeholder evaluates commands in python
class PlaceholderEval(PlaceholderExpand):
    """Placeholder whose text is the return value of a piece of Python code.

    The code is wrapped in a ``process_snippet`` function, defined inside
    ``namespace`` and then called.  ``refs`` are additional tabstops this
    placeholder depends on.
    """

    def __init__(self, view, tabstop, refs, begin, s, namespace):
        PlaceholderExpand.__init__(self, view, tabstop, begin, s)

        self.fdread = 0
        self.remove_me = False
        self.namespace = namespace

        self.refs = []

        if refs:
            self.refs = [int(ref.strip()) for ref in refs]

    def get_mirrors(self, placeholders):
        """Return the tabstops referenced by the code plus the explicit
        ``refs``.

        Returns None, with ``self.ok`` False, when a referenced tabstop is
        not a key of *placeholders*.
        """
        mirrors = PlaceholderExpand.get_mirrors(self, placeholders)

        if not self.ok:
            return None

        for ref in self.refs:
            if ref not in placeholders:
                self.ok = False
                return None

            if ref not in mirrors:
                mirrors.append(ref)

        return mirrors

    # SIGALRM is not supported on all platforms (e.g. windows). Timeout
    # with SIGALRM will not be used on those platforms. This will
    # potentially block pluma if you have a placeholder which gets stuck,
    # but it's better than not supporting them at all. At some point we
    # might have proper thread support and we can fix this in a better way
    def timeout_supported(self):
        return hasattr(signal, 'SIGALRM')

    def timeout_cb(self, signum=0, frame=0):
        """SIGALRM handler: abort the running code with a TimeoutError."""
        raise TimeoutError("Operation timed out (>2 seconds)")

    def install_timeout(self):
        """Install the SIGALRM handler and arm a 2 second alarm."""
        if not self.timeout_supported():
            return

        if self.timeout_id is not None:
            self.remove_timeout()

        # timeout_id holds the previous handler so that it can be restored.
        # NOTE(review): signal.signal() can return None when the previous
        # handler was not installed from Python; remove_timeout() would then
        # not cancel the alarm -- confirm whether that can happen here.
        self.timeout_id = signal.signal(signal.SIGALRM, self.timeout_cb)
        signal.alarm(2)

    def remove_timeout(self):
        """Cancel the alarm and restore the previous SIGALRM handler."""
        if not self.timeout_supported():
            return

        if self.timeout_id is not None:
            signal.alarm(0)
            signal.signal(signal.SIGALRM, self.timeout_id)
            self.timeout_id = None

    def expand(self, text):
        """Run *text* as the body of a function and insert its result.

        Returns False when the code raised or timed out, and True otherwise.
        For an empty command the text is cleared and None is returned.
        """
        self.remove_timeout()

        text = text.strip()
        self.command = text

        if not self.command:
            self.set_text('')
            return

        text = "def process_snippet():\n\t" + "\n\t".join(text.split("\n"))

        # Make sure a stale definition is never called if this one fails
        if 'process_snippet' in self.namespace:
            del self.namespace['process_snippet']

        try:
            # This call form is accepted by both Python 2 and Python 3.
            # The code comes from the snippet definition and runs with the
            # full privileges of the editor.
            exec(text, self.namespace)
        except Exception:
            traceback.print_exc()

        if 'process_snippet' in self.namespace:
            try:
                # Install a SIGALRM handler. This is a HACK to make sure
                # pluma is not frozen by a python placeholder which, for
                # instance, loops indefinitely. Since the code is executed
                # synchronously it would hang pluma. With the alarm signal
                # an exception is raised and caught below; an error message
                # is shown and False is returned.
                self.install_timeout()
                result = self.namespace['process_snippet']()
                self.remove_timeout()
            except TimeoutError:
                self.remove_timeout()

                message_dialog(None, gtk.MESSAGE_ERROR,
                        _('Execution of the Python command (%s) exceeds the maximum '
                        'time, execution aborted.') % self.command)

                return False
            except Exception as detail:
                self.remove_timeout()

                message_dialog(None, gtk.MESSAGE_ERROR,
                        _('Execution of the Python command (%s) failed: %s') %
                        (self.command, detail))

                return False

            # Code that returns nothing produces empty text
            if result is None:
                result = ''

            self.set_text(str(result))

        return True
# Regular expression placeholder
class PlaceholderRegex(PlaceholderExpand):
    """Placeholder whose text is a regular expression substitution applied
    to an input.

    ``inp`` is either a tabstop (int), whose mirrored text is the input, or
    the name of an environment variable.
    """

    def __init__(self, view, tabstop, begin, inp, pattern, substitution, modifiers):
        PlaceholderExpand.__init__(self, view, tabstop, begin, '')

        self.instant_update = True
        self.inp = inp
        self.pattern = pattern
        self.substitution = substitution

        self.init_modifiers(modifiers)

    def init_modifiers(self, modifiers):
        """Translate modifier letters into ``re`` flags, ignoring unknown
        letters."""
        mods = {'I': re.I,
                'L': re.L,
                'M': re.M,
                'S': re.S,
                'U': re.U,
                'X': re.X}

        self.modifiers = 0

        for modifier in modifiers:
            if modifier in mods:
                self.modifiers |= mods[modifier]

    def get_mirrors(self, placeholders):
        """Return the distinct tabstops referenced by the pattern, the
        substitution and the input.

        Returns None, with ``self.ok`` False, when any referenced tabstop
        is not a key of *placeholders*.
        """
        mirrors = []

        for text in (self.pattern, self.substitution):
            found = self.find_mirrors(text, placeholders)

            # find_mirrors reports an unknown tabstop by returning None
            # (and has already cleared self.ok)
            if found is None:
                return None

            # A tabstop used in both texts must be registered only once
            for stop in found:
                if stop not in mirrors:
                    mirrors.append(stop)

        if isinstance(self.inp, int):
            if self.inp not in placeholders:
                self.ok = False
                return None

            if self.inp not in mirrors:
                mirrors.append(self.inp)

        return mirrors

    def literal(self, s):
        """Escape *s* so that it matches literally inside the pattern."""
        return re.escape(s)

    def get_input(self):
        """Return the input text: the mirrored text of tabstop ``inp``, the
        value of the environment variable ``inp``, or '' if it is unset."""
        if isinstance(self.inp, int):
            return self.mirror_text[self.inp]

        if self.inp in os.environ:
            return os.environ[self.inp]

        return ''

    def run_update(self):
        """Substitute mirrors into pattern and substitution, then expand.
        An empty pattern counts as success."""
        pattern = self.substitute(self.pattern)
        substitution = self.substitute(self.substitution, SubstitutionParser.escape_substitution)

        if pattern:
            return self.expand(pattern, substitution)

        return True

    def expand(self, pattern, substitution):
        """Apply the substitution to the first match of *pattern* in the
        input; without a match the input is inserted unchanged.

        Returns False when *pattern* does not compile, True otherwise.
        """
        # Try to compile pattern
        try:
            regex = re.compile(pattern, self.modifiers)
        except re.error as message:
            sys.stderr.write('Could not compile regular expression: %s\n%s\n' % (pattern, message))
            return False

        inp = self.get_input()
        match = regex.search(inp)

        if not match:
            self.set_text(inp)
            return True

        # Named groups, plus numbered groups under their number as a
        # string; '0' is the whole match
        groups = match.groupdict()

        for idx, group in enumerate(match.groups()):
            groups[str(idx + 1)] = group

        groups['0'] = match.group(0)

        parser = SubstitutionParser(substitution, groups)
        self.set_text(parser.parse())

        return True
# ex:ts=8:et: