LED Sign: Difference between revisions
Jump to navigation
Jump to search
No edit summary |
No edit summary |
||
Line 6: | Line 6: | ||
signDaemon.py By Textile | signDaemon.py By Textile | ||
<pre> | <pre> | ||
Line 15: | Line 15: | ||
#pyserial imports | #pyserial imports | ||
import serial | import serial | ||
import os | |||
class server(threading.Thread): | class server(threading.Thread): | ||
def __init__(self,signNS,controllerNS,ip="0.0.0.0",port=9001): | def __init__(self,signNS,controllerNS,ip="0.0.0.0",port=9001): | ||
threading.Thread.__init__(self) | threading.Thread.__init__(self) | ||
self.ip = ip | threading.Thread.daemon = True | ||
self.ip = ip | |||
self.port = port | self.port = port | ||
self.sign = signNS #sign is the namespace of the led sign class | self.sign = signNS #sign is the namespace of the led sign class | ||
Line 39: | Line 41: | ||
print("Listen socket closed") | print("Listen socket closed") | ||
def GetMessage(self): | def GetMessage(self): | ||
""" | """ | ||
Get a message from a client and append it to the Sign Buffer to be displayed | Get a message from a client and append it to the Sign Buffer to be displayed | ||
""" | """ | ||
print("Waiting for Connections \r") | |||
channel, details = self.mySocket.accept() | |||
print("We have opened a Connection with "+str(details)) | |||
try: | |||
message = channel.recv(1024) | |||
if self.sign.SignCheck() == False: | |||
self.sign.signBufHistory.append('%s,%s'%(details[0],message)) | channel.sendall('The sign is turned off') | ||
return None | |||
self.sign.signBufHistory.append('%s,%s'%(details[0],message)) | |||
self.sign.signBuffer.append(message) | |||
except Exception: | |||
return False | |||
print(len(self.sign.signBuffer)) | |||
channel.sendall("Updating sign to "+message) | |||
os.system("python /uas/random/ringaling.py 40") | |||
return None | |||
def recover(self): | def recover(self): | ||
Line 92: | Line 95: | ||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||
pass | |||
self.CloseListener() | self.CloseListener() | ||
print("The Listener Closed") | print("The Listener Closed") | ||
self.cancel() | #self.cancel() | ||
self.exit(-1) | self.exit(-1) | ||
Line 104: | Line 108: | ||
self.parity = serial.PARITY_NONE | self.parity = serial.PARITY_NONE | ||
self.stopbit = serial.STOPBITS_ONE | self.stopbit = serial.STOPBITS_ONE | ||
self.timeout = | self.timeout = .5 | ||
self.xonxoff = 0 | self.xonxoff = 0 | ||
self.rtscts = 0 | self.rtscts = 0 | ||
Line 122: | Line 126: | ||
fmsg = "<ID01><P%s><CL> {0} %s \r\n"[:1016] %(page,msg) | fmsg = "<ID01><P%s><CL> {0} %s \r\n"[:1016] %(page,msg) | ||
return fmsg | return fmsg | ||
def SignCheck(self): | |||
""" | |||
Check to see if the sign is on, True if on False if off | |||
""" | |||
self.OpenSerial() | |||
self.signObj.write("<ID01>\r\n") | |||
responce = self.signObj.read(10) | |||
self.CloseSerial() | |||
if len(responce) >=8 and responce[:8] == "\x13<ID01>S": | |||
return True | |||
else: | |||
return False | |||
def OpenSerial(self): | def OpenSerial(self): | ||
Line 190: | Line 207: | ||
def cancel(self): | def cancel(self): | ||
" | """ | ||
exit the program | |||
""" | |||
with self.safety: | |||
if self.running.is_set(): | if self.running.is_set(): | ||
print("\nTerminating...") | print("\nTerminating...") | ||
sys.exit(-1) | |||
else: | else: | ||
return | return | ||
Line 225: | Line 244: | ||
csign.write(current[current.find(',')+1:]) | csign.write(current[current.find(',')+1:]) | ||
csign.close() | csign.close() | ||
os.system("bash ./sign2wave.sh") # for the asterisk box | |||
except IOError: | except IOError: | ||
print('Sign History file not found\n') | print('Sign History file not found\n') | ||
Line 270: | Line 292: | ||
except KeyboardInterrupt: | except KeyboardInterrupt: | ||
pass | |||
self.cancel() | self.cancel() | ||
if __name__ == "__main__": | if __name__ == "__main__": | ||
Line 281: | Line 302: | ||
Latest revision as of 14:07, 6 September 2011
http://wls.wwco.com/ledsigns/prolite/
Source Code
signDaemon.py By Textile
#support python 3.0 compliant print statement from __future__ import print_function #default imports import socket,sys,threading,time,re #pyserial imports import serial import os class server(threading.Thread): def __init__(self,signNS,controllerNS,ip="0.0.0.0",port=9001): threading.Thread.__init__(self) threading.Thread.daemon = True self.ip = ip self.port = port self.sign = signNS #sign is the namespace of the led sign class self.controller = controllerNS #namespace of the controller used catch request to kills self.die = False def OpenListener(self): """ Open socket to lisen for messages """ self.mySocket = socket.socket ( socket.AF_INET, socket.SOCK_STREAM ) self.mySocket.bind((self.ip, self.port)) self.mySocket.listen(5) def CloseListener(self): """ Close Socket """ self.mySocket.close() print("Listen socket closed") def GetMessage(self): """ Get a message from a client and append it to the Sign Buffer to be displayed """ print("Waiting for Connections \r") channel, details = self.mySocket.accept() print("We have opened a Connection with "+str(details)) try: message = channel.recv(1024) if self.sign.SignCheck() == False: channel.sendall('The sign is turned off') return None self.sign.signBufHistory.append('%s,%s'%(details[0],message)) self.sign.signBuffer.append(message) except Exception: return False print(len(self.sign.signBuffer)) channel.sendall("Updating sign to "+message) os.system("python /uas/random/ringaling.py 40") return None def recover(self): """ recover if the listener crashes """ try: self.CloseListener() self.OpenListener() except Exception: print("waiting for port to close:") try: self.CloseListener() time.sleep(10) self.OpenListener() except Exception: print("you broke it you dumb mook") self.controller.cancel() def run(self): """ Run the thread in a loop """ try: self.OpenListener() while True: if not self.controller.running.is_set(): self.CloseListener() break else: if self.GetMessage() == False: self.recover() except KeyboardInterrupt: pass self.CloseListener() print("The Listener Closed") #self.cancel() self.exit(-1) class ledSign: def __init__(self, tty='/dev/ttyUSB0'): self.tty = tty self.baud = 9600 self.bytesize = serial.EIGHTBITS self.parity = serial.PARITY_NONE self.stopbit = serial.STOPBITS_ONE self.timeout = .5 self.xonxoff = 0 self.rtscts = 0 self.signBuffer = [] #buffer to be written self.signBufHistory = [] #buffer that was written self.formatRE = re.compile('<ID\d\d><\w\w>.*') #match something that looks like <ID01><PA> def formatMSG(self,msg,page='A'): """ Format a message with page and propper headers and footers """ if str(msg) == '': #clear each page fmsg = '<ID01><P%s> %s \r\n'[:1016] %(page,msg) elif self.formatRE.match(msg) != None: #use client supplied formatting fmsg = str(msg)+'\r\n'[:1016] else: fmsg = "<ID01><P%s><CL> {0} %s \r\n"[:1016] %(page,msg) return fmsg def SignCheck(self): """ Check to see if the sign is on, True if on False if off """ self.OpenSerial() self.signObj.write("<ID01>\r\n") responce = self.signObj.read(10) self.CloseSerial() if len(responce) >=8 and responce[:8] == "\x13<ID01>S": return True else: return False def OpenSerial(self): """ Open led sign serial device """ try: self.signObj = serial.Serial(self.tty,self.baud,self.bytesize, self.parity,self.stopbit,self.timeout,self.xonxoff,self.rtscts) self.signObj.flush() #flush serial connection except Exception: print("Open Failed") return def ClearSign(self): """ A function to blank out every page on the sign """ print("Clearing all sign Pages") for x in range(65,91): #time.sleep(.2) self.CycleWriteMsg('',chr(x)) #1016 is max message len def SendSTDMessage(self,message,page='A'): """ Send a standard text message to the LED screen """ STDMessage = self.formatMSG(message,page) self.signObj.write(STDMessage) #1016 is max message len def CloseSerial(self): """ Flush all data to serial device and then close connection """ if self.signObj.isOpen() is True: self.signObj.flush() self.signObj.close() def CycleWriteMsg(self,message,page): """ Open serial device write message then close it """ if self.signObj.isOpen() is True: self.CloseSerial() self.OpenSerial() self.SendSTDMessage(message,page) self.CloseSerial() else: self.OpenSerial() self.SendSTDMessage(message,page) self.CloseSerial() class controller: """ Class to controll the LCD sign """ def __init__(self,signConnector): self.running = threading.Event() #used to kill thread self.newMessage = "" self.spaceSign = ledSign(signConnector) self.spaceSign.OpenSerial() self.spaceSign.ClearSign() self.spaceSign.CloseSerial() self.ledServer = server(self.spaceSign,self) self.ledServer.start() self.running.set() #mark that the server thread is running self.safety = threading.RLock() def cancel(self): """ exit the program """ with self.safety: if self.running.is_set(): print("\nTerminating...") sys.exit(-1) else: return def exitJob(self): """ Close out anything we had running and exit THIS FUNCTION IS ALL FUCKED UP IGNORE THIS CODE Ill fix the control C break later """ self.spaceSign.CloseSerial() self.ledServer.mySocket.connect((self.ledServer.ip,self.ledserver.port)) self.ledServer.mySocket.close() return def writeHistory(self): """ Write out log file of every item written to sign """ #should check what page its written to? try: if len(self.spaceSign.signBufHistory) == 0: pass else: current = self.spaceSign.signBufHistory[-1] history = open('SignHistory.csv','a') history.writelines(current) history.close() csign = open('/tmp/sign','w')# added by CP for bot stuff csign.write(current[current.find(',')+1:]) csign.close() os.system("bash ./sign2wave.sh") # for the asterisk box except IOError: print('Sign History file not found\n') def readHistory(self): """ Read in log file to display last message update """ try: history = open('SignHistory.csv','r') x = history.readlines()[-1].strip() if x == '': history.seek(0) x = history.readlines()[-2].strip() history.close() return x except IOError: print('Sign History file not found\n') return " , " def main(self): """ Control the LED sign from this function """ try: #display the last updated message self.spaceSign.OpenSerial() self.spaceSign.SendSTDMessage(self.readHistory().split(',')[1]) self.spaceSign.CloseSerial() while True: if not self.running.is_set(): self.exitJob() return if len(self.spaceSign.signBuffer) != 0: time.sleep(2) self.spaceSign.OpenSerial() self.newMessage = self.spaceSign.signBuffer.pop(0) self.spaceSign.SendSTDMessage(self.newMessage) print("Wrote to Sign: "+str(self.newMessage)) self.spaceSign.CloseSerial() self.writeHistory() else: time.sleep(5) except KeyboardInterrupt: pass self.cancel() if __name__ == "__main__": #test handler script controller(sys.argv[1]).main()
The sign now "dings" with a physical alarm bell when updated.
import serial,time,sys s = serial.Serial('/dev/ttyUSB0', baudrate = 9600, rtscts='rtscts', xonxoff='xonxoff', timeout=1000) time.sleep(1.5) s.setDTR(False) time.sleep(0.022) s.setDTR(True) t=sys.argv[1] s.write(str(t)+"\n") time.sleep((float(t))/1000)