Mercurial > lcfOS
diff python/usb.py @ 114:f42268da614f
Connected to stm32f4discovery
author | Windel Bouwman |
---|---|
date | Sat, 05 Jan 2013 19:07:14 +0100 |
parents | 1f40be088ee8 |
children | 654093a9a1e3 |
line wrap: on
line diff
--- a/python/usb.py Sat Jan 05 00:06:27 2013 +0100 +++ b/python/usb.py Sat Jan 05 19:07:14 2013 +0100 @@ -1,12 +1,12 @@ -from ctypes import Structure, POINTER, CDLL -from ctypes import c_uint16, c_uint8, c_int, c_ssize_t -from ctypes import byref +from ctypes import Structure, POINTER, CDLL, CFUNCTYPE +from ctypes import c_uint16, c_uint8, c_int, c_uint, c_ssize_t, c_void_p +from ctypes import byref, create_string_buffer # libusb wrapper: libusb = CDLL('libusb-1.0.so') # helper: -def buildfunc(name, argtypes, restype): +def buildfunc(name, argtypes, restype=c_int): f = getattr(libusb, name) f.argtypes = argtypes f.restype = restype @@ -34,6 +34,7 @@ CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6) # types +c_int_p = POINTER(c_int) class libusb_context(Structure): pass libusb_context_p = POINTER(libusb_context) @@ -69,6 +70,27 @@ ] libusb_device_descriptor_p = POINTER(libusb_device_descriptor) +""" +class libusb_transfer(Structure): + pass +libusb_transfer_p = POINTER(libusb_transfer) +libusb_transfer_cb_fn = CFUNCTYPE(None, libusb_transfer_p) +libusb_transfer._fields_ = [ + ('dev_handle', libusb_device_handle_p), + ('flags', c_uint8), + ('endpoint', c_uchar), + ('type', c_uchar), + ('timeout', c_uint), + ('status', c_int), # enum libusb_transfer_status + ('length', c_int), + ('actual_length', c_int), + ('callback', libusb_transfer_cb_fn), + ('userdata', c_void_p), + ('buffer', c_void_p), + ('num_iso_packets', c_int), + ('iso_packet_desc', libusb_iso_packet_descriptor) + ] +""" # functions buildfunc('libusb_init', [libusb_context_p_p], c_int) @@ -77,18 +99,21 @@ buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None) buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8) buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8) -buildfunc('libusb_get_device_speed', [libusb_device_p], c_int) +buildfunc('libusb_get_device_speed', [libusb_device_p]) buildfunc('libusb_unref_device', [libusb_device_p], None) -buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p], c_int) +buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p]) buildfunc('libusb_close', [libusb_device_handle_p], None) -buildfunc('libusb_get_configuration', \ - [libusb_device_handle_p, POINTER(c_int)], c_int) -buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int], c_int) -buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int], c_int) +buildfunc('libusb_get_configuration',[libusb_device_handle_p,POINTER(c_int)]) +buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int]) +buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int]) buildfunc('libusb_get_device_descriptor',\ - [libusb_device_p, libusb_device_descriptor_p], c_int) + [libusb_device_p, libusb_device_descriptor_p]) + +# synchronous functions: +buildfunc('libusb_bulk_transfer', [libusb_device_handle_p, c_uint8, \ + c_void_p, c_int, c_int_p, c_uint]) # pythonic API: @@ -159,6 +184,10 @@ self.ProductId, self.Speed) return r2 +USB_ENDPOINT_DIR_MASK = 0x80 +USB_ENDPOINT_IN = 0x80 +USB_ENDPOINT_OUT = 0x0 + class UsbDeviceHandle: """ Handle to a detected usb device """ def __init__(self, device, handle_p): @@ -176,7 +205,35 @@ def claimInterface(self, interface_number): r = libusb_claim_interface(self.handle_p, interface_number) if r != 0: raise UsbError('Error claiming interface', r) - + def bulkWrite(self, endpoint, data, timeout=0): + """ Synchronous bulk write """ + assert type(data) is bytes + # assure the endpoint indicates the correct: + endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_OUT + buf = create_string_buffer(data) + transferred = c_int() + r = libusb_bulk_transfer(self.handle_p, endpoint, buf, len(data), \ + byref(transferred), timeout) + if r != 0: + raise UsbError('Bulk write failed', r) + if transferred.value != len(data): + raise UsbError('Not all {0} transferred {1}'.format(len(data), \ + transferred.value)) + def bulkRead(self, endpoint, numbytes, timeout=0): + """ Synchronous bulk read """ + # assure the endpoint indicates the correct: + endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_IN + buf = create_string_buffer(numbytes) + transferred = c_int() + r = libusb_bulk_transfer(self.handle_p, endpoint, buf, numbytes, \ + byref(transferred), timeout) + if r != 0: + raise UsbError('Bulk read failed', r) + if transferred.value != numbytes: + raise UsbError('Not all {0} transferred {1}'.format(numbytes, \ + transferred.value)) + data = buf.raw[0:numbytes] + return data class UsbTransfer: def __init__(self):