Ticket #131: upnp-tester.py

File upnp-tester.py, 7.7 kB (added by Lawrence, 4 years ago)

Patched copy so that it will run under Windows

Line 
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3
4 # Licensed under the MIT license
5 # http://opensource.org/licenses/mit-license.php
6
7 # Copyright 2008, Frank Scholz <coherence@beebits.net>
8
9 # upnp-tester.py
10 #
11 # very basic atm
12 #
13 # provides these functions:
14 #
15 # list           - display all devices
16 # extract <uuid> - extract device and service xml files and put them in a
17 #                  <tempdir>/<uuid> directory (see tempfile.gettempdir())
18 # send <uuid>    - pack the previously extracted xml files into a gzipped
19 #                  tar (.tgz) and send it via email to the Coherence googlemail account
20 #
21
22 #LPK
23 #   instead of /tmp use gettempdir for platform temporary location.
24 #   use worker thread for windows command handling.
25 #
26 from tempfile import gettempdir
27 tmpdir = gettempdir()
28
29 import os, threading, sys
30
31 from sets import Set
32
33 from twisted.internet import stdio
34 from twisted.protocols import basic
35 from twisted.internet import protocol
36
37 from twisted.mail import smtp
38
39 from twisted.internet import reactor, defer
40 from twisted.web import client
41
42 from twisted.names import client as namesclient
43 from twisted.names import dns
44
45 from coherence.base import Coherence
46
47 import StringIO
48
class SMTPClient(smtp.ESMTPClient):

    """ build an email message and send it to our googlemail account

    The message is a MIME multipart mail with a single attachment: the
    file named by ``mail_file``.
    """

    def __init__(self, mail_from, mail_to, mail_subject, mail_file, *args, **kwargs):
        """ remember the envelope data; remaining arguments are passed
            on unchanged to smtp.ESMTPClient
        """
        smtp.ESMTPClient.__init__(self, *args, **kwargs)
        # mailFrom is consumed (reset to None) by getMailFrom, so the
        # sender is kept a second time in mail_from for the 'From' header
        self.mailFrom = mail_from
        self.mailTo = mail_to
        self.mailSubject = mail_subject
        self.mail_file = mail_file
        self.mail_from = mail_from

    def getMailFrom(self):
        """ return the sender address on the first call, None afterwards
        """
        result = self.mailFrom
        self.mailFrom = None
        return result

    def getMailTo(self):
        """ return the list of recipients - always exactly one
        """
        return [self.mailTo]

    def getMailData(self):
        """ build the message and return it as a file-like object
        """
        from email.mime.application import MIMEApplication
        from email.mime.multipart import MIMEMultipart

        msg = MIMEMultipart()
        msg['Subject'] = self.mailSubject
        msg['From'] = self.mail_from
        msg['To'] = self.mailTo

        # make sure the archive is closed again, even if reading it fails
        fp = open(self.mail_file, 'rb')
        try:
            payload = fp.read()
        finally:
            fp.close()

        tar = MIMEApplication(payload, 'x-tar')
        tar.add_header('Content-Disposition', 'attachment',
                       filename=os.path.basename(self.mail_file))
        msg.attach(tar)
        return StringIO.StringIO(msg.as_string())

    def sentMail(self, code, resp, numOk, addresses, log):
        """ report how many messages have been accepted
        """
        # single-argument form: prints the same text whether print is
        # a statement or a function
        print('Sent %s messages' % numOk)
87
class SMTPClientFactory(protocol.ClientFactory):

    """ client factory handing the mail parameters over to
        the SMTPClient instances it builds
    """

    protocol = SMTPClient

    def __init__(self, mail_from, mail_to, mail_subject, mail_file, *args, **kwargs):
        # additional positional/keyword arguments are accepted but not used
        (self.mail_from,
         self.mail_to,
         self.mail_subject,
         self.mail_file) = (mail_from, mail_to, mail_subject, mail_file)

    def buildProtocol(self, addr):
        """ create the protocol instance; addr is not evaluated
        """
        envelope = (self.mail_from, self.mail_to,
                    self.mail_subject, self.mail_file)
        options = {'secret': None, 'identity': 'localhost'}
        return self.protocol(*envelope, **options)
101
102
class UI(basic.LineReceiver):

    """ line based command console

    Every method named cmd_<name> is a command; its docstring is the
    text shown by 'help'. Each command receives the list of words that
    followed the command name.

    The attribute 'coherence' has to be set by the caller before
    'list' or 'extract' are used.
    """

    from os import linesep as delimiter

    def connectionMade(self):
        self.print_prompt()

    def lineReceived(self, line):
        """ split the line into words and dispatch to the matching
            cmd_<name> method, '?' is an alias for 'help'
        """
        args = line.strip().split()
        if args:
            cmd = args[0].lower()
            handler = getattr(self, 'cmd_%s' % cmd, None)
            if handler is not None:
                handler(args[1:])
            elif cmd == "?":
                self.cmd_help(args[1:])
            else:
                self.transport.write("""Unknown command '%s'\n"""%(cmd))
        self.print_prompt()

    def cmd_help(self,args):
        "help -- show help"
        # collect the docstrings in a set, so aliases like exit/quit
        # are listed once only; sorted to get a stable output
        docs = set()
        for name in dir(self):
            if name.startswith("cmd_"):
                doc = getattr(self, name).__doc__
                if doc:
                    docs.add(doc)
        self.transport.write("Commands:\n")
        for doc in sorted(docs):
            self.transport.write("%s\n"%(doc))

    def cmd_list(self, args):
        "list -- list devices"
        self.transport.write("Devices:\n")
        for d in self.coherence.get_devices():
            short_type = ':'.join(d.device_type.split(':')[3:5])
            self.transport.write(str("%s %s [%s/%s/%s]\n" % (d.friendly_name, short_type, d.st, d.usn.split(':')[1], d.host)))

    def cmd_extract(self, args):
        "extract <uuid> -- download xml files from device"
        if not args:
            self.transport.write("usage: extract <uuid>\n")
            return
        uuid = args[0]

        device = self.coherence.get_device_with_id(uuid)
        if device is None:
            self.transport.write("device %s not found - aborting\n" % uuid)
            return

        self.transport.write(str("extracting from %s @ %s\n" % (device.friendly_name, device.host)))
        downloads = []

        def device_extract(workdevice, path):
            # one directory per device, embedded devices end up
            # in sub-directories of their parent device
            tmp_dir = os.path.join(path,workdevice.get_uuid())
            os.mkdir(tmp_dir)
            d = client.downloadPage(workdevice.get_location(),os.path.join(tmp_dir,'device-description.xml'))
            downloads.append(d)

            for service in workdevice.services:
                d = client.downloadPage(service.get_scpd_url(),os.path.join(tmp_dir,'%s-description.xml'%service.service_type.split(':',3)[3]))
                downloads.append(d)

            for ed in workdevice.devices:
                device_extract(ed, tmp_dir)

        def finished(result):
            # tmpdir is already a complete path, so join instead of
            # gluing it together with '/'
            self.transport.write(str("\nextraction of device %s finished\nfiles have been saved to %s\n" %(uuid,os.path.join(tmpdir,uuid))))
            self.print_prompt()

        try:
            device_extract(device,tmpdir)
        except Exception:
            # sys.exc_info instead of 'except ..., msg' / 'except ... as msg',
            # works with every Python version
            msg = sys.exc_info()[1]
            self.transport.write(str("problem creating download directory %s\n" % msg))
            return

        dl = defer.DeferredList(downloads)
        dl.addCallback(finished)

    def cmd_send(self, args):
        "send <uuid> -- send before extracted xml files to the Coherence home base"
        if not args:
            self.transport.write("usage: send <uuid>\n")
            return
        uuid = args[0]

        device_dir = os.path.join(tmpdir,uuid)
        if not os.path.isdir(device_dir):
            self.transport.write("no extracted files for %s found - use extract first\n" % uuid)
            return

        import tarfile
        archive = os.path.join(tmpdir,uuid+'.tgz')
        tar = tarfile.open(archive, "w:gz")
        try:
            # arcname keeps the member names relative (<uuid>/<file>)
            # without the need to change the working directory
            for name in os.listdir(device_dir):
                tar.add(os.path.join(device_dir,name), arcname=os.path.join(uuid,name))
        finally:
            tar.close()

        def got_mx(result):
            mx_list = result[0]
            if not mx_list:
                return
            # try the exchange with the lowest preference value
            mx_list.sort(key=lambda record: record.payload.preference)
            # getpass.getuser is available on Windows too,
            # the posix and pwd modules are not
            import getpass
            import socket
            sender = '@'.join((getpass.getuser(),socket.gethostname()))
            reactor.connectTCP(str(mx_list[0].payload.name), 25,
                SMTPClientFactory(sender, 'upnp.fingerprint@googlemail.com', 'xml-files', archive))

        mx = namesclient.lookupMailExchange('googlemail.com')
        mx.addCallback(got_mx)

    def cmd_quit(self, args):
        "quit -- quits this program"
        reactor.stop()

    cmd_exit = cmd_quit

    def print_prompt(self):
        self.transport.write('>>> ')
204
class win_command(threading.Thread):

    """ worker thread reading the commands from sys.stdin

    Used on Windows instead of stdio.StandardIO. The lines are only
    read in this thread; they are processed in the reactor thread,
    as the commands call Twisted APIs.
    """

    def __init__(self, ui ):
        threading.Thread.__init__(self)
        self.ui = ui

    def _dispatch(self, line):
        """ process one line - called in the reactor thread
        """
        try:
            self.ui.lineReceived(line)
        except Exception:
            import traceback
            traceback.print_exc()

    def run(self):
        self.ui.print_prompt()
        while True:
            try:
                line = sys.stdin.readline()
            except Exception:
                # stdin can't be read any more, give up instead of
                # failing over and over again
                import traceback
                traceback.print_exc()
                break
            if not line:
                # readline returns '' at end of file only (an empty
                # input line still contains the newline) - stop here,
                # otherwise the loop would spin forever
                break
            reactor.callFromThread(self._dispatch, line)
219
def start_win_command_thread( ui ):
    """ let ui write to sys.stdout and start a daemon thread
        of type win_command which feeds it with commands
    """
    reader = win_command(ui)
    reader.setDaemon(True)
    ui.transport = sys.stdout
    reader.start()
225
if __name__ == '__main__':

    # the minimal alternative would be {'logmode':'none'} only
    coherence_config = {'logmode':'none',
                        'webserver':{},
                        'controlpoint':'yes',
                        'start':'yes'}
    c = Coherence(coherence_config)

    ui = UI()
    ui.coherence = c

    # os is already imported at the top of the file
    if os.name != 'nt':
        stdio.StandardIO(ui)
    else:
        # on Windows a worker thread reads the commands from stdin
        start_win_command_thread( ui )

    reactor.run()