diff python/utils/usb.py @ 292:534b94b40aa8

Fixup reorganize
author Windel Bouwman
date Wed, 27 Nov 2013 08:06:42 +0100
parents python/usb.py@654093a9a1e3
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/python/utils/usb.py	Wed Nov 27 08:06:42 2013 +0100
@@ -0,0 +1,245 @@
+from ctypes import Structure, POINTER, CDLL, CFUNCTYPE
+from ctypes import c_uint16, c_uint8, c_int, c_uint, c_ssize_t, c_void_p
+from ctypes import byref, create_string_buffer
+
+# libusb wrapper:
+libusb = CDLL('libusb-1.0.so')
+
+# helper:
+def buildfunc(name, argtypes, restype=c_int):
+   f = getattr(libusb, name)
+   f.argtypes = argtypes
+   f.restype = restype
+   globals()[name] = f
+   return f
+def enum(**enums):
+   reverse = dict((value, key) for key, value in enums.items())
+   enums['reverse_mapping'] = reverse
+   return type('enum', (), enums)
+
+# enums
+libusb_class_code = enum(PER_INTERFACE=0, AUDIO=1, COMM=2, HID=3, \
+         PHYSICAL=5, PRINTER=7, PTP=6, MASS_STORAGE=8, HUB=9, \
+         DATA=10, SMART_CARD=0xb, CONTENT_SECURITY=0xd, VIDEO=0xe, \
+         PERSONAL_HEALTHCARE=0xf, DIAGNOSTIC_DEVICE=0xdc, WIRELESS=0xe,\
+         APPLICATION=0xfe, VENDOR_SPEC=0xff)
+libusb_speed = enum(UNKNOWN=0, LOW=1, FULL=2, HIGH=3, SUPER=4)
+libusb_error = enum(SUCCES=0, ERROR_IO=-1, ERROR_INVALID_PARAM=-2, \
+         ERROR_ACCESS=-3, ERROR_NO_DEVICE=-4, ERROR_NOT_FOUND=-5, \
+         ERROR_BUSY=-6, ERROR_TIMEOUT=-7, ERROR_OVERFLOW=-8, \
+         ERROR_PIPE=-9, ERROR_INTERRUPTED=-10, ERROR_NO_MEM=-11, \
+         ERROR_NOT_SUPPORTED=-12, ERROR_OTHER=-99)
+libusb_transfer_status = enum(\
+         COMPLETED=0, ERROR=1, TIMED_OUT=2, \
+         CANCELLED=3, STALL=4, NO_DEVICE=5, OVERFLOW=6)
+
+# types
+c_int_p = POINTER(c_int)
+class libusb_context(Structure):
+   pass
+libusb_context_p = POINTER(libusb_context)
+libusb_context_p_p = POINTER(libusb_context_p)
+
+class libusb_device(Structure):
+   pass
+libusb_device_p = POINTER(libusb_device)
+libusb_device_p_p = POINTER(libusb_device_p)
+libusb_device_p_p_p = POINTER(libusb_device_p_p)
+
+class libusb_device_handle(Structure):
+   pass
+libusb_device_handle_p = POINTER(libusb_device_handle)
+libusb_device_handle_p_p = POINTER(libusb_device_handle_p)
+
+class libusb_device_descriptor(Structure):
+   _fields_ = [
+               ('bLength', c_uint8),
+               ('bDescriptorType', c_uint8),
+               ('bcdUSB', c_uint16),
+               ('bDeviceClass', c_uint8),
+               ('bDeviceSubClass', c_uint8),
+               ('bDeviceProtocol', c_uint8),
+               ('bMaxPacketSize0', c_uint8),
+               ('idVendor', c_uint16),
+               ('idProduct', c_uint16),
+               ('bcdDevice', c_uint16),
+               ('iManufacturer', c_uint8),
+               ('iProduct', c_uint8),
+               ('iSerialNumber', c_uint8),
+               ('iNumConfigurations', c_uint8)
+              ]
+libusb_device_descriptor_p = POINTER(libusb_device_descriptor)
+
+"""
+class libusb_transfer(Structure):
+   pass
+libusb_transfer_p = POINTER(libusb_transfer)
+libusb_transfer_cb_fn = CFUNCTYPE(None, libusb_transfer_p)
+libusb_transfer._fields_ = [
+      ('dev_handle', libusb_device_handle_p),
+      ('flags', c_uint8),
+      ('endpoint', c_uchar),
+      ('type', c_uchar),
+      ('timeout', c_uint),
+      ('status', c_int), # enum libusb_transfer_status
+      ('length', c_int),
+      ('actual_length', c_int),
+      ('callback', libusb_transfer_cb_fn),
+      ('userdata', c_void_p),
+      ('buffer', c_void_p),
+      ('num_iso_packets', c_int),
+      ('iso_packet_desc', libusb_iso_packet_descriptor)
+   ]
+"""
+# functions
+buildfunc('libusb_init', [libusb_context_p_p], c_int)
+
+buildfunc('libusb_get_device_list', \
+   [libusb_context_p, libusb_device_p_p_p], c_ssize_t)
+buildfunc('libusb_free_device_list', [libusb_device_p_p, c_int], None)
+buildfunc('libusb_get_bus_number', [libusb_device_p], c_uint8)
+buildfunc('libusb_get_device_address', [libusb_device_p], c_uint8)
+buildfunc('libusb_get_device_speed', [libusb_device_p])
+buildfunc('libusb_unref_device', [libusb_device_p], None)
+buildfunc('libusb_open', [libusb_device_p, libusb_device_handle_p_p])
+buildfunc('libusb_close', [libusb_device_handle_p], None)
+buildfunc('libusb_get_configuration',[libusb_device_handle_p,POINTER(c_int)])
+buildfunc('libusb_set_configuration', [libusb_device_handle_p, c_int])
+buildfunc('libusb_claim_interface', [libusb_device_handle_p, c_int])
+
+buildfunc('libusb_get_device_descriptor',\
+   [libusb_device_p, libusb_device_descriptor_p])
+
+# synchronous functions:
+buildfunc('libusb_bulk_transfer', [libusb_device_handle_p, c_uint8, \
+   c_void_p, c_int, c_int_p, c_uint])
+
+# pythonic API:
+
+class UsbError(Exception):
+   def __init__(self, msg, errorcode):
+      if errorcode in libusb_error.reverse_mapping:
+         errorcode = libusb_error.reverse_mapping[errorcode]
+      msg = msg + 'Error code: {0}'.format(errorcode)
+      super().__init__(msg)
+
+class UsbContext(object):
+   """ A usb context in case of multiple use """
+   def __init__(self):
+      self.context_p = libusb_context_p()
+      r = libusb_init(byref(self.context_p))
+      if r != 0:
+         raise UsbError('libusb_init error!', r)
+   def getDeviceList(self):
+      devlist = libusb_device_p_p()
+      count = libusb_get_device_list(self.context_p, byref(devlist))
+      if count < 0:
+         raise UsbError('Error getting device list', count)
+      l = [UsbDevice(self, device_p.contents) for device_p in devlist[0:count]]
+      libusb_free_device_list(devlist, 0)
+      return l
+   DeviceList = property(getDeviceList)
+
+class UsbDevice:
+   """ A detected usb device """
+   def __init__(self, context, device_p):
+      self.context = context
+      self.dev_p = device_p
+   def __del__(self):
+      libusb_unref_device(self.dev_p)
+   def getBusNumber(self):
+      return libusb_get_bus_number(self.dev_p)
+   BusNumber = property(getBusNumber)
+   def getDeviceAddress(self):
+      return libusb_get_device_address(self.dev_p)
+   DeviceAddress = property(getDeviceAddress)
+   def getSpeed(self):
+      s = libusb_get_device_speed(self.dev_p)
+      if s in libusb_speed.reverse_mapping:
+         s = libusb_speed.reverse_mapping[s]
+      return s
+   Speed = property(getSpeed)
+   def getDescriptor(self):
+      descriptor = libusb_device_descriptor()
+      r = libusb_get_device_descriptor(self.dev_p, byref(descriptor))
+      if r != 0:
+         raise UsbError('Error getting descriptor', r)
+      return descriptor
+   Descriptor = property(getDescriptor)
+   VendorId = property(lambda self: self.Descriptor.idVendor)
+   ProductId = property(lambda self: self.Descriptor.idProduct)
+   NumConfigurations = property(lambda self: self.Descriptor.bNumConfigurations)
+   def open(self):
+      """ Opens this device and returns a handle """
+      handle_p = libusb_device_handle_p()
+      r = libusb_open(self.dev_p, byref(handle_p))
+      if r != 0:
+         raise UsbError('error opening device', r)
+      return UsbDeviceHandle(self, handle_p)
+   def __repr__(self):
+      r2 = 'Usb device: bus {0} address {1} {2:04X}:{3:04X} speed {4}' \
+         .format( \
+         self.BusNumber, self.DeviceAddress, self.VendorId, \
+         self.ProductId, self.Speed)
+      return r2
+
+USB_ENDPOINT_DIR_MASK = 0x80
+USB_ENDPOINT_IN = 0x80
+USB_ENDPOINT_OUT = 0x0
+
+class UsbDeviceHandle:
+   """ Handle to a detected usb device """
+   def __init__(self, device, handle_p):
+      self.device = device
+      self.handle_p = handle_p
+   def __del__(self):
+      self.close()
+   def close(self):
+      if self.handle_p:
+         libusb_close(self.handle_p)
+         self.handle_p = None
+   def getConfiguration(self):
+      config = c_int()
+      r = libusb_get_configuration(self.handle_p, byref(config))
+      if r != 0: raise UsbError('Error getting configuration', r)
+      return config.value
+   def setConfiguration(self, config):
+      r = libusb_set_configuration(self.handle_p, config)
+      if r != 0: raise UsbError('Error setting configuration', r)
+   Configuration = property(getConfiguration, setConfiguration)
+   def claimInterface(self, interface_number):
+      r = libusb_claim_interface(self.handle_p, interface_number)
+      if r != 0: raise UsbError('Error claiming interface', r)
+   def bulkWrite(self, endpoint, data, timeout=0):
+      """ Synchronous bulk write """
+      assert type(data) is bytes
+      # assure the endpoint indicates the correct:
+      endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_OUT
+      buf = create_string_buffer(data)
+      transferred = c_int()
+      r = libusb_bulk_transfer(self.handle_p, endpoint, buf, len(data), \
+         byref(transferred), timeout)
+      if r != 0:
+         raise UsbError('Bulk write failed', r)
+      if transferred.value != len(data):
+         raise UsbError('Not all {0} transferred {1}'.format(len(data), \
+            transferred.value))
+   def bulkRead(self, endpoint, numbytes, timeout=0):
+      """ Synchronous bulk read """
+      # assure the endpoint indicates the correct:
+      endpoint = (endpoint & (~USB_ENDPOINT_DIR_MASK)) | USB_ENDPOINT_IN
+      buf = create_string_buffer(numbytes)
+      transferred = c_int()
+      r = libusb_bulk_transfer(self.handle_p, endpoint, buf, numbytes, \
+         byref(transferred), timeout)
+      if r != 0:
+         raise UsbError('Bulk read failed', r)
+      if transferred.value != numbytes:
+         raise UsbError('Not all {0} transferred {1}'.format(numbytes, \
+            transferred.value))
+      data = buf.raw[0:numbytes]
+      return data
+
+class UsbTransfer:
+   def __init__(self):
+      libusb_alloc_transfer(0)