]> git.gir.st - tmk_keyboard.git/blob - tool/mbed/mbed-sdk/workspace_tools/host_tests/udp_link_layer_auto.py
Squashed 'tmk_core/' changes from 7967731..b9e0ea0
[tmk_keyboard.git] / tool / mbed / mbed-sdk / workspace_tools / host_tests / udp_link_layer_auto.py
1 """
2 mbed SDK
3 Copyright (c) 2011-2013 ARM Limited
4
5 Licensed under the Apache License, Version 2.0 (the "License");
6 you may not use this file except in compliance with the License.
7 You may obtain a copy of the License at
8
9 http://www.apache.org/licenses/LICENSE-2.0
10
11 Unless required by applicable law or agreed to in writing, software
12 distributed under the License is distributed on an "AS IS" BASIS,
13 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 See the License for the specific language governing permissions and
15 limitations under the License.
16 """
17
18 """
19 How to use:
20 make.py -m LPC1768 -t ARM -d E:\ -n NET_14
21 udp_link_layer_auto.py -p COM20 -d E:\ -t 10
22 """
23
24 import re
25 import uuid
26 import socket
27 import thread
28 from sys import stdout
29 from time import time, sleep
30 from host_test import DefaultTest
31 from SocketServer import BaseRequestHandler, UDPServer
32
33
# Datagrams echoed back to the host, keyed by payload text -> arrival time()
dict_udp_recv_datagrams = {}

# Datagrams bursted by the host, keyed by payload -> send time()
dict_udp_sent_datagrams = {}
39
40
class UDPEchoClient_Handler(BaseRequestHandler):
    """ Handler that timestamps every datagram received by the host
    """

    def handle(self):
        """ Invoked once per incoming request; stores the payload with its
            arrival time in dict_udp_recv_datagrams
        """
        # The request unpacks into the datagram payload and a second element
        # (named _socket in the original code) which is not used here.
        datagram, _unused_socket = self.request
        # repr() with the first and last character stripped is used as the key
        datagram_key = repr(datagram)[1:-1]
        dict_udp_recv_datagrams[datagram_key] = time()
49
50
def udp_packet_recv(threadName, server_ip, server_port):
    """ Receive the packet stream coming from the mbed device.

        Binds a UDPServer to (server_ip, server_port), announces the address
        on stdout and then serves requests forever, so this call does not
        return. threadName is accepted but not used by the body.
    """
    listen_address = (server_ip, server_port)
    echo_receiver = UDPServer(listen_address, UDPEchoClient_Handler)
    print("[UDP_COUNTER] Listening for connections... %s:%d"% listen_address)
    echo_receiver.serve_forever()
57
58
class UDPEchoServerTest(DefaultTest):
    """ Host side of the UDP link layer test.

        Reads the server address announced by the device on the serial line,
        bursts TEST_PACKET_COUNT datagrams at it, counts the datagrams that
        come back on ECHO_PORT + 1 and finally asks the device for its own
        statistics over a TCP connection to CONTROL_PORT.
    """
    ECHO_SERVER_ADDRESS = ""            # UDP IP of datagram bursts
    ECHO_PORT = 0                       # UDP port for datagram bursts
    CONTROL_PORT = 23                   # TCP port used to get stats from mbed device, e.g. counters
    s = None                            # Socket

    TEST_PACKET_COUNT = 1000            # how many packets should be sent
    TEST_STRESS_FACTOR = 0.001          # stress factor: delay between datagrams, in seconds (1 ms)
    PACKET_SATURATION_RATIO = 29.9      # Acceptable packet transmission in %

    # Raw string with escaped dots so that only a dotted quad followed by
    # ":port" matches (an unescaped "." would match any character).
    PATTERN_SERVER_IP = r"Server IP Address is (\d+)\.(\d+)\.(\d+)\.(\d+):(\d+)"
    re_detect_server_ip = re.compile(PATTERN_SERVER_IP)

    def get_control_data(self, command="stat\n"):
        """ Send a command to the device control port and return its reply.

            Opens a TCP connection to (ECHO_SERVER_ADDRESS, CONTROL_PORT),
            sends command and reads at most 256 bytes of response.

            Returns the received data, or None when the connection could not
            be established. Errors raised while sending or receiving are
            propagated to the caller; the socket is closed in every case.
        """
        BUFFER_SIZE = 256
        control = None
        try:
            control = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            control.connect((self.ECHO_SERVER_ADDRESS, self.CONTROL_PORT))
        except Exception as e:
            # Without a connection there is nothing to send or receive, so
            # report the problem and give up instead of using a dead socket.
            self.notify("HOST: Error: %s"% e)
            if control is not None:
                control.close()
            return None

        try:
            control.send(command)
            return control.recv(BUFFER_SIZE)
        finally:
            control.close()

    def test(self):
        """ Run the datagram burst test.

            Returns RESULT_IO_SERIAL when nothing could be read from the
            serial port, RESULT_ERROR when the server address was not
            announced or the UDP socket could not be created, otherwise
            RESULT_SUCCESS or RESULT_FAILURE depending on the percentage of
            datagrams received back from the device.
        """
        serial_ip_msg = self.mbed.serial_readline()
        if serial_ip_msg is None:
            return self.RESULT_IO_SERIAL
        stdout.write(serial_ip_msg)
        stdout.flush()

        # Searching for IP address and port prompted by server
        m = self.re_detect_server_ip.search(serial_ip_msg)
        if m is None:
            # Without an address there is no target for the burst
            self.notify("HOST: Error: UDP server address not found in serial output")
            return self.RESULT_ERROR
        address_fields = m.groups()
        self.ECHO_SERVER_ADDRESS = ".".join(address_fields[:4])
        self.ECHO_PORT = int(address_fields[4])  # must be integer for socket.connect method
        self.notify("HOST: UDP Server found at: " + self.ECHO_SERVER_ADDRESS + ":" + str(self.ECHO_PORT))

        # Open client socket to burst datagrams to UDP server in mbed
        try:
            self.s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        except Exception as e:
            self.s = None
            self.notify("HOST: Error: %s"% e)
            return self.RESULT_ERROR

        try:
            # UDP replied receiver works in background to get echoed datagrams
            SERVER_IP = str(socket.gethostbyname(socket.getfqdn()))
            SERVER_PORT = self.ECHO_PORT + 1
            thread.start_new_thread(udp_packet_recv, ("Thread-udp-recv", SERVER_IP, SERVER_PORT))
            sleep(0.5)  # give the background receiver time to start listening

            # Burst part
            for no in range(self.TEST_PACKET_COUNT):
                # Sequence number plus a random UUID makes every payload unique
                payload = str(no) + "__" + str(uuid.uuid4())
                self.s.sendto(payload, (self.ECHO_SERVER_ADDRESS, self.ECHO_PORT))
                dict_udp_sent_datagrams[payload] = time()
                sleep(self.TEST_STRESS_FACTOR)
        finally:
            # Release the socket even when the burst is interrupted by an error
            self.s.close()

        # Wait 5 seconds for packets to come
        # NOTE(review): the threshold must be met at every one-second sample,
        # including the first one - confirm this strictness is intended.
        result = True
        self.notify("HOST: Test Summary:")
        for d in range(5):
            sleep(1.0)
            received_count = len(dict_udp_recv_datagrams)
            summary_datagram_success = (float(received_count) / float(self.TEST_PACKET_COUNT)) * 100.0
            # TEST_STRESS_FACTOR is passed to sleep() and is therefore in
            # seconds; convert it so that it matches the "ms" unit printed.
            self.notify("HOST: Datagrams received after +%d sec: %.3f%% (%d / %d), stress=%.3f ms"% (d,
                        summary_datagram_success,
                        received_count,
                        self.TEST_PACKET_COUNT,
                        self.TEST_STRESS_FACTOR * 1000.0))
            result = result and (summary_datagram_success >= self.PACKET_SATURATION_RATIO)
            stdout.flush()

        # Getting control data from test
        self.notify("...")
        self.notify("HOST: Mbed Summary:")
        mbed_stats = self.get_control_data()
        if mbed_stats is not None:
            self.notify(mbed_stats)
        return self.RESULT_SUCCESS if result else self.RESULT_FAILURE
142
143
if __name__ == "__main__":
    # Run the host test only when this file is executed as a script
    host_test = UDPEchoServerTest()
    host_test.run()
Imprint / Impressum