8febe10bcf6dc2e6f5f150ec4ecd0e60fb24d6c0
[be.git] / interfaces / email / interactive / send_pgp_mime.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2009-2012 Chris Ball <cjb@laptop.org>
4 #                         Gianluca Montecchi <gian@grys.it>
5 #                         W. Trevor King <wking@tremily.us>
6 #
7 # This file is part of Bugs Everywhere.
8 #
9 # Bugs Everywhere is free software: you can redistribute it and/or modify it
10 # under the terms of the GNU General Public License as published by the Free
11 # Software Foundation, either version 2 of the License, or (at your option) any
12 # later version.
13 #
14 # Bugs Everywhere is distributed in the hope that it will be useful, but
15 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
16 # FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
17 # more details.
18 #
19 # You should have received a copy of the GNU General Public License along with
20 # Bugs Everywhere.  If not, see <http://www.gnu.org/licenses/>.
21 """
22 Python module and command line tool for sending pgp/mime email.
23
24 Mostly uses subprocess to call gpg and a sendmail-compatible mailer.
25 If you lack gpg, either don't use the encryption functions or adjust
26 the pgp_* commands.  You may need to adjust the sendmail command to
27 point to whichever sendmail-compatible mailer you have on your system.
28 """
29
30 from cStringIO import StringIO
31 import os
32 import re
33 #import GnuPGInterface # Maybe should use this instead of subprocess
34 import smtplib
35 import subprocess
36 import sys
37 import tempfile
38 import types
39
40 try:
41     from email import Message
42     from email.mime.text import MIMEText
43     from email.mime.multipart import MIMEMultipart
44     from email.mime.application import MIMEApplication
45     from email.encoders import encode_7or8bit
46     from email.generator import Generator
47     from email.parser import Parser
48     from email.utils import getaddress
49 except ImportError:
50     # adjust to old python 2.4
51     from email import Message
52     from email.MIMEText import MIMEText
53     from email.MIMEMultipart import MIMEMultipart
54     from email.MIMENonMultipart import MIMENonMultipart
55     from email.Encoders import encode_7or8bit
56     from email.Generator import Generator
57     from email.Parser import Parser
58     from email.Utils import getaddresses
59
60     getaddress = getaddresses
61     class MIMEApplication (MIMENonMultipart):
62         def __init__(self, _data, _subtype, _encoder, **params):
63             MIMENonMultipart.__init__(self, 'application', _subtype, **params)
64             self.set_payload(_data)
65             _encoder(self)
66
67 usage="""usage: %prog [options]
68
69 Scriptable PGP MIME email using gpg.
70
71 You can use gpg-agent for passphrase caching if your key requires a
72 passphrase (it better!).  Example usage would be to install gpg-agent,
73 and then run
74   export GPG_TTY=`tty`
75   eval $(gpg-agent --daemon)
76 in your shell before invoking this script.  See gpg-agent(1) for more
77 details.  Alternatively, you can send your passphrase in on stdin
78   echo 'passphrase' | %prog [options]
79 or use the --passphrase-file option
80   %prog [options] --passphrase-file FILE [more options]
81 Both of these alternatives are much less secure than gpg-agent.  You
82 have been warned.
83 """
84
85 verboseInvoke = False
86 PGP_SIGN_AS = None
87 PASSPHRASE = None
88
89 # The following commands are adapted from my .mutt/pgp configuration
90 #
91 # Printf-like sequences:
92 #   %a The value of PGP_SIGN_AS.
93 #   %f Expands to the name of a file with text to be signed/encrypted.
94 #   %p Expands to the passphrase argument.
95 #   %R A string with some number (0 on up) of pgp_reciepient_arg
96 #      strings.
97 #   %r One key ID (e.g. recipient email address) to build a
98 #      pgp_reciepient_arg string.
99 #
100 # The above sequences can be used to optionally print a string if
101 # their length is nonzero. For example, you may only want to pass the
102 # -u/--local-user argument to gpg if PGP_SIGN_AS is defined.  To
103 # optionally print a string based upon one of the above sequences, the
104 # following construct is used
105 #   %?<sequence_char>?<optional_string>?
106 # where sequence_char is a character from the table above, and
107 # optional_string is the string you would like printed if status_char
108 # is nonzero. optional_string may contain other sequence as well as
109 # normal text, but it may not contain any question marks.
110 #
111 # see http://codesorcery.net/old/mutt/mutt-gnupg-howto
112 #     http://www.mutt.org/doc/manual/manual-6.html#pgp_autosign
113 #     http://tldp.org/HOWTO/Mutt-GnuPG-PGP-HOWTO-8.html
114 # for more details
115
116 pgp_recipient_arg='-r "%r"'
117 pgp_stdin_passphrase_arg='--passphrase-fd 0'
118 pgp_sign_command='/usr/bin/gpg --no-verbose --quiet --batch %p --output - --detach-sign --armor --textmode %?a?-u "%a"? %f'
119 pgp_encrypt_only_command='/usr/bin/gpg --no-verbose --quiet --batch --output - --encrypt --armor --textmode --always-trust --encrypt-to "%a" %R -- %f'
120 pgp_encrypt_sign_command='/usr/bin/gpg --no-verbose --quiet --batch %p --output - --encrypt --sign %?a?-u "%a"? --armor --textmode --always-trust --encrypt-to "%a" %R -- %f'
121 sendmail='/usr/sbin/sendmail -t'
122
123 def mail(msg, sendmail=None):
124     """
125     Send an email Message instance on its merry way.
126
127     We can shell out to the user specified sendmail in case
128     the local host doesn't have an SMTP server set up
129     for easy smtplib usage.
130     """
131     if sendmail != None:
132         execute(sendmail, stdin=flatten(msg))
133         return None
134     s = smtplib.SMTP()
135     s.connect()
136     s.sendmail(from_addr=source_email(msg),
137                to_addrs=target_emails(msg),
138                msg=flatten(msg))
139     s.close()
140
141 def header_from_text(text, encoding="us-ascii"):
142     """
143     Simple wrapper for instantiating an email.Message from text.
144     >>> header = header_from_text('\\n'.join(['From: me@big.edu','To: you@big.edu','Subject: testing']))
145     >>> print flatten(header)
146     From: me@big.edu
147     To: you@big.edu
148     Subject: testing
149     <BLANKLINE>
150     <BLANKLINE>
151     """
152     text = text.strip()
153     if type(text) == types.UnicodeType:
154         text = text.encode(encoding)
155     # assume StringType arguments are already encoded
156     p = Parser()
157     return p.parsestr(text, headersonly=True)
158
159 def guess_encoding(text):
160     if type(text) == types.StringType:
161         encoding = "us-ascii"
162     elif type(text) == types.UnicodeType:
163         for encoding in ["us-ascii", "iso-8859-1", "utf-8"]:
164             try:
165                 text.encode(encoding)
166             except UnicodeError:
167                 pass
168             else:
169                 break
170         assert encoding != None
171     return encoding
172
173 def encodedMIMEText(body, encoding=None):
174     if encoding == None:
175         encoding = guess_encoding(body)
176     if encoding == "us-ascii":
177         return MIMEText(body)
178     else:
179         # Create the message ('plain' stands for Content-Type: text/plain)
180         return MIMEText(body.encode(encoding), 'plain', encoding)
181
182 def append_text(text_part, new_text):
183     original_payload = text_part.get_payload(decode=True)
184     new_payload = u"%s%s" % (original_payload, new_text)
185     new_encoding = guess_encoding(new_payload)
186     text_part.set_payload(new_payload.encode(new_encoding), new_encoding)
187
188 def attach_root(header, root_part):
189     """
190     Attach the email.Message root_part to the email.Message header
191     without generating a multi-part message.
192     """
193     for k,v in header.items():
194         root_part[k] = v
195     return root_part    
196
197 def execute(args, stdin=None, expect=(0,)):
198     """
199     Execute a command (allows us to drive gpg).
200     """
201     if verboseInvoke == True:
202         print >> sys.stderr, '$ '+args
203     try:
204         p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, close_fds=True)
205     except OSError, e:
206         strerror = '%s\nwhile executing %s' % (e.args[1], args)
207         raise Exception, strerror
208     output, error = p.communicate(input=stdin)
209     status = p.wait()
210     if verboseInvoke == True:
211         print >> sys.stderr, '(status: %d)\n%s%s' % (status, output, error)
212     if status not in expect:
213         strerror = '%s\nwhile executing %s\n%s\n%d' % (args[1], args, error, status)
214         raise Exception, strerror
215     return status, output, error
216
217 def replace(template, format_char, replacement_text):
218     """
219     >>> replace('--textmode %?a?-u %a? %f', 'f', 'file.in')
220     '--textmode %?a?-u %a? file.in'
221     >>> replace('--textmode %?a?-u %a? %f', 'a', '0xHEXKEY')
222     '--textmode -u 0xHEXKEY %f'
223     >>> replace('--textmode %?a?-u %a? %f', 'a', '')
224     '--textmode  %f'
225     """
226     if replacement_text == None:
227         replacement_text = ""
228     regexp = re.compile('%[?]'+format_char+'[?]([^?]*)[?]')
229     if len(replacement_text) > 0:
230         str = regexp.sub('\g<1>', template)
231     else:
232         str = regexp.sub('', template)
233     regexp = re.compile('%'+format_char)
234     str = regexp.sub(replacement_text, str)
235     return str
236
237 def flatten(msg, to_unicode=False):
238     """
239     Produce flat text output from an email Message instance.
240     """
241     assert msg != None
242     fp = StringIO()
243     g = Generator(fp, mangle_from_=False)
244     g.flatten(msg)
245     text = fp.getvalue()
246     if to_unicode == True:
247         encoding = msg.get_content_charset() or "utf-8"
248         text = unicode(text, encoding=encoding)
249     return text
250
251 def source_email(msg, return_realname=False):
252     """
253     Search the header of an email Message instance to find the
254     sender's email address.
255     """
256     froms = msg.get_all('from', [])
257     from_tuples = getaddresses(froms) # [(realname, email_address), ...]
258     assert len(from_tuples) == 1
259     if return_realname == True:
260         return from_tuples[0] # (realname, email_address)
261     return from_tuples[0][1]  # email_address
262
263 def target_emails(msg):
264     """
265     Search the header of an email Message instance to find a
266     list of recipient's email addresses.
267     """
268     tos = msg.get_all('to', [])
269     ccs = msg.get_all('cc', [])
270     bccs = msg.get_all('bcc', [])
271     resent_tos = msg.get_all('resent-to', [])
272     resent_ccs = msg.get_all('resent-cc', [])
273     resent_bccs = msg.get_all('resent-bcc', [])
274     all_recipients = getaddresses(tos + ccs + bccs + resent_tos
275                                   + resent_ccs + resent_bccs)
276     return [addr[1] for addr in all_recipients]
277
278 class PGPMimeMessageFactory (object):
279     """
280     See http://www.ietf.org/rfc/rfc3156.txt for specification details.
281     >>> from_addr = "me@big.edu"
282     >>> to_addr = "you@you.edu"
283     >>> header = header_from_text('\\n'.join(['From: %s'%from_addr,'To: %s'%to_addr,'Subject: testing']))
284     >>> source_email(header) == from_addr
285     True
286     >>> target_emails(header) == [to_addr]
287     True
288     >>> m = PGPMimeMessageFactory('check 1 2\\ncheck 1 2\\n')
289     >>> print flatten(m.clearBodyPart())
290     Content-Type: text/plain; charset="us-ascii"
291     MIME-Version: 1.0
292     Content-Transfer-Encoding: 7bit
293     Content-Disposition: inline
294     <BLANKLINE>
295     check 1 2
296     check 1 2
297     <BLANKLINE>
298     >>> print flatten(m.plain())
299     Content-Type: text/plain; charset="us-ascii"
300     MIME-Version: 1.0
301     Content-Transfer-Encoding: 7bit
302     <BLANKLINE>
303     check 1 2
304     check 1 2
305     <BLANKLINE>
306     >>> signed = m.sign(header)
307     >>> signed.set_boundary('boundsep')
308     >>> print flatten(signed).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
309     Content-Type: multipart/signed; protocol="application/pgp-signature";
310         micalg="pgp-sha1"; boundary="boundsep"
311     MIME-Version: 1.0
312     Content-Disposition: inline
313     <BLANKLINE>
314     --boundsep
315     Content-Type: text/plain; charset="us-ascii"
316     MIME-Version: 1.0
317     Content-Transfer-Encoding: 7bit
318     Content-Disposition: inline
319     <BLANKLINE>
320     check 1 2
321     check 1 2
322     <BLANKLINE>
323     --boundsep
324     MIME-Version: 1.0
325     Content-Transfer-Encoding: 7bit
326     Content-Description: signature
327     Content-Type: application/pgp-signature; name="signature.asc";
328         charset="us-ascii"
329     <BLANKLINE>
330     -----BEGIN PGP SIGNATURE-----
331     ...
332     -----END PGP SIGNATURE-----
333     <BLANKLINE>
334     --boundsep--
335     >>> encrypted = m.encrypt(header)
336     >>> encrypted.set_boundary('boundsep')
337     >>> print flatten(encrypted).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
338     Content-Type: multipart/encrypted;
339         protocol="application/pgp-encrypted";
340         micalg="pgp-sha1"; boundary="boundsep"
341     MIME-Version: 1.0
342     Content-Disposition: inline
343     <BLANKLINE>
344     --boundsep
345     Content-Type: application/pgp-encrypted
346     MIME-Version: 1.0
347     Content-Transfer-Encoding: 7bit
348     <BLANKLINE>
349     Version: 1
350     <BLANKLINE>
351     --boundsep
352     MIME-Version: 1.0
353     Content-Transfer-Encoding: 7bit
354     Content-Type: application/octet-stream; charset="us-ascii"
355     <BLANKLINE>
356     -----BEGIN PGP MESSAGE-----
357     ...
358     -----END PGP MESSAGE-----
359     <BLANKLINE>
360     --boundsep--
361     >>> signedAndEncrypted = m.signAndEncrypt(header)
362     >>> signedAndEncrypted.set_boundary('boundsep')
363     >>> print flatten(signedAndEncrypted).replace('\\t', ' '*4) # doctest: +ELLIPSIS, +NORMALIZE_WHITESPACE
364     Content-Type: multipart/encrypted;
365         protocol="application/pgp-encrypted";
366         micalg="pgp-sha1"; boundary="boundsep"
367     MIME-Version: 1.0
368     Content-Disposition: inline
369     <BLANKLINE>
370     --boundsep
371     Content-Type: application/pgp-encrypted
372     MIME-Version: 1.0
373     Content-Transfer-Encoding: 7bit
374     <BLANKLINE>
375     Version: 1
376     <BLANKLINE>
377     --boundsep
378     MIME-Version: 1.0
379     Content-Transfer-Encoding: 7bit
380     Content-Type: application/octet-stream; charset="us-ascii"
381     <BLANKLINE>
382     -----BEGIN PGP MESSAGE-----
383     ...
384     -----END PGP MESSAGE-----
385     <BLANKLINE>
386     --boundsep--
387     """
388     def __init__(self, body):
389         self.body = body
390     def clearBodyPart(self):
391         body = encodedMIMEText(self.body)
392         body.add_header('Content-Disposition', 'inline')
393         return body
394     def passphrase_arg(self, passphrase=None):
395         if passphrase == None and PASSPHRASE != None:
396             passphrase = PASSPHRASE
397         if passphrase == None:
398             return (None,'')
399         return (passphrase, pgp_stdin_passphrase_arg)
400     def plain(self):
401         """
402         text/plain
403         """
404         return encodedMIMEText(self.body)
405     def sign(self, header, passphrase=None):
406         """
407         multipart/signed
408           +-> text/plain                 (body)
409           +-> application/pgp-signature  (signature)
410         """
411         passphrase,pass_arg = self.passphrase_arg(passphrase)
412         body = self.clearBodyPart()
413         bfile = tempfile.NamedTemporaryFile()
414         bfile.write(flatten(body))
415         bfile.flush()
416
417         args = replace(pgp_sign_command, 'f', bfile.name)
418         if PGP_SIGN_AS == None:
419             pgp_sign_as = '<%s>' % source_email(header)
420         else:
421             pgp_sign_as = PGP_SIGN_AS
422         args = replace(args, 'a', pgp_sign_as)
423         args = replace(args, 'p', pass_arg)
424         status,output,error = execute(args, stdin=passphrase)
425         signature = output
426
427         sig = MIMEApplication(_data=signature,
428                               _subtype='pgp-signature; name="signature.asc"',
429                               _encoder=encode_7or8bit)
430         sig['Content-Description'] = 'signature'
431         sig.set_charset('us-ascii')
432
433         msg = MIMEMultipart('signed', micalg='pgp-sha1',
434                             protocol='application/pgp-signature')
435         msg.attach(body)
436         msg.attach(sig)
437
438         msg['Content-Disposition'] = 'inline'
439         return msg
440     def encrypt(self, header, passphrase=None):
441         """
442         multipart/encrypted
443          +-> application/pgp-encrypted  (control information)
444          +-> application/octet-stream   (body)
445         """
446         body = self.clearBodyPart()
447         bfile = tempfile.NamedTemporaryFile()
448         bfile.write(flatten(body))
449         bfile.flush()
450
451         recipients = [replace(pgp_recipient_arg, 'r', recipient)
452                       for recipient in target_emails(header)]
453         recipient_string = ' '.join(recipients)
454         args = replace(pgp_encrypt_only_command, 'R', recipient_string)
455         args = replace(args, 'f', bfile.name)
456         if PGP_SIGN_AS == None:
457             pgp_sign_as = '<%s>' % source_email(header)
458         else:
459             pgp_sign_as = PGP_SIGN_AS
460         args = replace(args, 'a', pgp_sign_as)
461         status,output,error = execute(args)
462         encrypted = output
463
464         enc = MIMEApplication(_data=encrypted, _subtype='octet-stream',
465                               _encoder=encode_7or8bit)
466         enc.set_charset('us-ascii')
467
468         control = MIMEApplication(_data='Version: 1\n', _subtype='pgp-encrypted',
469                                   _encoder=encode_7or8bit)
470
471         msg = MIMEMultipart('encrypted', micalg='pgp-sha1',
472                             protocol='application/pgp-encrypted')
473         msg.attach(control)
474         msg.attach(enc)
475
476         msg['Content-Disposition'] = 'inline'
477         return msg
478     def signAndEncrypt(self, header, passphrase=None):
479         """
480         multipart/encrypted
481          +-> application/pgp-encrypted  (control information)
482          +-> application/octet-stream   (body)
483         """
484         passphrase,pass_arg = self.passphrase_arg(passphrase)
485         body = self.sign(header, passphrase)
486         body.__delitem__('Bcc')
487         bfile = tempfile.NamedTemporaryFile()
488         bfile.write(flatten(body))
489         bfile.flush()
490
491         recipients = [replace(pgp_recipient_arg, 'r', recipient)
492                       for recipient in target_emails(header)]
493         recipient_string = ' '.join(recipients)
494         args = replace(pgp_encrypt_only_command, 'R', recipient_string)
495         args = replace(args, 'f', bfile.name)
496         if PGP_SIGN_AS == None:
497             pgp_sign_as = '<%s>' % source_email(header)
498         else:
499             pgp_sign_as = PGP_SIGN_AS
500         args = replace(args, 'a', pgp_sign_as)
501         args = replace(args, 'p', pass_arg)
502         status,output,error = execute(args, stdin=passphrase)
503         encrypted = output
504
505         enc = MIMEApplication(_data=encrypted, _subtype='octet-stream',
506                               _encoder=encode_7or8bit)
507         enc.set_charset('us-ascii')
508
509         control = MIMEApplication(_data='Version: 1\n',
510                                   _subtype='pgp-encrypted',
511                                   _encoder=encode_7or8bit)
512
513         msg = MIMEMultipart('encrypted', micalg='pgp-sha1',
514                             protocol='application/pgp-encrypted')
515         msg.attach(control)
516         msg.attach(enc)
517
518         msg['Content-Disposition'] = 'inline'
519         return msg
520
521 def test():
522     import doctest
523     doctest.testmod()
524
525
526 if __name__ == '__main__':
527     from optparse import OptionParser
528
529     parser = OptionParser(usage=usage)
530     parser.add_option('-t', '--test', dest='test', action='store_true',
531                       help='Run doctests and exit')
532
533     parser.add_option('-H', '--header-file', dest='header_filename',
534                       help='file containing email header', metavar='FILE')
535     parser.add_option('-B', '--body-file', dest='body_filename',
536                       help='file containing email body', metavar='FILE')
537
538     parser.add_option('-P', '--passphrase-file', dest='passphrase_file',
539                       help='file containing gpg passphrase', metavar='FILE')
540     parser.add_option('-p', '--passphrase-fd', dest='passphrase_fd',
541                       help='file descriptor from which to read gpg passphrase (0 for stdin)',
542                       type="int", metavar='DESCRIPTOR')
543
544     parser.add_option('--mode', dest='mode', default='sign',
545                       help="One of 'sign', 'encrypt', 'sign-encrypt', or 'plain'.  Defaults to %default.",
546                       metavar='MODE')
547
548     parser.add_option('-a', '--sign-as', dest='sign_as',
549                       help="The gpg key to sign with (gpg's -u/--local-user)",
550                       metavar='KEY')
551
552     parser.add_option('--output', dest='output', action='store_true',
553                       help="Don't mail the generated message, print it to stdout instead.")
554
555     (options, args) = parser.parse_args()
556
557     stdin_used = False
558
559     if options.passphrase_file != None:
560         PASSPHRASE = file(options.passphrase_file, 'r').read()
561     elif options.passphrase_fd != None:
562         if options.passphrase_fd == 0:
563             stdin_used = True
564             PASSPHRASE = sys.stdin.read()
565         else:
566             PASSPHRASE = os.read(options.passphrase_fd)
567
568     if options.sign_as:
569         PGP_SIGN_AS = options.sign_as
570
571     if options.test == True:
572         test()
573         sys.exit(0)
574
575     header = None
576     if options.header_filename != None:
577         if options.header_filename == '-':
578             assert stdin_used == False
579             stdin_used = True
580             header = sys.stdin.read()
581         else:
582             header = file(options.header_filename, 'r').read()
583     if header == None:
584         raise Exception, "missing header"
585     headermsg = header_from_text(header)
586     body = None
587     if options.body_filename != None:
588         if options.body_filename == '-':
589             assert stdin_used == False
590             stdin_used = True
591             body = sys.stdin.read()
592         else:
593             body = file(options.body_filename, 'r').read()
594     if body == None:
595         raise Exception, "missing body"
596
597     m = PGPMimeMessageFactory(body)
598     if options.mode == "sign":
599         bodymsg = m.sign(header)
600     elif options.mode == "encrypt":
601         bodymsg = m.encrypt(header)
602     elif options.mode == "sign-encrypt":
603         bodymsg = m.signAndEncrypt(header)
604     elif options.mode == "plain":
605         bodymsg = m.plain()
606     else:
607         raise Exception("unrecognized mode '%s'" % options.mode)
608
609     message = attach_root(headermsg, bodymsg)
610     if options.output == True:
611         message = flatten(message)
612         print message
613     else:
614         mail(message, sendmail)