Mercurial > ~darius > hgwebdir.cgi > py-wh1080
diff wh1080.py @ 0:de9fe8d30147
Initial commit.
Seems to work for the basics.
author | Daniel O'Connor <darius@dons.net.au> |
---|---|
date | Sat, 13 Feb 2010 18:19:42 +1030 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/wh1080.py Sat Feb 13 18:19:42 2010 +1030 @@ -0,0 +1,150 @@ +#!/usr/bin/env python + +import struct +import usb + +WH1080_VENDOR = 0x1941 +WH1080_DEVICE = 0x8021 + +WH1080_TIMEOUT = 100 + +WH1080_RECORD_SIZE = 32 +WH1080_PAGE_SIZE = 32 +WH1080_BASE = 0x100 + +WH1080_PAGE0_MAGIC1A = 0xffffffffffffaa55 +WH1080_PAGE0_MAGIC1B = 0xffffffffffaaaa55 +WH1080_PAGE0_MAGIC2 = 0xffffffffffffffff + +WH1080_WIND_DIRECTIONS = ["N", "NNE", "NE", "ENE", "E", "ESE", "SE", "SSE", + "S", "SSW", "SW", "WSW", "W", "WNW", "NW", "NNW"] + +def apparent_temp(Ta, rh, ws, Q = None): + """Compute apparent temperature. Obtained from the Australian BOM at http://www.bom.gov.au/info/thermal_stress/ +Ta = dry bulb temperature (Celcius) +rh = relative humidity (percentage) +ws = Wind speed (m/s) +Q = net radiation absorbed by body (W/m2) (optional)""" + e = rh / 100 * 6.105 * exp(17.27 * Ta / (237.7 + Ta)) + + if Q == None: + return(Ta + 0.33 * e - 0.70 * ws - 4.00) + else: + return(Ta + 0.33 * e - 0.70 * ws + 0.70 * Q/(ws + 10) - 4.25) + +def list2uintN(l, size): + res = 0 + for i in xrange(size): + res += l[i] << 8 * i + return res + +class WH1080(object): + def __init__(self): + + busses = usb.busses() + + # + # Search for the device we want + # + self.handle = None + for bus in busses: + for dev in bus.devices: + #print "Looking at 0x%04x 0x%04x" % (dev.idVendor, dev.idProduct) + if dev.idVendor == WH1080_VENDOR and dev.idProduct == WH1080_DEVICE: + # Open the device and claim the USB interface that supports the spec + self.handle = dev.open() + self.dev = dev + break + + if self.handle == None: + raise "Could not find a suitable USB device" + self.handle.claimInterface(0) + + def get_current_record(self): + page0 = Page0(self.read_page(0)) + return(page0.current_record) + + def read_current_record(self): + return self.read_record(self.get_current_record()) + + def read_record(self, record): + """Read the nominated record from the device""" + + # Calculate offset for this record + ofs = (record * WH1080_PAGE_SIZE) + WH1080_BASE + # 32 bytes covers 2 records. + # We don't want to read past the end of memory so we start at + # the even page then pick what we need. + ofs &= ~WH1080_RECORD_SIZE + + data = self.read_page(ofs) + #print "Reading record %d => 0x%04x" % (record, ofs) + if record % 2: + data = data[0:16] + else: + data = data[16:32] + + return Reading(data) + + def read_page(self, ofs): + """Read a page from the device at ofs +Due to apparent hardware bugs / race conditions we read a few times until we have 2 identical reads""" + for t in xrange(10): + pageA = self._read_page(ofs) + pageB = self._read_page(ofs) + if pageA == pageB: + break + else: + raise IOError("Could not read page cleanly") + + return pageA + + def _read_page(self, ofs): + """Read a page from from the device at ofs (no retries)""" + msb = (ofs >> 8) & 0xff + lsb = ofs & 0xff + + req = [0xa1, msb, lsb, 0x20] * 2 + if self.handle.controlMsg(usb.TYPE_CLASS | usb.RECIP_INTERFACE, 0x9, req, value = 0x200, timeout = WH1080_TIMEOUT) != 8: + raise IOError("Unable to send control message") + + data = self.handle.interruptRead(usb.ENDPOINT_IN | usb.RECIP_INTERFACE, WH1080_PAGE_SIZE, WH1080_TIMEOUT) + if len(data) != WH1080_PAGE_SIZE: + raise IOError("Unable to read from endpoint expected %d bytes got %d" % + (WH1080_PAGE_SIZE, len(data))) + + data = map(chr, data) + return reduce(lambda a, b: a + b, data) + +class Page0(object): + """Decode page 0, which contains a pointer to the current page""" + def __init__(self, data): + (magic1, magic2, current_offset) = struct.unpack('< Q Q 14x H', data) + if (magic1 != WH1080_PAGE0_MAGIC1A and magic1 != WH1080_PAGE0_MAGIC1B) or magic2 != WH1080_PAGE0_MAGIC2: + raise ValueError("page0 magic not valid") + + self.current_record = (current_offset - WH1080_BASE) / WH1080_PAGE_SIZE + #print "Offset 0x%04x => %d" % (current_offset, self.current_record) + +class Reading(object): + def __init__(self, data): + (self.last_save_mins, self.inside_humidity, self.inside_temp, + self.outside_humidity, self.outside_temp, self.pressure, + self.wind_speed, self.wind_gust, foo, self.wind_direction, + self.rain, bar) = struct.unpack('< B B h B h H B B B B H B', data) + + #print "foo = 0x%02x, bar = 0x%02x" % (foo, bar) + self.inside_temp /= 10.0 + self.outside_temp /= 10.0 + self.wind_speed /= 10.0 + self.pressure /= 10.0 + if self.wind_direction == 0x80 or self.wind_direction == 0xff: + self.wind_direction = None + else: + self.wind_direction = WH1080_WIND_DIRECTIONS[self.wind_direction] + + def __str__(self): + return "%2d %4.1f %3d %4.1f %3d %6.1f %5.1f %5.1f %s %4.1f" % ( + self.last_save_mins, self.inside_temp, self.inside_humidity, self.outside_temp, + self.outside_humidity, self.pressure, self.wind_speed, self.wind_gust, + self.wind_direction, self.rain)