Mercurial > lcfOS
comparison python/utils/usb.py @ 292:534b94b40aa8
Fixup reorganize
author | Windel Bouwman |
---|---|
date | Wed, 27 Nov 2013 08:06:42 +0100 |
parents | python/usb.py@654093a9a1e3 |
children |
comparison
equal
deleted
inserted
replaced
290:7b38782ed496 | 292:534b94b40aa8 |
---|---|
1 from ctypes import Structure, POINTER, CDLL, CFUNCTYPE | |
2 from ctypes import c_uint16, c_uint8, c_int, c_uint, c_ssize_t, c_void_p | |
3 from ctypes import byref, create_string_buffer | |
4 | |
# libusb wrapper:
# Bind the libusb-1.0 shared library. The unversioned name is tried first
# (it is typically a development symlink); if the loader cannot find it,
# fall back to the versioned soname that runtime-only installs provide.
try:
    libusb = CDLL('libusb-1.0.so')
except OSError:
    libusb = CDLL('libusb-1.0.so.0')
7 | |
# helper:
def buildfunc(name, argtypes, restype=c_int):
    """Look up *name* in ``libusb``, attach its prototype and export it.

    The foreign function receives the given ``argtypes`` and ``restype``
    (``c_int`` unless specified), is stored in this module's globals under
    *name* so it can be called like a plain function, and is returned.
    """
    func = getattr(libusb, name)
    func.restype = restype
    func.argtypes = argtypes
    globals().update({name: func})
    return func
def enum(**enums):
    """Create a lightweight enumeration class from keyword arguments.

    Each keyword becomes a class attribute holding its value. An extra
    ``reverse_mapping`` attribute maps every value back to its name; if
    several names share one value, a later name overrides an earlier one.

    Example: ``enum(LOW=1, FULL=2).reverse_mapping[2] == 'FULL'``.
    """
    members = dict(enums)
    members['reverse_mapping'] = {value: key for key, value in enums.items()}
    return type('enum', (), members)
19 | |
# enums
# USB device/interface class codes.
libusb_class_code = enum(
    PER_INTERFACE=0, AUDIO=1, COMM=2, HID=3,
    PHYSICAL=5, PRINTER=7, PTP=6, MASS_STORAGE=8, HUB=9,
    DATA=10, SMART_CARD=0xb, CONTENT_SECURITY=0xd, VIDEO=0xe,
    PERSONAL_HEALTHCARE=0xf, DIAGNOSTIC_DEVICE=0xdc,
    # Wireless is class 0xe0. It used to be 0xe, which collided with VIDEO
    # and made reverse_mapping[0xe] report the wrong class name.
    WIRELESS=0xe0,
    APPLICATION=0xfe, VENDOR_SPEC=0xff)

# Values returned by libusb_get_device_speed.
libusb_speed = enum(UNKNOWN=0, LOW=1, FULL=2, HIGH=3, SUPER=4)

# Return codes of libusb calls. The 'SUCCES' spelling is kept on purpose:
# it is the existing public name and shows up in reverse_mapping.
libusb_error = enum(
    SUCCES=0, ERROR_IO=-1, ERROR_INVALID_PARAM=-2,
    ERROR_ACCESS=-3, ERROR_NO_DEVICE=-4, ERROR_NOT_FOUND=-5,
    ERROR_BUSY=-6, ERROR_TIMEOUT=-7, ERROR_OVERFLOW=-8,
    ERROR_PIPE=-9, ERROR_INTERRUPTED=-10, ERROR_NO_MEM=-11,
    ERROR_NOT_SUPPORTED=-12, ERROR_OTHER=-99)

# Completion states of a transfer.
libusb_transfer_status = enum(
    COMPLETED=0, ERROR=1, TIMED_OUT=2,
    CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6)
35 | |
# types
class libusb_context(Structure):
    """libusb context; declared without fields and used only via pointers."""


class libusb_device(Structure):
    """libusb device; declared without fields and used only via pointers."""


class libusb_device_handle(Structure):
    """Open-device handle; declared without fields, used only via pointers."""


# Pointer aliases, one ``_p`` suffix per level of indirection.
c_int_p = POINTER(c_int)

libusb_context_p = POINTER(libusb_context)
libusb_context_p_p = POINTER(libusb_context_p)

libusb_device_p = POINTER(libusb_device)
libusb_device_p_p = POINTER(libusb_device_p)
libusb_device_p_p_p = POINTER(libusb_device_p_p)

libusb_device_handle_p = POINTER(libusb_device_handle)
libusb_device_handle_p_p = POINTER(libusb_device_handle_p)
53 | |
class libusb_device_descriptor(Structure):
    """Device descriptor record filled in by libusb_get_device_descriptor.

    ctypes lays the fields out in the order listed, so the order and the
    integer widths below must not be changed independently of the C side.
    """
    _fields_ = [
        # header
        ('bLength', c_uint8), ('bDescriptorType', c_uint8),
        ('bcdUSB', c_uint16),
        # device class triple and control-endpoint packet size
        ('bDeviceClass', c_uint8), ('bDeviceSubClass', c_uint8),
        ('bDeviceProtocol', c_uint8), ('bMaxPacketSize0', c_uint8),
        # identification
        ('idVendor', c_uint16), ('idProduct', c_uint16),
        ('bcdDevice', c_uint16),
        # string indices
        ('iManufacturer', c_uint8), ('iProduct', c_uint8),
        ('iSerialNumber', c_uint8),
        # NOTE(review): libusb's header appears to call this field
        # bNumConfigurations; the name used here is kept as-is because
        # other code may already refer to it.
        ('iNumConfigurations', c_uint8),
    ]


libusb_device_descriptor_p = POINTER(libusb_device_descriptor)
72 | |
# Disabled draft of the asynchronous-transfer structure, kept as an inert
# string literal. It cannot be enabled as written: it refers to c_uchar and
# libusb_iso_packet_descriptor, neither of which is imported or defined
# above.
"""
class libusb_transfer(Structure):
    pass
libusb_transfer_p = POINTER(libusb_transfer)
libusb_transfer_cb_fn = CFUNCTYPE(None, libusb_transfer_p)
libusb_transfer._fields_ = [
    ('dev_handle', libusb_device_handle_p),
    ('flags', c_uint8),
    ('endpoint', c_uchar),
    ('type', c_uchar),
    ('timeout', c_uint),
    ('status', c_int), # enum libusb_transfer_status
    ('length', c_int),
    ('actual_length', c_int),
    ('callback', libusb_transfer_cb_fn),
    ('userdata', c_void_p),
    ('buffer', c_void_p),
    ('num_iso_packets', c_int),
    ('iso_packet_desc', libusb_iso_packet_descriptor)
    ]
"""
# functions
# Each call binds one libusb entry point as a module-level function with
# the given argument types; the result type is c_int unless stated.
buildfunc('libusb_init', [libusb_context_p_p], c_int)

buildfunc('libusb_get_device_list',
          [libusb_context_p, libusb_device_p_p_p], c_ssize_t)
buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None)
buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8)
buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8)
buildfunc('libusb_get_device_speed', [libusb_device_p])
buildfunc('libusb_unref_device', [libusb_device_p], None)
buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p])
buildfunc('libusb_close', [libusb_device_handle_p], None)
buildfunc('libusb_get_configuration',
          [libusb_device_handle_p, POINTER(c_int)])
buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int])
buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int])

buildfunc('libusb_get_device_descriptor',
          [libusb_device_p, libusb_device_descriptor_p])

# synchronous functions:
buildfunc('libusb_bulk_transfer',
          [libusb_device_handle_p, c_uint8, c_void_p, c_int, c_int_p, c_uint])

# asynchronous functions:
# UsbTransfer calls libusb_alloc_transfer, which was never bound and so
# raised NameError. The result is exposed as an untyped pointer because
# the libusb_transfer structure above is disabled.
buildfunc('libusb_alloc_transfer', [c_int], c_void_p)
116 | |
117 # pythonic API: | |
118 | |
class UsbError(Exception):
    """Error raised by the pythonic USB API.

    Args:
        msg: Description of what failed.
        errorcode: Optional libusb return code. A code listed in
            ``libusb_error`` is shown by name in the message, any other
            value is shown as given. Leave it out for failures that have
            no libusb code; the message is then used unchanged.

    Attributes:
        errorcode: The code exactly as passed in (``None`` when omitted).
    """
    def __init__(self, msg, errorcode=None):
        self.errorcode = errorcode
        if errorcode is not None:
            name = libusb_error.reverse_mapping.get(errorcode, errorcode)
            # Keep a space between the description and the code; the two
            # used to be concatenated without any separator.
            msg = '{0} Error code: {1}'.format(msg, name)
        super().__init__(msg)
125 | |
class UsbContext(object):
    """A libusb context, so several independent contexts can coexist."""

    def __init__(self):
        """Initialise a new libusb context; raise UsbError if that fails."""
        self.context_p = libusb_context_p()
        status = libusb_init(byref(self.context_p))
        if status != 0:
            raise UsbError('libusb_init error!', status)

    def getDeviceList(self):
        """Return a list with one UsbDevice per device libusb reports.

        Raises UsbError when libusb returns a negative count.
        """
        dev_array = libusb_device_p_p()
        found = libusb_get_device_list(self.context_p, byref(dev_array))
        if found < 0:
            raise UsbError('Error getting device list', found)
        devices = []
        for entry in dev_array[0:found]:
            devices.append(UsbDevice(self, entry.contents))
        # The list is freed with 0 as its second argument; each UsbDevice
        # calls libusb_unref_device itself when it is destroyed.
        libusb_free_device_list(dev_array, 0)
        return devices

    DeviceList = property(getDeviceList)
142 | |
class UsbDevice:
    """A detected usb device.

    Wraps the libusb device passed in as ``device_p`` and exposes its bus
    position, speed and descriptor values as read-only properties.
    """

    def __init__(self, context, device_p):
        self.context = context
        self.dev_p = device_p

    def __del__(self):
        # An instance whose __init__ never ran has no device to release.
        dev_p = getattr(self, 'dev_p', None)
        if dev_p is not None:
            libusb_unref_device(dev_p)

    def getBusNumber(self):
        """Return the bus number reported by libusb."""
        return libusb_get_bus_number(self.dev_p)
    BusNumber = property(getBusNumber)

    def getDeviceAddress(self):
        """Return the device address reported by libusb."""
        return libusb_get_device_address(self.dev_p)
    DeviceAddress = property(getDeviceAddress)

    def getSpeed(self):
        """Return the speed by name (see ``libusb_speed``).

        A value that ``libusb_speed`` does not list is returned unchanged.
        """
        speed = libusb_get_device_speed(self.dev_p)
        return libusb_speed.reverse_mapping.get(speed, speed)
    Speed = property(getSpeed)

    def getDescriptor(self):
        """Fetch and return a fresh ``libusb_device_descriptor``.

        Raises UsbError when libusb reports a non-zero status.
        """
        descriptor = libusb_device_descriptor()
        r = libusb_get_device_descriptor(self.dev_p, byref(descriptor))
        if r != 0:
            raise UsbError('Error getting descriptor', r)
        return descriptor
    Descriptor = property(getDescriptor)

    VendorId = property(lambda self: self.Descriptor.idVendor)
    ProductId = property(lambda self: self.Descriptor.idProduct)
    # The descriptor structure declares this field as iNumConfigurations;
    # reading 'bNumConfigurations' raised AttributeError on every access.
    NumConfigurations = property(
        lambda self: self.Descriptor.iNumConfigurations)

    def open(self):
        """ Opens this device and returns a handle """
        handle_p = libusb_device_handle_p()
        r = libusb_open(self.dev_p, byref(handle_p))
        if r != 0:
            raise UsbError('error opening device', r)
        return UsbDeviceHandle(self, handle_p)

    def __repr__(self):
        return 'Usb device: bus {0} address {1} {2:04X}:{3:04X} speed {4}' \
            .format(self.BusNumber, self.DeviceAddress, self.VendorId,
                    self.ProductId, self.Speed)
185 | |
# Direction flag of an endpoint address: bit 7 set means IN, clear means OUT.
USB_ENDPOINT_OUT = 0x00
USB_ENDPOINT_IN = 0x80
USB_ENDPOINT_DIR_MASK = 0x80
189 | |
class UsbDeviceHandle:
    """Handle to a detected usb device.

    Created by ``UsbDevice.open``. The handle is closed by ``close()`` or,
    at the latest, when the object is destroyed.
    """

    def __init__(self, device, handle_p):
        self.device = device
        self.handle_p = handle_p

    def __del__(self):
        self.close()

    def close(self):
        """Close the handle if it is still open; repeated calls are no-ops."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        if getattr(self, 'handle_p', None):
            libusb_close(self.handle_p)
            self.handle_p = None

    def getConfiguration(self):
        """Return the configuration value reported by libusb."""
        config = c_int()
        r = libusb_get_configuration(self.handle_p, byref(config))
        if r != 0:
            raise UsbError('Error getting configuration', r)
        return config.value

    def setConfiguration(self, config):
        """Select configuration ``config``; raise UsbError on failure."""
        r = libusb_set_configuration(self.handle_p, config)
        if r != 0:
            raise UsbError('Error setting configuration', r)

    Configuration = property(getConfiguration, setConfiguration)

    def claimInterface(self, interface_number):
        """Claim interface ``interface_number``; raise UsbError on failure."""
        r = libusb_claim_interface(self.handle_p, interface_number)
        if r != 0:
            raise UsbError('Error claiming interface', r)

    def bulkWrite(self, endpoint, data, timeout=0):
        """Synchronous bulk write of ``data`` (bytes) to ``endpoint``.

        ``timeout`` is passed to libusb_bulk_transfer unchanged.

        Raises:
            TypeError: ``data`` is not a bytes object.
            UsbError: libusb reported an error, or fewer bytes than
                ``len(data)`` were transferred.
        """
        # Explicit check instead of assert, which disappears under -O.
        if not isinstance(data, bytes):
            raise TypeError(
                'data must be bytes, not {0}'.format(type(data).__name__))
        # Force the direction bit of the endpoint address to OUT.
        endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_OUT
        buf = create_string_buffer(data)
        transferred = c_int()
        r = libusb_bulk_transfer(self.handle_p, endpoint, buf, len(data),
                                 byref(transferred), timeout)
        if r != 0:
            raise UsbError('Bulk write failed', r)
        if transferred.value != len(data):
            # libusb returned 0 here, so there is no code of its own to
            # report; UsbError still needs one, hence ERROR_OTHER.
            raise UsbError(
                'Not all {0} transferred {1}'.format(
                    len(data), transferred.value),
                libusb_error.ERROR_OTHER)

    def bulkRead(self, endpoint, numbytes, timeout=0):
        """Synchronous bulk read of exactly ``numbytes`` from ``endpoint``.

        ``timeout`` is passed to libusb_bulk_transfer unchanged.

        Returns:
            The received data as a bytes object of length ``numbytes``.

        Raises:
            UsbError: libusb reported an error, or the number of bytes
                transferred differs from ``numbytes``.
        """
        # Force the direction bit of the endpoint address to IN.
        endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_IN
        buf = create_string_buffer(numbytes)
        transferred = c_int()
        r = libusb_bulk_transfer(self.handle_p, endpoint, buf, numbytes,
                                 byref(transferred), timeout)
        if r != 0:
            raise UsbError('Bulk read failed', r)
        if transferred.value != numbytes:
            # Same reasoning as in bulkWrite: no libusb code is available.
            raise UsbError(
                'Not all {0} transferred {1}'.format(
                    numbytes, transferred.value),
                libusb_error.ERROR_OTHER)
        return buf.raw[0:numbytes]
242 | |
243 class UsbTransfer: | |
244 def __init__(self): | |
245 libusb_alloc_transfer(0) |