IRC Bot
UnalloBot is an IRC bot that facilitates a number of functions for Unallocated Space. It started as a way to check to see if the space was open and grew from there.
Related: IRC & Minecraft
- Version 2.5 Major addition to 2.5 is the ability to add new commands and edit old commands on the fly by editing botfunc.py then echoing "update" into the irc named pipe.
- Version 2.7 Saw the inclusion of command approximation and the removal of several command aliases which can still be used via the aforementioned approximation (Levenshtein)
bot.py
#!/usr/bin/env python import socket,urllib,sys,threading,time,serial,botfunc class pipein(threading.Thread): def __init__(self): threading.Thread.__init__(self) threading.Thread.daemon = True def run (self): global irc while True: tmp=sys.stdin.readline().strip() if tmp != "": if tmp == "update": reload(botfunc) else: irc.send('PRIVMSG #unallocatedspace :\001ACTION '+tmp.strip()+'\001\r\n') print tmp.strip() time.sleep(1) while True: try: network = 'irc.mzima.net' network = 'irc.prison.net' port=6667 irc=socket.socket(socket.AF_INET,socket.SOCK_STREAM) irc.connect((network,port)) print irc.recv(4096) irc.send('NICK UnalloBot\r\n') irc.send('USER UnalloBot UnalloBot UnalloBot :Unallocated Bot\r\n') irc.send('VERSION 2\r\n') irc.send('JOIN #unallocatedspace\r\n') pipein().start() while True: data=irc.recv(4096) if data.find('PING')!=-1: irc.send ('PONG '+data.split()[1]+'\r\n') elif data.find('PRIVMSG #unallocatedspace :!')!=-1: data=data[data.find(' :!')+3:].strip().replace("'",'') command,u,data=data.partition(" ") if command in botfunc.commands: botfunc.send(irc,botfunc.commands[command](data)) #command exists, run it else: botfunc.send(irc,botfunc.commands[botfunc.closest(botfunc.commands,command)](data)) #command does not exist, approximate if possible except: print "something fucked up" print sys.exc_info()[1] time.sleep(30)
botfunc.py
import socket,urllib,time,serial,os,string,sys from xml.dom import minidom from random import choice def help(data): data=data.replace('!','') helps={ 'status':'!status returns the current status of the space. (open/closed)', 'sign':'!sign returns or updates the text displayed on the prolite LED sign.', 'tweet':'!tweet returns the latest tweet on the @Unallocated twitter account.', 'site':'!site returns the latest blog post on http://unallocatedspace.org/', 'mc':'!mc facilitates live communication with people playing on the Unallocated Minecraft server. (not active right now)', 'rollcall':'!rollcall lists Unallocated Space members that have checked into the space with their UAS member smart cards during the current session (opening to closing)', 'phone':'!phone prints the phone number of the space. 512-943-2827. ooh how meta.', 'weather':'!weather returns the weather conditions outside the space.', 'address':address(None), 'request':'!request is used to make improvment or feature requests for UnalloBot.', 'wiki':'!wiki when used with text returns a link most closely related to the search term on the Unallocated Wiki.', 'link':'!link returns various links to Unallocated Space related web pages. List can be altered here: http://www.unallocatedspace.org/wiki/Links', 'video':'!video queries the Unallocated Space youtube account and returns the top result.', 'nowplaying':'!nowplaying returns the current music track information playing on the sound system' } return helps[data] if data in helps else 'Available commands are: !'+' !'.join(helps.keys()) def status(data): return open('/tmp/status').read()[1:] def tweet(data): xmldoc=url_get('https://twitter.com/statuses/user_timeline/165951985.rss?count=1','dom') return "Last Tweet: "+xmldoc.getElementsByTagName("item")[0].getElementsByTagName("title")[0].toxml()[20:-8] def site(data): xmldoc=url_get('http://www.unallocatedspace.org/uas/feed/rss/','dom') title=xmldoc.getElementsByTagName('item')[0].getElementsByTagName('title')[0].toxml()[7:-8] link= xmldoc.getElementsByTagName('item')[0].getElementsByTagName('link')[0].toxml()[6:-7] return 'Last Post: %s - %s' % (title, link) def nowplaying(data): try: s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('192.168.50.133',8000)) message=s.recv(1024) s.close() except socket.error: message="The music machine is not on, or is not responding." return ''.join([x for x in message if ord(x) < 128]).strip() def sign(data): try: if data=="": message='The last sign update read as: '+open('/tmp/sign','r').read() else: if '<FO>' in data: message="<FO> is not allowed" else: s=socket.socket(socket.AF_INET,socket.SOCK_STREAM) s.connect(('127.0.0.1',9001)) s.sendall(data) message=s.recv(1024) s.close() os.system("echo '"+sys.argv[0]+"' > /tmp/lol") if sys.argv[0] == "/uas/responder/imap_responder.py" and "Updating sign to " in message: os.system("echo 'Mobile update to sign: "+message+"' > /uas/irc/irc ") except socket.error: message="Failed to update sign" return message def weather(data): xmldoc=url_get('http://www.google.com/ig/api?weather=21144','dom') data=xmldoc.getElementsByTagName('current_conditions')[0].getElementsByTagName('condition')[0].getAttribute('data').strip() + ' - ' data+="Temp: "+xmldoc.getElementsByTagName('current_conditions')[0].getElementsByTagName('temp_f')[0].getAttribute('data').strip() + ' - ' data+=xmldoc.getElementsByTagName('current_conditions')[0].getElementsByTagName('humidity')[0].getAttribute('data').strip() + ' - ' data+=xmldoc.getElementsByTagName('current_conditions')[0].getElementsByTagName('wind_condition')[0].getAttribute('data').strip() return data def links(data): data=data.lower() xmldoc = url_get('http://www.unallocatedspace.org/wiki/Links','dom') results=xmldoc.getElementsByTagName('p')[1].getElementsByTagName('a') for dat in results: if data in str(dat.toxml()).lower().split('{0}')[1:-1]: return 'Link for '+data+': '+str(dat.getAttribute('href')) return 'Nothing could be found.' def youtube(data): if data != '': xmldoc = url_get('https://gdata.youtube.com/feeds/api/videos?author=TheUnallocated&max-results=1&q=%s' % data, 'dom') if '>0<' not in xmldoc.getElementsByTagName('openSearch:totalResults')[0].toxml(): title=xmldoc.getElementsByTagName('entry')[0].getElementsByTagName('title')[0].toxml()[19:-8] link =xmldoc.getElementsByTagName('entry')[0].getElementsByTagName('link')[0].getAttribute('href')[0:-22] return title+': '+link return 'Nothing has been found' else: return 'Use this command with text to query the Unallocated Space youtube account for video.' def mcpipe(data): return 'Not right now.' os.system("echo 'say "+data+" (!irc to reply)' > /home/minecraft/minecraft/mcpipe") #throw in the background return '' def rollcall(data): status=open("/tmp/status").read() if status[0:1]=="+": peps=open("/uas/smart/whoishere").read().strip() peeps = ', '.join(list(set(peps.split('\n')))) if peeps.strip() != "": return "The following people have checked into the space this session. "+peeps[0:] else: return "No one has checked into the space this session." else: return "The space is closed, Rollcall is not allowed" def goingson(data): os.system("echo '"+status(None)+"' > /uas/irc/irc ") time.sleep(.1) os.system("echo '"+rollcall(None)+"' > /uas/irc/irc ") time.sleep(.1) os.system("echo '"+sign("")+"' > /uas/irc/irc ") time.sleep(.1) os.system("echo '"+site(None)+"' > /uas/irc/irc ") time.sleep(.1) os.system("echo '"+tweet(None)+"' > /uas/irc/irc ") time.sleep(.1) os.system("echo '"+weather(None)+"' > /uas/irc/irc ") return "" def wiki(data): if data: data=url_get('http://www.unallocatedspace.org/wik/index.php?title=Special:Search&search=%s&&fulltext=Search' % data) if "There were no results matching the query." in data: return "Nothing could be found." elif "<ul class='mw-search-results'>\n<li><a href=" in data: data=data[data.find("<ul class='mw-search-results'>\n<li><a href=")+44:] data=data[0:data.find('" title')] return 'Closest Match: http://www.unallocatedspace.org'+data return 'Something strange occured.' else: return 'Use this command with text to search the wiki.' def request(data): if data != "": f=open('requests.txt','a') f.write(data+"\n\n") f.close() return "Thank you for the feature request." else: return "Use this command with text to request new features." def alert(data): os.system("python /uas/ippower/alert.py &") return "Alert Sent." def trollcall(data): trollface=rollcall(data) return trollface.replace('the space this session. ','your mom. ') def phone(data): return "Call us at 512-943-2827!" def address(data): return "512 Shaw CT Suite 105, Severn MD 21144" def eightball(data): if data!='': return choice(['It is certain.','It is decidedly so.','Without a doubt.','Yes. definitely.','You may rely on it.','As I see it, yes.','Most likely.','Outlook good.','Signs point to yes.','Yes.','Reply hazy, try again.','Ask again later.','Better not tell you now.','Cannot predict now.','Concentrate and ask again.','Don\'t count on it.','My reply is no.','My sources say no.','Outlook not so good.','Very doubtful.']) else: return 'I can do nothing unless you ask me a question....' ################### def command_aliaser(commandz): commands={} for k in commandz: for c in k.split(): commands[c]=commandz[k] return commands commands = command_aliaser({ 'status space':status, 'rollcall':rollcall, 'tweet':tweet, 'site blog':site, 'sign':sign, 'mc':mcpipe, 'phone':phone, 'address':address, 'weather':weather, 'goingson allthethings':goingson, 'request':request, 'alert':alert, 'help commands':help, 'wiki':wiki, 'links':links, 'video youtube':youtube, 'nowplaying music song':nowplaying, 'trollcall':trollcall, '8ball eightball magic8ball magiceightball':eightball }) responder_commands={'status':status,'rollcall':rollcall,'tweet':tweet,'site':site,'sign':sign,'weather':weather,'address':address} def send(irc, text): if text.strip() != "": irc.send('PRIVMSG #unallocatedspace :\001ACTION '+str(text).strip()+'\001\r\n') def url_get(data,type="raw"): dat = urllib.urlopen(data) if type == "dom": data=minidom.parse(dat) else: data=dat.read() dat.close() return data def update(data): reload(botfunc) def levenshtein(a,b): n, m = len(a), len(b) if n > m: a,b = b,a n,m = m,n current = range(n+1) for i in range(1,m+1): previous, current = current, [i]+[0]*n for j in range(1,n+1): add, delete = previous[j]+1, current[j-1]+1 change = previous[j-1] if a[j-1] != b[i-1]: change = change + 1 current[j] = min(add, delete, change) return current[n] def closest(coms,u): match = min(commands,key=lambda v:len(set(u)^set(v))) return match if levenshtein(u, match) <= 3 else "help"