diff python/usb.py @ 114:f42268da614f

Connected to stm32f4discovery
author Windel Bouwman
date Sat, 05 Jan 2013 19:07:14 +0100
parents 1f40be088ee8
children 654093a9a1e3
line wrap: on
line diff
--- a/python/usb.py	Sat Jan 05 00:06:27 2013 +0100
+++ b/python/usb.py	Sat Jan 05 19:07:14 2013 +0100
@@ -1,12 +1,12 @@
-from ctypes import Structure, POINTER, CDLL
-from ctypes import c_uint16, c_uint8, c_int, c_ssize_t
-from ctypes import byref
+from ctypes import Structure, POINTER, CDLL, CFUNCTYPE
+from ctypes import c_uint16, c_uint8, c_int, c_uint, c_ssize_t, c_void_p
+from ctypes import byref, create_string_buffer
 
 # libusb wrapper:
 libusb = CDLL('libusb-1.0.so')
 
 # helper:
-def buildfunc(name, argtypes, restype):
+def buildfunc(name, argtypes, restype=c_int):
    f = getattr(libusb, name)
    f.argtypes = argtypes
    f.restype = restype
@@ -34,6 +34,7 @@
          CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6)
 
 # types
+c_int_p = POINTER(c_int)
 class libusb_context(Structure):
    pass
 libusb_context_p = POINTER(libusb_context)
@@ -69,6 +70,27 @@
               ]
 libusb_device_descriptor_p = POINTER(libusb_device_descriptor)
 
+"""
+class libusb_transfer(Structure):
+   pass
+libusb_transfer_p = POINTER(libusb_transfer)
+libusb_transfer_cb_fn = CFUNCTYPE(None, libusb_transfer_p)
+libusb_transfer._fields_ = [
+      ('dev_handle', libusb_device_handle_p),
+      ('flags', c_uint8),
+      ('endpoint', c_uchar),
+      ('type', c_uchar),
+      ('timeout', c_uint),
+      ('status', c_int), # enum libusb_transfer_status
+      ('length', c_int),
+      ('actual_length', c_int),
+      ('callback', libusb_transfer_cb_fn),
+      ('userdata', c_void_p),
+      ('buffer', c_void_p),
+      ('num_iso_packets', c_int),
+      ('iso_packet_desc', libusb_iso_packet_descriptor)
+   ]
+"""
 # functions
 buildfunc('libusb_init', [libusb_context_p_p], c_int)
 
@@ -77,18 +99,21 @@
 buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None)
 buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8)
 buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8)
-buildfunc('libusb_get_device_speed', [libusb_device_p], c_int)
+buildfunc('libusb_get_device_speed', [libusb_device_p])
 buildfunc('libusb_unref_device', [libusb_device_p], None)
-buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p], c_int)
+buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p])
 buildfunc('libusb_close', [libusb_device_handle_p], None)
-buildfunc('libusb_get_configuration', \
-   [libusb_device_handle_p, POINTER(c_int)], c_int)
-buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int], c_int)
-buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int], c_int)
+buildfunc('libusb_get_configuration',[libusb_device_handle_p,POINTER(c_int)])
+buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int])
+buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int])
 
 
 buildfunc('libusb_get_device_descriptor',\
-   [libusb_device_p, libusb_device_descriptor_p], c_int)
+   [libusb_device_p, libusb_device_descriptor_p])
+
+# synchronous functions:
+buildfunc('libusb_bulk_transfer', [libusb_device_handle_p, c_uint8, \
+   c_void_p, c_int, c_int_p, c_uint])
 
 # pythonic API:
 
@@ -159,6 +184,10 @@
          self.ProductId, self.Speed)
       return r2
 
+USB_ENDPOINT_DIR_MASK = 0x80
+USB_ENDPOINT_IN = 0x80
+USB_ENDPOINT_OUT = 0x0
+
 class UsbDeviceHandle:
    """ Handle to a detected usb device """
    def __init__(self, device, handle_p):
@@ -176,7 +205,35 @@
    def claimInterface(self, interface_number):
       r = libusb_claim_interface(self.handle_p, interface_number)
       if r != 0: raise UsbError('Error claiming interface', r)
-
+   def bulkWrite(self, endpoint, data, timeout=0):
+      """ Synchronous bulk write """
+      assert type(data) is bytes
+      # assure the endpoint indicates the correct:
+      endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_OUT
+      buf = create_string_buffer(data)
+      transferred = c_int()
+      r = libusb_bulk_transfer(self.handle_p, endpoint, buf, len(data), \
+         byref(transferred), timeout)
+      if r != 0:
+         raise UsbError('Bulk write failed', r)
+      if transferred.value != len(data):
+         raise UsbError('Not all {0} transferred {1}'.format(len(data), \
+            transferred.value))
+   def bulkRead(self, endpoint, numbytes, timeout=0):
+      """ Synchronous bulk read """
+      # assure the endpoint indicates the correct:
+      endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_IN
+      buf = create_string_buffer(numbytes)
+      transferred = c_int()
+      r = libusb_bulk_transfer(self.handle_p, endpoint, buf, numbytes, \
+         byref(transferred), timeout)
+      if r != 0:
+         raise UsbError('Bulk read failed', r)
+      if transferred.value != numbytes:
+         raise UsbError('Not all {0} transferred {1}'.format(numbytes, \
+            transferred.value))
+      data = buf.raw[0:numbytes]
+      return data
 
 class UsbTransfer:
    def __init__(self):