Mercurial > lcfOS
comparison python/usb.py @ 114:f42268da614f
Connected to stm32f4discovery
author | Windel Bouwman |
---|---|
date | Sat, 05 Jan 2013 19:07:14 +0100 |
parents | 1f40be088ee8 |
children | 654093a9a1e3 |
comparison
equal
deleted
inserted
replaced
113:1f40be088ee8 | 114:f42268da614f |
---|---|
1 from ctypes import Structure, POINTER, CDLL | 1 from ctypes import Structure, POINTER, CDLL, CFUNCTYPE |
2 from ctypes import c_uint16, c_uint8, c_int, c_ssize_t | 2 from ctypes import c_uint16, c_uint8, c_int, c_uint, c_ssize_t, c_void_p |
3 from ctypes import byref | 3 from ctypes import byref, create_string_buffer |
4 | 4 |
5 # libusb wrapper: | 5 # libusb wrapper: |
6 libusb = CDLL('libusb-1.0.so') | 6 libusb = CDLL('libusb-1.0.so') |
7 | 7 |
8 # helper: | 8 # helper: |
9 def buildfunc(name, argtypes, restype): | 9 def buildfunc(name, argtypes, restype=c_int): |
10 f = getattr(libusb, name) | 10 f = getattr(libusb, name) |
11 f.argtypes = argtypes | 11 f.argtypes = argtypes |
12 f.restype = restype | 12 f.restype = restype |
13 globals()[name] = f | 13 globals()[name] = f |
14 return f | 14 return f |
32 libusb_transfer_status = enum(\ | 32 libusb_transfer_status = enum(\ |
33 COMPLETED=0, ERROR=1, TIMED_OUT=2, \ | 33 COMPLETED=0, ERROR=1, TIMED_OUT=2, \ |
34 CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6) | 34 CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6) |
35 | 35 |
36 # types | 36 # types |
37 c_int_p = POINTER(c_int) | |
37 class libusb_context(Structure): | 38 class libusb_context(Structure): |
38 pass | 39 pass |
39 libusb_context_p = POINTER(libusb_context) | 40 libusb_context_p = POINTER(libusb_context) |
40 libusb_context_p_p = POINTER(libusb_context_p) | 41 libusb_context_p_p = POINTER(libusb_context_p) |
41 | 42 |
67 ('iSerialNumber', c_uint8), | 68 ('iSerialNumber', c_uint8), |
68 ('iNumConfigurations', c_uint8) | 69 ('iNumConfigurations', c_uint8) |
69 ] | 70 ] |
70 libusb_device_descriptor_p = POINTER(libusb_device_descriptor) | 71 libusb_device_descriptor_p = POINTER(libusb_device_descriptor) |
71 | 72 |
73 """ | |
74 class libusb_transfer(Structure): | |
75 pass | |
76 libusb_transfer_p = POINTER(libusb_transfer) | |
77 libusb_transfer_cb_fn = CFUNCTYPE(None, libusb_transfer_p) | |
78 libusb_transfer._fields_ = [ | |
79 ('dev_handle', libusb_device_handle_p), | |
80 ('flags', c_uint8), | |
81 ('endpoint', c_uchar), | |
82 ('type', c_uchar), | |
83 ('timeout', c_uint), | |
84 ('status', c_int), # enum libusb_transfer_status | |
85 ('length', c_int), | |
86 ('actual_length', c_int), | |
87 ('callback', libusb_transfer_cb_fn), | |
88 ('userdata', c_void_p), | |
89 ('buffer', c_void_p), | |
90 ('num_iso_packets', c_int), | |
91 ('iso_packet_desc', libusb_iso_packet_descriptor) | |
92 ] | |
93 """ | |
72 # functions | 94 # functions |
73 buildfunc('libusb_init', [libusb_context_p_p], c_int) | 95 buildfunc('libusb_init', [libusb_context_p_p], c_int) |
74 | 96 |
75 buildfunc('libusb_get_device_list', \ | 97 buildfunc('libusb_get_device_list', \ |
76 [libusb_context_p, libusb_device_p_p_p], c_ssize_t) | 98 [libusb_context_p, libusb_device_p_p_p], c_ssize_t) |
77 buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None) | 99 buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None) |
78 buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8) | 100 buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8) |
79 buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8) | 101 buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8) |
80 buildfunc('libusb_get_device_speed', [libusb_device_p], c_int) | 102 buildfunc('libusb_get_device_speed', [libusb_device_p]) |
81 buildfunc('libusb_unref_device', [libusb_device_p], None) | 103 buildfunc('libusb_unref_device', [libusb_device_p], None) |
82 buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p], c_int) | 104 buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p]) |
83 buildfunc('libusb_close', [libusb_device_handle_p], None) | 105 buildfunc('libusb_close', [libusb_device_handle_p], None) |
84 buildfunc('libusb_get_configuration', \ | 106 buildfunc('libusb_get_configuration',[libusb_device_handle_p,POINTER(c_int)]) |
85 [libusb_device_handle_p, POINTER(c_int)], c_int) | 107 buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int]) |
86 buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int], c_int) | 108 buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int]) |
87 buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int], c_int) | |
88 | 109 |
89 | 110 |
90 buildfunc('libusb_get_device_descriptor',\ | 111 buildfunc('libusb_get_device_descriptor',\ |
91 [libusb_device_p, libusb_device_descriptor_p], c_int) | 112 [libusb_device_p, libusb_device_descriptor_p]) |
113 | |
114 # synchronous functions: | |
115 buildfunc('libusb_bulk_transfer', [libusb_device_handle_p, c_uint8, \ | |
116 c_void_p, c_int, c_int_p, c_uint]) | |
92 | 117 |
93 # pythonic API: | 118 # pythonic API: |
94 | 119 |
95 class UsbError(Exception): | 120 class UsbError(Exception): |
96 def __init__(self, msg, errorcode): | 121 def __init__(self, msg, errorcode): |
157 .format( \ | 182 .format( \ |
158 self.BusNumber, self.DeviceAddress, self.VendorId, \ | 183 self.BusNumber, self.DeviceAddress, self.VendorId, \ |
159 self.ProductId, self.Speed) | 184 self.ProductId, self.Speed) |
160 return r2 | 185 return r2 |
161 | 186 |
187 USB_ENDPOINT_DIR_MASK = 0x80 | |
188 USB_ENDPOINT_IN = 0x80 | |
189 USB_ENDPOINT_OUT = 0x0 | |
190 | |
162 class UsbDeviceHandle: | 191 class UsbDeviceHandle: |
163 """ Handle to a detected usb device """ | 192 """ Handle to a detected usb device """ |
164 def __init__(self, device, handle_p): | 193 def __init__(self, device, handle_p): |
165 self.device = device | 194 self.device = device |
166 self.handle_p = handle_p | 195 self.handle_p = handle_p |
174 if r != 0: raise UsbError('Error setting configuration', r) | 203 if r != 0: raise UsbError('Error setting configuration', r) |
175 Configuration = property(getConfiguration, setConfiguration) | 204 Configuration = property(getConfiguration, setConfiguration) |
176 def claimInterface(self, interface_number): | 205 def claimInterface(self, interface_number): |
177 r = libusb_claim_interface(self.handle_p, interface_number) | 206 r = libusb_claim_interface(self.handle_p, interface_number) |
178 if r != 0: raise UsbError('Error claiming interface', r) | 207 if r != 0: raise UsbError('Error claiming interface', r) |
179 | 208 def bulkWrite(self, endpoint, data, timeout=0): |
209 """ Synchronous bulk write """ | |
210 assert type(data) is bytes | |
211 # assure the endpoint indicates the correct: | |
212 endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_OUT | |
213 buf = create_string_buffer(data) | |
214 transferred = c_int() | |
215 r = libusb_bulk_transfer(self.handle_p, endpoint, buf, len(data), \ | |
216 byref(transferred), timeout) | |
217 if r != 0: | |
218 raise UsbError('Bulk write failed', r) | |
219 if transferred.value != len(data): | |
220 raise UsbError('Not all {0} transferred {1}'.format(len(data), \ | |
221 transferred.value)) | |
222 def bulkRead(self, endpoint, numbytes, timeout=0): | |
223 """ Synchronous bulk read """ | |
224 # assure the endpoint indicates the correct: | |
225 endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_IN | |
226 buf = create_string_buffer(numbytes) | |
227 transferred = c_int() | |
228 r = libusb_bulk_transfer(self.handle_p, endpoint, buf, numbytes, \ | |
229 byref(transferred), timeout) | |
230 if r != 0: | |
231 raise UsbError('Bulk read failed', r) | |
232 if transferred.value != numbytes: | |
233 raise UsbError('Not all {0} transferred {1}'.format(numbytes, \ | |
234 transferred.value)) | |
235 data = buf.raw[0:numbytes] | |
236 return data | |
180 | 237 |
181 class UsbTransfer: | 238 class UsbTransfer: |
182 def __init__(self): | 239 def __init__(self): |
183 libusb_alloc_transfer(0) | 240 libusb_alloc_transfer(0) |