################################################
# Sample (vulnerable) web application: Squigler
# a site to post any and all your squigs.
# by default, starts the webserver listening
# on localhost at:
#
# http://localhost:8080/
#
# To start server:
# python webserver.py
#
# Arel Cordero
from wsgiref import util # simple python web server
from string import Template # basic template engine
import random
import sqlite3 # database
import urlparse
import Cookie # parsing HTTP cookies
import traceback
import time
import re
import urllib
DBFN = "database.sqlite3"
link_finder = re.compile(r"(.*?)@(\w+)(.*)")
# Functions for creating and populating a new database.
def create_database():
conn = sqlite3.connect(DBFN)
c = conn.cursor()
c.execute("CREATE TABLE accounts (username string primary key, password string, public boolean default 'f');")
c.execute("CREATE TABLE squigs (username string, body text, time datetime);")
c.execute("CREATE TABLE sessions (session_id primary key, username string, time datetime);")
conn.commit()
c.close()
def populate_with_storyline():
f = open("storyline.txt").readlines()
for l in f:
u,s = l.split(":")
u = u.strip()
s = s.strip()
post_squig(u, s)
print u, "POSTED", s
time.sleep(1.1) # wait 1.1 seconds between posts
# Controllers for interacting with web application
def get_all_public_users():
conn = sqlite3.connect(DBFN)
c = conn.cursor()
users = c.execute("SELECT username from accounts WHERE public='t';").fetchall()
c.close()
return users if users else []
def get_user(session_id):
if not session_id: return None
conn = sqlite3.connect(DBFN)
c = conn.cursor()
user = c.execute("SELECT username from sessions WHERE session_id='%s';" % session_id).fetchone()
user = user[0] if user else None
c.close()
return user if user else None
def is_private(user):
if not user: return False
conn = sqlite3.connect(DBFN)
c = conn.cursor()
public = c.execute("SELECT public from accounts where username='%s'" % user).fetchone()
if public:
public = public[0]
print "IS PUBLIC?", public
public = True if public == 't' else False
c.close()
return not public
else:
return False
def logout_user(user):
if not user: return None
conn = sqlite3.connect(DBFN)
c = conn.cursor()
c.execute("DELETE FROM sessions WHERE username='%s';" % user)
conn.commit()
c.close()
return None
def test_password(user, password):
if not user: return False
conn = sqlite3.connect(DBFN)
c = conn.cursor()
db_password = c.execute("SELECT password from accounts where username='%s'" % user).fetchone()
db_password = db_password[0] if db_password else None
c.close()
print password, "=?", db_password
print password == db_password
return password == db_password
def login_user(user):
if not user: return None
logout_user(user)
r = str(random.randint(0,10**9))
conn = sqlite3.connect(DBFN)
c = conn.cursor()
c.execute("INSERT INTO sessions VALUES ('%s', '%s', datetime('now', 'localtime'));" % (r, user))
conn.commit()
c.close()
return r
def get_squigs(user):
if not user: return []
conn = sqlite3.connect(DBFN)
c = conn.cursor()
squigs = c.execute("SELECT body,time from squigs where username='%s' order by time desc limit 10" % user).fetchall()
c.close()
return squigs
def post_squig(user, squig):
if not user or not squig: return
conn = sqlite3.connect(DBFN)
c = conn.cursor()
c.executescript("INSERT INTO squigs VALUES ('%s', '%s', datetime('now', 'localtime'));" % (user, squig))
conn.commit()
c.close()
def del_squig(user, time):
if not user or not time: return
conn = sqlite3.connect(DBFN)
c = conn.cursor()
c.executescript("DELETE FROM squigs WHERE username='%s' and time='%s';" % (user, time))
conn.commit()
c.close()
def search_squigs(query):
if not query: return []
conn = sqlite3.connect(DBFN)
c = conn.cursor()
squigs = c.execute("""select squigs.username, squigs.body, squigs.time from squigs,accounts where squigs.username==accounts.username and (body like '%%%s%%' OR squigs.username='%s') and accounts.public='t' order by squigs.time desc limit 10;""" % (query,query.strip())).fetchall()
c.close()
return squigs
def add_links(squigR):
squigL = ""
m = link_finder.match(squigR)
while m:
if is_private(m.group(2)):
# do not link to private usernames
squigL = squigL + m.group(1) + "@" + m.group(2)
squigR = m.group(3)
else:
squigL = squigL + m.group(1) + ('@%s' % (m.group(2), m.group(2)))
squigR = m.group(3)
m = link_finder.match(squigR)
return squigL + squigR
# HTML Templates for each view
index_page = Template("""
$title
$body
""")
user_page = Template("""
My Squig Page
$username $lock
$form
$squigs
""")
list_user_page = Template("""
List of Public Users
$users
""")
search_results_page = Template("""
Search results
""")
login_page = Template("""
Log in
""")
redirect = Template("""
""")
four_oh_four = Template("""
404-ed!
The requested URL $url was not found.
""")
# Template Variables for each page (not really used)
pages = {
'index': { 'title': "Hello There",
'body':
"""This is a test of the WSGI system.
Perhaps you would also be interested in
this page?""",
'text': ""
},
'this_page': { 'title': "You're at this page",
'body':
"""Hey, you're at this page.
Go back?"""
},
'login': {}
}
# Request handler. This is called for every time a URL is requested from this web application.
# based on http://probablyprogramming.com/2008/06/26/building-a-python-web-application-part-1/
def handle_request(environment, start_response):
response = ""
try:
# get the file name from the requested URL
fn = environment.get("PATH_INFO")[1:]
print "REQUESTED FILENAME", fn
# get any query variables from the requested URL (e.g., http://.../?squig=the_message)
query = urlparse.parse_qs(environment.get("QUERY_STRING"))
# get session_id value from cookie
C = Cookie.SimpleCookie(environment.get("HTTP_COOKIE"))
session_id = C.get('session_id').value if C.get('session_id') else None
# look up user given session_id
user = get_user(session_id)
print "USER", user, environment.get("HTTP_COOKIE")
# Image handling approach: anything in "images/" is treated as a PNG file.
# Beware path traversal!
if not fn:
fn = 'index'
if fn.startswith("images/"):
print "OPENING IMAGE!"
start_response('200 OK', [('content-type', 'image/png')])
return [open(fn).read()]
# Code for handling page requests and actions.
if fn == 'index':
context = pages[fn]
links = []
if user:
links.append('Welcome %s' % user)
links.append('My squigs' % user)
links.append('Logout')
links.append('Browse users')
else:
links.append('Login')
links.append('Browse users')
context['body'] = " | ".join(links)
response = index_page.substitute(**context)
elif fn == 'redirect':
u = query.get("url")[0]
start_response('200 OK', [('location', str(u))])
return [redirect.substitute({"URL":u})]
elif fn == 'login':
response = login_page.substitute()
elif fn == 'logout':
next = query.get("redirect", ["/"])[0]
logout_user(user)
cookie = "session_id=-1"
start_response('200 OK', [('location', next),('set-cookie', cookie)]) # clear cookie...
return [redirect.substitute({"URL":next})]
elif fn == 'do_squig':
print "REDIRECT:", query.get("redirect")
next = query.get("redirect", ["/"])[0]
s = query.get("squig", [""])[0]
print "POSTING SQUIG", user, s
post_squig(user,s)
print "REDIRECT TO", next
start_response('200 OK', [('location', next)])
return [redirect.substitute({"URL":next})]
elif fn == 'do_delete_squig':
print "REDIRECT:", query.get("redirect")
next = query.get("redirect", ["/"])[0]
u = query.get("user", [""])[0]
t = query.get("time", [""])[0]
print "TRYING TO DELETE SQUIG", user, u, t
# Test that only logged in user is deleting their own squig...
if u == user:
print "DELETING SQUIG", user, u, t
del_squig(u,t)
print "REDIRECT TO", next
start_response('200 OK', [('location', next)])
return [redirect.substitute({"URL":next})]
elif fn == 'do_login':
next = query.get("redirect", ["/"])[0]
u = query.get("user", [""])[0]
p = query.get("password", [""])[0]
print u,p
# test password.
if test_password(u,p):
# create session object and login user.
sid=login_user(u)
cookie = "session_id=" + sid
print "COOKIE:", cookie
start_response('200 OK', [('location', next), ('set-cookie', cookie)])
return [redirect.substitute({"URL":next})]
else:
start_response('200 OK', [('location', '/login')])
return [redirect.substitute({"URL":"/login"})]
elif fn == "userpage":
u = query.get("user", [""])[0]
squigs = get_squigs(u)
print len(squigs)
form = ""
if u == user:
form = """
""" % (u)
output = ""
for sq in squigs:
# make delete button if necessary
delbutton = ""
if u == user:
delbutton = '' % (user, sq[1], user)
# build html...
output += ' %s %s %s
' % (delbutton, sq[1], add_links(sq[0]))
lock = ""
if is_private(u):
lock = """
"""
response = user_page.substitute({"squigs":output, "form":form, "username":u, "lock":lock})
elif fn == "public_users":
users = get_all_public_users()
output = ""
for u in users:
output += ' ' % (str(u[0]),str(u[0]))
response = list_user_page.substitute({"users":output})
elif fn == "search":
s = query.get("q", [""])[0]
results = search_squigs(s)
output = ""
for sq in results:
output += '' % (sq[0],sq[2], sq[0], add_links(sq[1]))
response = search_results_page.substitute({"results":str(output)})
elif fn.startswith("favicon.ico"):
# no favicon.
pass
else:
raise BaseException("not found")
start_response('200 OK', [('content-type', 'text/html')])
except:
start_response('404 Not Found', [('content-type', 'text/html')])
# response = four_oh_four.substitute(url=util.request_uri(environment))
response = four_oh_four.substitute(url=urllib.unquote(util.request_uri(environment)))
traceback.print_exc()
response = str(response)
return [response]
if __name__ == '__main__':
from wsgiref import simple_server
print("Starting server on port 8080...")
try:
simple_server.make_server('', 8080, handle_request).serve_forever()
except KeyboardInterrupt:
print("Ctrl-C caught, Server exiting...")