#!/usr/bin/env python # # Copyright (C) 2011 W. Trevor King # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as # published by the Free Software Foundation, either version 3 of the # License, or (at your option) any later version. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this program. If not, see # . """Simple pinentry program for getting pins from a terminal. """ import copy import logging import logging.handlers import os import os.path import pprint import re import signal import sys import termios import traceback __version__ = '0.1' # create logger logger = logging.getLogger('pinentry') logger.setLevel(logging.WARNING) _h = logging.handlers.SysLogHandler(address='/dev/log') _h.setLevel(logging.DEBUG) _f = logging.Formatter('%(name)s: %(levelname)s: %(message)s') _h.setFormatter(_f) logger.addHandler(_h) del _h, _f class PinEntry (object): """pinentry protocol server See the `Assuan manual`_ for a description of the protocol. .. _Assuan manual: http://www.gnupg.org/documentation/manuals/assuan/ """ _digit_regexp = re.compile(r'\d+') # from proc(5): pid comm state ppid pgrp session tty_nr tpgid _tpgrp_regexp = re.compile(r'\d+ \(\S+\) . \d+ \d+ \d+ \d+ (\d+)') _assuan_encode_regexp = re.compile( '(' + '|'.join(['%', '\r', '\n']) + ')') _assuan_decode_regexp = re.compile('(%[0-9A-F]{2})') def __init__(self): self.stop = False self.options = {} self.strings = {} self.connection = {} def run(self): logger.info('---opening pinentry---') logger.info('OK Your orders please') sys.stdout.write('OK Your orders please\n') sys.stdout.flush() try: while not self.stop: line = sys.stdin.readline() if not line: break # EOF line = line.rstrip() # dangerous? logger.info(line) line = self._decode(line) fields = line.split(' ', 1) cmd = fields[0] if len(fields) > 1: arg = fields[1] else: arg = None handle = getattr(self, '_handle_%s' % cmd, None) if handle: for response in handle(arg): response = self._encode(response) logger.info(response) sys.stdout.write(response+'\n') try: sys.stdout.flush() except IOError: if not self.stop: raise else: raise ValueError(line) finally: logger.info('---closing pinentry---') # user interface def _connect(self): logger.info('--connecting to user--') logger.debug('options:\n%s' % pprint.pformat(self.options)) tty_name = self.options.get('ttyname', None) if tty_name: self.connection['tpgrp'] = self._get_pgrp(tty_name) logger.info('open to-user output stream for %s' % tty_name) self.connection['to_user'] = open(tty_name, 'w') logger.info('open from-user input stream for %s' % tty_name) self.connection['from_user'] = open(tty_name, 'r') logger.info('get current termios line discipline') self.connection['original termios'] = termios.tcgetattr( self.connection['to_user']) # [iflag, oflag, cflag, lflag, ...] new_termios = copy.deepcopy(self.connection['original termios']) # translate carriage return to newline on input new_termios[0] |= termios.ICRNL # do not ignore carriage return on input new_termios[0] &= ~termios.IGNCR # do not echo input characters new_termios[3] &= ~termios.ECHO # echo input characters #new_termios[3] |= termios.ECHO # echo the NL character even if ECHO is not set new_termios[3] |= termios.ECHONL # enable canonical mode new_termios[3] |= termios.ICANON logger.info('adjust termios line discipline') termios.tcsetattr( self.connection['to_user'], termios.TCSANOW, new_termios) logger.info('send SIGSTOP to pgrp %d' % self.connection['tpgrp']) #os.killpg(self.connection['tpgrp'], signal.SIGSTOP) os.kill(-self.connection['tpgrp'], signal.SIGSTOP) self.connection['tpgrp stopped'] = True else: logger.info('no TTY name given; use stdin/stdout for I/O') self.connection['to_user'] = sys.stdout self.connection['from_user'] = sys.stdin logger.info('--connected to user--') self.connection['to_user'].write('\n') # give a clean line to work on self.connection['active'] = True def _disconnect(self): logger.info('--disconnecting from user--') try: if self.connection.get('original termios', None): logger.info('restore original termios line discipline') termios.tcsetattr( self.connection['to_user'], termios.TCSANOW, self.connection['original termios']) if self.connection.get('tpgrp stopped', None) is True: logger.info( 'send SIGCONT to pgrp %d' % self.connection['tpgrp']) #os.killpg(self.connection['tpgrp'], signal.SIGCONT) os.kill(-self.connection['tpgrp'], signal.SIGCONT) if self.connection.get('to_user', None) not in [None, sys.stdout]: logger.info('close to-user output stream') self.connection['to_user'].close() if self.connection.get('from_user', None) not in [None,sys.stdout]: logger.info('close from-user input stream') self.connection['from_user'].close() finally: self.connection = {'active': False} logger.info('--disconnected from user--') def _get_pgrp(self, tty_name): logger.info('find process group contolling %s' % tty_name) proc = '/proc' for name in os.listdir(proc): path = os.path.join(proc, name) if not (self._digit_regexp.match(name) and os.path.isdir(path)): continue # not a process directory logger.debug('checking process %s' % name) fd_path = os.path.join(path, 'fd', '0') try: link = os.readlink(fd_path) except OSError, e: logger.debug('not our process: %s' % e) continue # permission denied (not one of our processes) if link != tty_name: logger.debug('wrong tty: %s' % link) continue # not attached to our target tty stat_path = os.path.join(path, 'stat') stat = open(stat_path, 'r').read() logger.debug('check stat for pgrp: %s' % stat) match = self._tpgrp_regexp.match(stat) assert match != None, stat pgrp = int(match.group(1)) logger.info('found pgrp %d for %s' % (pgrp, tty_name)) return pgrp raise ValueError(tty_name) def _write(self, string): "Write text to the user's terminal." self.connection['to_user'].write(string + '\n') self.connection['to_user'].flush() def _read(self): "Read and return a line from the user's terminal." # drop trailing newline return self.connection['from_user'].readline()[:-1] def _prompt(self, prompt='?', add_colon=True): if add_colon: prompt += ':' self.connection['to_user'].write('%s ' % prompt) self.connection['to_user'].flush() return self._read() # Assuan utilities def _encode(self, string): """ >>> p = PinEntry() >>> p._encode('It grew by 5%!\\n') 'It grew by 5%25!%0A' """ return self._assuan_encode_regexp.sub( lambda x : self._to_hex(x.group()), string) def _decode(self, string): """ >>> p = PinEntry() >>> p._decode('%22Look out!%22%0AWhere%3F') '"Look out!"\\nWhere?' """ return self._assuan_decode_regexp.sub( lambda x : self._from_hex(x.group()), string) def _from_hex(self, code): """ >>> p = PinEntry() >>> p._from_hex('%22') '"' >>> p._from_hex('%0A') '\\n' """ return chr(int(code[1:], 16)) def _to_hex(self, char): """ >>> p = PinEntry() >>> p._to_hex('"') '%22' >>> p._to_hex('\\n') '%0A' """ return '%%%02X' % ord(char) # handlers def _handle_BYE(self, arg): self.stop = True yield 'OK closing connection' def _handle_OPTION(self, arg): # ttytype to set TERM fields = arg.split('=', 1) key = fields[0] if len(fields) > 1: value = fields[1] else: value = True self.options[key] = value yield 'OK' def _handle_GETINFO(self, arg): if arg == 'pid': yield 'D %d' % os.getpid() else: raise ValueError(arg) yield 'OK' def _handle_SETDESC(self, arg): self.strings['description'] = arg yield 'OK' def _handle_SETPROMPT(self, arg): self.strings['prompt'] = arg yield 'OK' def _handle_SETERROR(self, arg): self.strings['error'] = arg yield 'OK' def _handle_SETTITLE(self, arg): self.strings['title'] = arg yield 'OK' def _handle_SETOK(self, arg): self.strings['ok'] = arg yield 'OK' def _handle_SETCANCEL(self, arg): self.strings['cancel'] = arg yield 'OK' def _handle_SETNOTOK(self, arg): self.strings['not ok'] = arg yield 'OK' def _handle_SETQUALITYBAR(self, arg): """Adds a quality indicator to the GETPIN window. This indicator is updated as the passphrase is typed. The clients needs to implement an inquiry named "QUALITY" which gets passed the current passpharse (percent-plus escaped) and should send back a string with a single numerical vauelue between -100 and 100. Negative values will be displayed in red. If a custom label for the quality bar is required, just add that label as an argument as percent escaped string. You will need this feature to translate the label because pinentry has no internal gettext except for stock strings from the toolkit library. If you want to show a tooltip for the quality bar, you may use C: SETQUALITYBAR_TT string S: OK With STRING being a percent escaped string shown as the tooltip. """ raise NotImplementedError() def _handle_GETPIN(self, arg): try: self._connect() self._write(self.strings['description']) pin = self._prompt(self.strings['prompt'], add_colon=False) finally: self._disconnect() yield 'D %s' % pin yield 'OK' def _handle_CONFIRM(self, arg): try: self._connect() self._write(self.strings['description']) self._write('1) '+self.strings['ok']) self._write('2) '+self.strings['not ok']) value = self._prompt('?') finally: self._disconnect() if value == '1': yield 'OK' else: yield 'ASSUAN_Not_Confirmed' def _handle_MESSAGE(self, arg): self._write(self.strings['description']) yield 'OK' def _handle_CONFIRM(self, args): assert args == '--one-button', args try: self._connect() self._write(self.strings['description']) self._write('1) '+self.strings['ok']) value = self._prompt('?') finally: self._disconnect() assert value == '1', value yield 'OK' if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description=__doc__, version=__version__) parser.add_argument( '-V', '--verbose', action='count', default=0, help='increase verbosity') parser.add_argument( '--display', help='set X display (ignored by this implementation)') args = parser.parse_args() if args.verbose >= 2: logger.setLevel(logging.DEBUG) elif args.verbose >= 1: logger.setLevel(logging.INFO) try: p = PinEntry() p.run() except: logger.error('exiting due to exception:\n%s' % traceback.format_exc().rstrip()) raise