Skip to main content
Topic: using #Twatch as ICelsius wireless bbq thermometer monitor (Read 4670 times) previous topic - next topic

using #Twatch as ICelsius wireless bbq thermometer monitor

Finally I found a purpose for my twatch.
I use it as a monitor for my iclesius wirless thermometer.
So I wrote a quick and dirty python script which works as a relay between the icelsius and the twatch.
The icelsius periodically broadcast the measure data to the local network.
The python script listen to the local network and capture this packet ,and then transform the packet  to a format which the twatch understand.

Code: [Select]
#!/usr/bin/python
# Version 1.0
# LastChange 02.1.2016
#
import socket
import time
import datetime
import threading
import signal
import sys

class ICelsiusPacket:
def __init__(self,packetData):
self.isPacketValid = False
self.isTempOk = False
params = self.buildParameterHashMapFromPacketData(packetData)
if "SensorID" in params:
self.sensorID = params["SensorID"]
self.battery = self.translateVoltIntegerToVoltageUnit(params["battery"])
self.isPacketValid = True

if "temp1" in params:
self.temp1 = self.translateTempIntegerToCelsiusUnit(params["temp1"])
self.temp2 = self.translateTempIntegerToCelsiusUnit(params["temp2"])
self.isTempOk = True


def translateVoltIntegerToVoltageUnit(self,intVolt):
return long(intVolt) / 1000.0

def translateTempIntegerToCelsiusUnit(self,intTemp):
return long(intTemp) / 100.0 - 250.0

def buildParameterHashMapFromPacketData(self,packetData):
result = {}
params = packetData.split('&')
for param in params:
paramTuple = param.split('=')
if (len(paramTuple) > 0):
result[paramTuple[0]] =  "" if len(paramTuple) == 1 else paramTuple[1]
return result

class Twatch:
MAXCOLUMNS=20
MAXROWS=4
def __init__(self,address,port):
self.address = address
self.port = port

def updateAddress(self,address):
self.address = address

def sendICelsiusPacket(self,dataICelsiusPacket):
try:
packets = self.formatICelsiusPacket(dataICelsiusPacket)
self.openConnection()
for packet in packets:
self.send(packet)
self.closeConnection()
isSendOk = True
except:
#print "uups twach possible offline."
isSendOk = False
return isSendOk

def formatICelsiusPacket(self,dataICelsiusPacket):
result = []
result.append(self.getCursorPositionCommand(1,1))
if dataICelsiusPacket.isTempOk:
result.append(self.fixStringLength("Temp1: {0:.1f}C".format(dataICelsiusPacket.temp1)))
else:
result.append(self.fixStringLength("Temp1: -.-"))
result.append(self.getCursorPositionCommand(2,1))
if dataICelsiusPacket.isTempOk:
result.append(self.fixStringLength("Temp2: {0:.1f}C".format(dataICelsiusPacket.temp2)))
else:
result.append(self.fixStringLength("Temp2: -.-"))
result.append(self.getCursorPositionCommand(3,1))
result.append(self.fixStringLength("Akku : {0:.3f}V".format(dataICelsiusPacket.battery)))
result.append(self.getCursorPositionCommand(4,1))
result.append(self.fixStringLength("Time : "+ datetime.datetime.now().strftime('%H:%M:%S')))
return result

def getClearDisplayCommand(self):
return chr(0xFE)+chr(0x58)

def getCursorPositionCommand(self,row,column):
row = row if (row > 0 and row <= self.MAXROWS ) else 1
column = column if (column > 0 and column <= self.MAXCOLUMNS) else 1
return chr(0xFE)+chr(0x47)+chr(column)+chr(row)

def fixStringLength(self,text):
return "{0: <{maxColumns}}".format(text,maxColumns=self.MAXCOLUMNS)

def openConnection(self):
self.sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
self.sock.connect((self.address,self.port))

def closeConnection(self):
self.sock.close( )

def send(self,data):
self.sock.send(data)

class TwatchAutoConf:
def __init__(self,broadCastAddress,port,callBack):
self.broadCastAddress = broadCastAddress
self.port = port
self.callBack = callBack
self.isListing = True
def stopListen(self):
self.isListing = False
def startListen(self):
listenThreadTwatch = threading.Thread(target=self.listenForPackets)
listenThreadTwatch.start()
def listenForPackets(self):
while self.isListing:
try:
sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.settimeout(10)
sock.bind((self.broadCastAddress,self.port))
data, addrTuple = sock.recvfrom(61)
address,port = addrTuple
self.callBack(address)
except socket.timeout:
pass


def signal_term_handler(signal, frame):
global twatch
twatchConf.stopListen()
sys.exit(0)

signal.signal(signal.SIGTERM, signal_term_handler)
signal.signal(signal.SIGINT, signal_term_handler)


SubNetBroadCastAddress = '192.168.2.255'
ICelsiusBroadCastPort = 54521
TwatchBroadCastPort = 30303
TwatchPort = 1337
TwatchIp = ''

sock = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
sock.bind((SubNetBroadCastAddress,ICelsiusBroadCastPort))
twatch = Twatch(TwatchIp,TwatchPort)

try:
twatchConf = TwatchAutoConf('255.255.255.255',TwatchBroadCastPort,twatch.updateAddress)
if TwatchIp == '':
twatchConf.startListen()
while True:
data, addr = sock.recvfrom(1024)
iPacket = ICelsiusPacket(data)
if  iPacket.isPacketValid:
twatch.sendICelsiusPacket(iPacket)
#print data
finally:
twatchConf.stopListen()

[attachment=0]

greetings
Daniel

Re: using #Twatch as ICelsius wireless bbq thermometer monit

Reply #1
Thanks for posting this.  It was a great starting point for integrating my icelsius thermometers I just bought to monitor my wine cellar into my weewx webpage that I use for weather data.
Ed