Ticket #133: event.py

File event.py, 15.3 kB (added by Lawrence, 4 years ago)

Simplifies control flow to enable easier unit testing

Line 
1 # Licensed under the MIT license
2 # http://opensource.org/licenses/mit-license.php
3
4 # Copyright (C) 2006 Fluendo, S.A. (www.fluendo.com).
5 # Copyright 2006, Frank Scholz <coherence@beebits.net>
6
7 import time
8 from urlparse import urlsplit
9
10 from twisted.internet import reactor, defer
11 from twisted.web import resource, server
12 from twisted.internet.protocol import Protocol, ClientCreator
13 from twisted.python import failure
14
15 from coherence import log, SERVER_ID
16 from coherence.upnp.core import utils
17
18 import louie
19
# URL under which the EventServer resource is reachable; assigned by
# EventServer.__init__ and sent as the CALLBACK of subscribe requests.
# (A ``global`` statement at module scope is a no-op, so none is used here.)
myurl = ""

# these are services that are currently subscribed to events,
# keyed by their subscription id (sid).
subscribed_services = {}
26
def add_subscribed_service(service, subscribeid):
    """Register ``service`` in the module-level registry under ``subscribeid``.

    When ``subscribeid`` is falsy (for example ``None``) nothing is stored;
    instead any entry keyed by that falsy value is removed.
    """
    log.info(Event.__module__, "add_subscribed_service %s (%s)", subscribeid, service)
    if subscribeid:
        subscribed_services[subscribeid] = service
    else:
        # pop() with a default replaces the Python-2-only has_key()/del pair
        # and is a no-op when the key is absent.
        subscribed_services.pop(subscribeid, None)
34
def clear_subscribed_service(subscribeid):
    """Remove the registry entry for ``subscribeid``; unknown ids are ignored."""
    log.info(Event.__module__, "clear_subscribed_service %s", subscribeid)
    # pop() with a default replaces the Python-2-only has_key()/del pair.
    subscribed_services.pop(subscribeid, None)
40
def get_service_by_subscribeid(subscribeid):
    """Return the service registered under ``subscribeid``, or None if there is none."""
    log.info(Event.__module__, "get_service_by_subscribeid %s", subscribeid)
    # dict.get() performs the lookup once and yields None for a missing key.
    return subscribed_services.get(subscribeid)
46
def propagate(event):
    """Deliver ``event`` to the service registered for the event's sid.

    The sid is taken from ``event.get_sid()``.  If no (truthy) service is
    registered for it, the event is dropped without further action.
    """
    subscriber = get_service_by_subscribeid(event.get_sid())
    if not subscriber:
        return
    subscriber.process_event(event)
51
class EventServer(resource.Resource, log.Loggable):
    """Web resource that receives event NOTIFY requests.

    The resource registers itself as 'events' on the given webserver and
    stores the resulting URL in the module-level ``myurl``.  Each NOTIFY is
    announced via louie, parsed into an Event and handed to propagate().
    """
    logCategory = 'event_server'

    def __init__(self, webserver):
        """Register this resource on ``webserver`` and remember its URL.

        ``webserver`` must provide ``add_web_resource(name, resource)``; its
        return value is stored in ``myurl``.
        """
        global myurl

        # Initialise both bases, as EventSubscriptionServer does; the
        # original skipped this.
        resource.Resource.__init__(self)
        log.Loggable.__init__(self)

        # An earlier variant took a control_point instead and built the URL as
        # 'http://%s:%d/events' from control_point.coherence.hostname and
        # .web_server_port, registering through
        # control_point.coherence.add_web_resource('events', self).
        # Here the webserver is separated from coherence.
        myurl = webserver.add_web_resource('events', self)
        self.info("EventServer ready. %s", myurl)

    def render_NOTIFY(self, request):
        """Handle a NOTIFY request and propagate the contained properties.

        Always returns an empty body.  A request without a SID header is
        answered with 412; malformed XML is logged and otherwise ignored.
        """
        self.info("EventServer received notify from %s, code: %d" % (request.client, request.code))
        data = request.content.getvalue()
        request.setResponseCode(200)

        command = {'method': request.method, 'path': request.path}
        headers = request.received_headers
        louie.send('UPnP.Event.Server.message_received', None, command, headers, data)

        # NOTE(review): the response code was set to 200 just above, so this
        # branch looks unreachable -- confirm before removing.
        if request.code != 200:
            self.info("data: %s", data)
            return ""

        self.debug("data: %s", data)
        sid = request.getAllHeaders().get('sid')
        if sid is None:
            # Previously a KeyError escaped here and produced a server error.
            self.warning("event notification without SID from %r", request.client)
            request.setResponseCode(412)
            return ""

        try:
            tree = utils.parse_xml(data).getroot()
        except (SyntaxError, AttributeError):
            self.warning("malformed event notification from %r", request.client)
            self.debug("data: %r", data)
            return ""

        ns = "urn:schemas-upnp-org:event-1-0"
        event = Event(sid)
        for prop in tree.findall('{%s}property' % ns):
            # Iterating an element yields its children; getchildren() is
            # deprecated.
            for var in prop:
                # Strip a leading '{namespace}' from the tag, if present.
                name = var.tag.split('}', 1)[-1]
                event[name] = var.text
        propagate(event)
        return ""
100
101
class EventSubscriptionServer(resource.Resource, log.Loggable):
    """
    This class is the server part on the device side. It listens to subscribe
    requests and registers the subscriber to send event messages to this device.
    If an unsubscribe request is received, the subscription is cancelled and no
    more event messages will be sent.

    we receive a subscription request
    {'callback': '<http://192.168.213.130:9083/BYvZMzfTSQkjHwzOThaP/ConnectionManager>',
     'host': '192.168.213.107:30020',
     'nt': 'upnp:event',
     'content-length': '0',
     'timeout': 'Second-300'}

    modify the callback value
    callback = callback[1:len(callback)-1]
    and pack it into a subscriber dict

    {'uuid:oAQbxiNlyYojCAdznJnC':
        {'callback': '<http://192.168.213.130:9083/BYvZMzfTSQkjHwzOThaP/ConnectionManager>',
         'created': 1162374189.257338,
         'timeout': 'Second-300',
         'sid': 'uuid:oAQbxiNlyYojCAdznJnC'}}
    """
    logCategory = 'event_subscription_server'

    # Timeout recorded when a SUBSCRIBE carries no TIMEOUT header and the
    # subscriber has none stored yet; same value this module requests as a
    # client.
    default_timeout = 'Second-1800'

    def __init__(self, service):
        """Bind the resource to ``service`` and share its subscriber dict."""
        resource.Resource.__init__(self)
        log.Loggable.__init__(self)
        self.service = service
        self.subscribers = service.get_subscribers()
        try:
            self.backend_name = self.service.backend.name
        except AttributeError:
            # The backend has no ``name``; fall back to the backend itself.
            self.backend_name = self.service.backend

    def _reject(self, request, code):
        """Answer ``request`` with an empty error response carrying ``code``."""
        request.setResponseCode(code)
        request.setHeader('SERVER', SERVER_ID)
        request.setHeader('CONTENT-LENGTH', 0)
        return ""

    def render_SUBSCRIBE(self, request):
        """Handle a subscription or renewal request.

        A known SID renews that subscriber.  A request without SID but with a
        CALLBACK creates a new subscriber and schedules
        ``service.new_subscriber``.  Every other combination is rejected with
        an error code.  Always returns an empty body.
        """
        self.info( "EventSubscriptionServer %s (%s) received subscribe request from %s, code: %d" % (
                            self.service.id,
                            self.backend_name,
                            request.client, request.code))
        data = request.content.getvalue()
        request.setResponseCode(200)

        command = {'method': request.method, 'path': request.path}
        headers = request.received_headers
        louie.send('UPnP.Event.Client.message_received', None, command, headers, data)

        # NOTE(review): the response code was set to 200 just above, so this
        # branch looks unreachable -- confirm before removing.
        if request.code != 200:
            self.info("data: %s", data)
            return ""

        headers = request.getAllHeaders()
        sid = headers.get('sid')
        callback = headers.get('callback')

        if sid is not None:
            if sid in self.subscribers:
                # renewal of an existing subscription
                s = self.subscribers[sid]
            elif callback is None:
                return self._reject(request, 404)
            else:
                # Unknown SID together with a CALLBACK: the original fell
                # through with ``s`` unbound and crashed further down.
                return self._reject(request, 400)
        elif callback is None:
            # Neither SID nor CALLBACK: nothing to renew, nowhere to notify.
            return self._reject(request, 412)
        else:
            # NOTE(review): the stdlib uuid.UUID cannot be called without
            # arguments; presumably this resolves to a project-local ``uuid``
            # module via Python 2 implicit relative import -- confirm.
            from uuid import UUID
            s = {'sid': str(UUID()),
                 'callback': callback[1:-1],  # strip the enclosing '<' and '>'
                 'seq': 0}
            # The service is told about the new subscriber slightly later.
            reactor.callLater(0.8, self.service.new_subscriber, s)

        # A missing TIMEOUT header used to raise KeyError; keep a previously
        # stored value or fall back to the default instead.
        s['timeout'] = headers.get('timeout', s.get('timeout', self.default_timeout))
        s['created'] = time.time()

        request.setHeader('SID', s['sid'])
        #request.setHeader('Subscription-ID', sid)  wrong example in the UPnP UUID spec?
        request.setHeader('TIMEOUT', s['timeout'])
        request.setHeader('SERVER', SERVER_ID)
        request.setHeader('CONTENT-LENGTH', 0)
        return ""

    def render_UNSUBSCRIBE(self, request):
        """Handle an unsubscribe request by dropping the subscriber for its SID.

        A missing or unknown SID is answered with 412.  Always returns an
        empty body.
        """
        self.info( "EventSubscriptionServer %s (%s) received unsubscribe request from %s, code: %d" % (
                            self.service.id,
                            self.backend_name,
                            request.client, request.code))
        data = request.content.getvalue()
        request.setResponseCode(200)

        command = {'method': request.method, 'path': request.path}
        headers = request.received_headers
        louie.send('UPnP.Event.Client.message_received', None, command, headers, data)

        # NOTE(review): see render_SUBSCRIBE -- this branch looks unreachable.
        if request.code != 200:
            self.info("data: %s", data)
            return ""

        headers = request.getAllHeaders()
        try:
            del self.subscribers[headers['sid']]
        except KeyError:
            # Either the SID header is missing or no such subscription exists.
            request.setResponseCode(412)
        return ""
205
206
class Event(dict):
    """Dictionary of evented values tagged with the subscription id it belongs to."""

    def __init__(self, sid):
        """Create an empty event for the subscription id ``sid``."""
        super(Event, self).__init__()
        self._sid = sid

    def get_sid(self):
        """Return the subscription id given at construction."""
        return self._sid
214
215
class EventProtocol(Protocol, log.Loggable):
    """Protocol that evaluates the reply to a subscribe/unsubscribe request.

    On a 200 reply the service's sid, registry entry and timeout are updated;
    on any other status the subscription is cleared.  The connection is closed
    after the first chunk of data has been handled.
    """

    logCategory = 'event_protocol'

    def __init__(self, service, action):
        """Remember the ``service`` and the ``action`` the request was for."""
        self.service = service
        self.action = action
        # No watchdog is armed at present (see connectionMade); defining the
        # attribute lets connectionLost run without an AttributeError.
        self.timeout_checker = None

    def connectionMade(self):
        """Nothing to do on connect; the watchdog below is disabled."""
        #self.timeout_checker = reactor.callLater(30, lambda : self.transport.loseConnection())
        pass

    def dataReceived(self, data):
        """Parse the HTTP reply in ``data`` and update the service accordingly."""
        self.info("response received from the Service Events HTTP server ")
        #self.debug(data)
        cmd, headers = utils.parse_http_response(data)
        self.debug("%r %r", cmd, headers)
        if int(cmd[1]) != 200:
            self.warning("response with error code %r received upon our %r request", cmd[1], self.action)
            clear_subscribed_service(self.service.get_sid())
            self.service.set_sid(None)
        else:
            previous_sid = self.service.get_sid()
            self.service.set_sid(headers.get('sid', None))   # this may be an unsubscribe request
            current_sid = self.service.get_sid()

            if current_sid:
                add_subscribed_service(self.service, current_sid)
                # The original logged ``self.id``; no such attribute is set in
                # this class, and the resulting AttributeError was swallowed
                # together with the whole timeout handling below.
                self.debug("add subscription for %s", current_sid)
                self._apply_timeout(headers.get('timeout'))
            elif previous_sid:
                # Reply without SID (e.g. to an unsubscribe): drop the stale
                # registry entry so later events for it are not delivered.
                clear_subscribed_service(previous_sid)
        self.transport.loseConnection()

    def _apply_timeout(self, timeout):
        """Translate a TIMEOUT header value into an absolute service timeout."""
        if timeout is None:
            self.debug("no timeout header in response to our %r request", self.action)
            return
        self.debug("%r %r", self.service.get_sid(), timeout)
        if timeout in ('infinite', 'Second-infinite'):
            self.service.set_timeout(time.time() + 4294967296) # FIXME: that's lame
        elif timeout.startswith('Second-'):
            try:
                seconds = int(timeout[len('Second-'):])
            except ValueError:
                self.warning("unparsable timeout %r received upon our %r request", timeout, self.action)
            else:
                self.service.set_timeout(time.time() + seconds)

    def connectionLost( self, reason):
        """Cancel a pending watchdog and release the service's connection."""
        checker = getattr(self, 'timeout_checker', None)
        if checker is not None and checker.active():
            checker.cancel()
        # If connection goes need to restart, maybe.
#       self.service.set_sid( None )
        # Forget the connection only if it is this one, so the next request
        # opens a fresh one.  The original never got here: cancel() above
        # failed on the missing attribute and the error was swallowed.
        if getattr(self.service, 'event_connection', None) is self:
            self.service.event_connection = None
        self.debug( "connection closed %r from the Service Events HTTP server", reason)
267
268
def unsubscribe(service, action='unsubscribe'):
    """Send an unsubscribe request for ``service``.

    This is a thin wrapper: the request is built and sent by subscribe(),
    which selects the request type from ``action``.  Returns whatever
    subscribe() returns.
    """
    outcome = subscribe(service, action)
    return outcome
271
def subscribe(service, action='subscribe'):
    """
    send a subscribe/renewal/unsubscribe request to a service

    ``action`` equal to 'subscribe' sends SUBSCRIBE, anything else sends
    UNSUBSCRIBE.  If the service already has a sid it is sent as SID
    (renewal/unsubscribe), otherwise CALLBACK and NT are sent.

    Returns the Deferred that fires once the request has been written; the
    device's reply is processed by EventProtocol.
    """
    log_category = Event.__module__
    log.info(log_category, "event.subscribe, action: %r", action)

    host_port = urlsplit(service.get_base_url())[1]
    if ':' in host_port:
        host, port = host_port.split(':')
        port = int(port)
    else:
        host = host_port
        port = 80

    def send_request(p, action):
        """Write the request for ``action`` to the connected protocol ``p``."""
        log.info(log_category, "event.subscribe.send_request %r, action: %r %r",
                 p, action, service.get_event_sub_url())
        if action == 'subscribe':
            request = ["SUBSCRIBE %s HTTP/1.1" % service.get_event_sub_url(),
                        "HOST: %s:%d" % (host, port),
                        "TIMEOUT: Second-1800",
                        ]
            # remembered so a later request can reuse the connection
            service.event_connection = p
        else:
            request = ["UNSUBSCRIBE %s HTTP/1.1" % service.get_event_sub_url(),
                        "HOST: %s:%d" % (host, port),
                        ]

        if service.get_sid():
            request.append("SID: %s" % service.get_sid())
        else:
            request.append("CALLBACK: <%s>" % myurl)
            request.append("NT: upnp:event")

        request.append('Date: %s' % time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()))
        request.append( "Content-Length: 0")
        request.append( "")
        request.append( "")
        request = '\r\n'.join(request)
        log.debug(log_category, "event.subscribe.send_request %r %r", request, p)
        try:
            # write() is the public, buffered transport API (also used by
            # send_notification); writeSomeData() is a low-level hook that
            # may write only part of the data.
            p.transport.write(request)
        except AttributeError:
            service.event_connection = None # need new connection
            log.info(log_category, "transport for event %r already gone", action)

    def got_error(err, action):
        """Log a failed connection attempt; ``err`` is the failure passed in."""
        log.info(log_category, "error on %s request with %s" % (action,service.get_base_url()))
        log.debug(log_category, err)

    def prepare_connection( service, action):
        """Send the request over the existing connection or a new one."""
        log.info(log_category, "event.subscribe.prepare_connection action: %r %r",
                 action, service.event_connection)
        if service.event_connection is None:
            c = ClientCreator(reactor, EventProtocol, service=service, action=action)
            log.info(log_category, "event.subscribe.prepare_connection: %r %r",
                     host, port)
            d = c.connectTCP(host, port)
            d.addCallback(send_request, action=action)
            d.addErrback(got_error, action)
        else:
            # Reuse the open connection: fire the callback right away.
            d = defer.Deferred()
            d.addCallback(send_request, action=action)
            d.callback(service.event_connection)
        return d

    # FIXME:
    #   we need to find a way to be sure that our unsubscribe calls get through
    #   on shutdown
    #   reactor.addSystemEventTrigger( 'before', 'shutdown', prepare_connection, service, action)

    return prepare_connection(service, action)
357
358
class NotificationProtocol(Protocol, log.Loggable):
    """Protocol that reads a subscriber's reply to a NOTIFY we sent.

    The reply is only logged.  The connection is closed after the first chunk
    of data, or by a watchdog 30 seconds after connecting.
    """

    logCategory = "notification_protocol"

    # Pending watchdog call; stays None until a connection has been made.
    timeout_checker = None

    def connectionMade(self):
        """Arm the watchdog that drops the connection after 30 seconds."""
        self.timeout_checker = reactor.callLater(30, lambda : self.transport.loseConnection())

    def dataReceived(self, data):
        """Log the status of the reply in ``data`` and close the connection."""
        cmd, headers = utils.parse_http_response(data)
        self.debug( "notification response received %r %r", cmd, headers)
        try:
            status = int(cmd[1])
        except (LookupError, TypeError, ValueError):
            # no status element, or one that is not a number
            self.debug("response without error code received upon our notification")
        else:
            if status != 200:
                self.warning("response with error code %r received upon our notification", cmd[1])
        self.transport.loseConnection()

    def connectionLost( self, reason):
        """Cancel the watchdog if it is still pending and log the close."""
        checker = self.timeout_checker
        # cancel() raises once the call has fired or was cancelled, which is
        # the case when the watchdog itself closed the connection.
        if checker is not None and checker.active():
            checker.cancel()
        self.debug("connection closed %r", reason)
382
383
def send_notification(s, xml):
    """
    send a notification to a subscriber

    ``s`` is a subscriber dict with the keys 'callback', 'sid' and 'seq';
    ``xml`` is the message body.  'seq' is incremented after each request and
    wraps to 1 once it exceeds 0xffffffff.

    Returns the Deferred of the connection attempt; the subscriber's reply is
    handled (logged) by NotificationProtocol.
    """
    log_category = "notification_protocol"

    _, host_port, path, query, _ = urlsplit(s['callback'])
    if path == '':
        path = '/'
    if query:
        # keep the query part of the callback URL in the request line
        path = '%s?%s' % (path, query)
    if ':' in host_port:
        host, port = host_port.split(':')
        port = int(port)
    else:
        host = host_port
        port = 80

    def send_request(p):
        """Write the NOTIFY request to the connected protocol ``p``."""
        request = ['NOTIFY %s HTTP/1.1' % path,
                    'HOST:  %s:%d' % (host, port),
                    'SEQ:  %d' % s['seq'],
                    'CONTENT-TYPE:  text/xml;charset="utf-8"',
                    'SID:  %s' % s['sid'],
                    'NTS:  upnp:propchange',
                    'NT:  upnp:event',
                    'Content-Length: %d' % len(xml),
                    '',
                    xml]

        request = '\r\n'.join(request)
        log.info(log_category, "send_notification.send_request to %r %r",
                 s['sid'], s['callback'])
        log.debug(log_category, "request: %r", request)
        s['seq'] += 1
        if s['seq'] > 0xffffffff:
            s['seq'] = 1
        p.transport.write(request)

    def got_error(err):
        """Log a failed connection attempt to the subscriber."""
        log.info(log_category, "error sending notification to %r %r",
                 s['sid'], s['callback'])
        log.debug(log_category, err)

    c = ClientCreator(reactor, NotificationProtocol)
    d = c.connectTCP(host, port)
    d.addCallback(send_request)
    d.addErrback(got_error)
    return d