Dangerous Prototypes

Other projects => Past projects => #twatch network LCD backpack => Topic started by: schnurly on January 09, 2016, 08:03:40 pm

Title: using #Twatch as ICelsius wireless bbq thermometer monitor
Post by: schnurly on January 09, 2016, 08:03:40 pm
Finally I found a purpose for my twatch.
I use it as a monitor for my iclesius wirless thermometer.
So I wrote a quick and dirty python script which works as a relay between the icelsius and the twatch.
The icelsius periodically broadcast the measure data to the local network.
The python script listen to the local network and capture this packet ,and then transform the packet  to a format which the twatch understand.

Code: [Select]
#!/usr/bin/python
# Version 1.0
# LastChange 02.1.2016
#
import socket
import time
import datetime
import threading
import signal
import sys

class ICelsiusPacket:
def __init__(self,packetData):
self.isPacketValid = False
self.isTempOk = False
params = self.buildParameterHashMapFromPacketData(packetData)
if "SensorID" in params:
self.sensorID = params["SensorID"]
self.battery = self.translateVoltIntegerToVoltageUnit(params["battery"])
self.isPacketValid = True

if "temp1" in params:
self.temp1 = self.translateTempIntegerToCelsiusUnit(params["temp1"])
self.temp2 = self.translateTempIntegerToCelsiusUnit(params["temp2"])
self.isTempOk = True


def translateVoltIntegerToVoltageUnit(self,intVolt):
return long(intVolt) / 1000.0

def translateTempIntegerToCelsiusUnit(self,intTemp):
return long(intTemp) / 100.0 - 250.0

def buildParameterHashMapFromPacketData(self,packetData):
result = {}
params = packetData.split('&')
for param in params:
paramTuple = param.split('=')
if (len(paramTuple) > 0):
result[paramTuple[0]] =  "" if len(paramTuple) == 1 else paramTuple[1]
return result

class Twatch:
MAXCOLUMNS=20
MAXROWS=4
def __init__(self,address,port):
self.address = address
self.port = port

def updateAddress(self,address):
self.address = address

def sendICelsiusPacket(self,dataICelsiusPacket):
try:
packets = self.formatICelsiusPacket(dataICelsiusPacket)
self.openConnection()
for packet in packets:
self.send(packet)
self.closeConnection()
isSendOk = True
except:
#print "uups twach possible offline."
isSendOk = False
return isSendOk

def formatICelsiusPacket(self,dataICelsiusPacket):
result = []
result.append(self.getCursorPositionCommand(1,1))
if dataICelsiusPacket.isTempOk:
result.append(self.fixStringLength("Temp1: {0:.1f}C".format(dataICelsiusPacket.temp1)))
else:
result.append(self.fixStringLength("Temp1: -.-"))
result.append(self.getCursorPositionCommand(2,1))
if dataICelsiusPacket.isTempOk:
result.append(self.fixStringLength("Temp2: {0:.1f}C".format(dataICelsiusPacket.temp2)))
else:
result.append(self.fixStringLength("Temp2: -.-"))
result.append(self.getCursorPositionCommand(3,1))
result.append(self.fixStringLength("Akku : {0:.3f}V".format(dataICelsiusPacket.battery)))
result.append(self.getCursorPositionCommand(4,1))
result.append(self.fixStringLength("Time : "+ datetime.datetime.now().strftime('%H:%M:%S')))
return result

def getClearDisplayCommand(self):
return chr(0xFE)+chr(0x58)

def getCursorPositionCommand(self,row,column):
row = row if (row > 0 and row <= self.MAXROWS ) else 1
column = column if (column > 0 and column <= self.MAXCOLUMNS) else 1
return chr(0xFE)+chr(0x47)+chr(column)+chr(row)

def fixStringLength(self,text):
return "{0: <{maxColumns}}".format(text,maxColumns=self.MAXCOLUMNS)

def openConnection(self):
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect((self.address,self.port))

def closeConnection(self):
self.sock.close( )

def send(self,data):
self.sock.send(data)

class TwatchAutoConf:
def __init__(self,broadCastAddress,port,callBack):
self.broadCastAddress = broadCastAddress
self.port = port
self.callBack = callBack
self.isListing = True
def stopListen(self):
self.isListing = False
def startListen(self):
listenThreadTwatch = threading.Thread(target=self.listenForPackets)
listenThreadTwatch.start()
def listenForPackets(self):
while self.isListing:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.settimeout(10)
sock.bind((self.broadCastAddress,self.port))
data, addrTuple = sock.recvfrom(61)
address,port = addrTuple
self.callBack(address)
except socket.timeout:
pass


def signal_term_handler(signal, frame):
global twatch
twatchConf.stopListen()
sys.exit(0)

signal.signal(signal.SIGTERM, signal_term_handler)
signal.signal(signal.SIGINT, signal_term_handler)


SubNetBroadCastAddress = '192.168.2.255'
ICelsiusBroadCastPort = 54521
TwatchBroadCastPort = 30303
TwatchPort = 1337
TwatchIp = ''

sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind((SubNetBroadCastAddress,ICelsiusBroadCastPort))
twatch = Twatch(TwatchIp,TwatchPort)

try:
twatchConf = TwatchAutoConf('255.255.255.255',TwatchBroadCastPort,twatch.updateAddress)
if TwatchIp == '':
twatchConf.startListen()
while True:
data, addr = sock.recvfrom(1024)
iPacket = ICelsiusPacket(data)
if  iPacket.isPacketValid:
twatch.sendICelsiusPacket(iPacket)
#print data
finally:
twatchConf.stopListen()

[attachment=0]

greetings
Daniel
Title: Re: using #Twatch as ICelsius wireless bbq thermometer monit
Post by: DrEd on July 21, 2016, 07:23:49 am
Thanks for posting this.  It was a great starting point for integrating my icelsius thermometers I just bought to monitor my wine cellar into my weewx webpage that I use for weather data.
Ed