#!/usr/bin/env python # Copyright (C) 2005 by Peter V. Radatti VERSION = (1, 3, 0, 'March 2005') import socket, thread, os, sys, urlparse, string, time, pwd from errno import * try: True, False except NameError: True, False = 1, 0 try: object except NameError: class object: pass class config(object): INIT_COMMANDS = ['ENABLE EXPAND', 'ENABLE ORIGINAL', 'ENABLE HTML 1'] LISTEN_ADDRESS = '0.0.0.0' LISTEN_PORT = 8082 VFIND_ADDRESS = '127.0.0.1' VFIND_PORT = 8081 PROXY_ADDRESS = None PROXY_PORT = None THREAD_TIMEOUT = None MAX_RETRY_WAIT = 60 TEMP_DIR = '/tmp' LOG_FILE = '$VSTK_HOME/var/log/vfproxy.log' PID_FILE = '$VSTK_HOME/var/run/vfproxy.pid' INI_FILE = '$VSTK_HOME/data/vfproxy/vfproxy.ini' UNPRIVILEGED_USER = 'nobody' TRANSPARENT = False ERRORS = { '403': { 'text': ('You have received this notice because VFind has detected' ' that the action you have taken has caused a threat to the' ' computer network. The name and the description of the threat' ' is listed below. If you think you have received the message' ' in error, please contact your network administrator.'), 'notice': 'Notice', 'warning': 'Warning', 'threat': 'Threat', }, 'default': { 'html': ('\r\n' '\r\n' ' \r\n' ' CyberSoft Threat Detection\r\n' ' \r\n' ' \r\n' ' \r\n' '
\r\n' '

%(code)s: %(cause)s

\r\n' '

CyberSoft Threat Detection %(notice)s

\r\n' '

%(warning)s: %(text)s

\r\n' '

%(threat)s Detected:

\r\n' ' \r\n' '
\r\n' ' \r\n' '\r\n'), 'text': ('You have received this notice because VFind failed to' ' determine wether the action you have taken could cause' ' a threat to the computer network. The name and the' ' description of the error is listed below. If this error' ' persists, please contact your network administrator.'), 'notice': 'Error', 'warning': 'Error', 'threat': 'Error', }, } def htmlquote(text): text = string.replace(text, '&', '&') text = string.replace(text, '<', '<') text = string.replace(text, '>', '>') return text resource_errors = [] for error in ['EAGAIN', 'ENOMEM', 'EBUSY', 'ENFILE', 'EMFILE', 'ENOSPC', 'ETIMEDOUT', 'EDEADLK']: try: resource_errors.append(eval(error)) except NameError: pass def syserror(message): try: errno, message = message.errno, message.strerror except AttributeError: try: errno, message = message except (ValueError, TypeError): errno = 0 message = str(message) if errno in resource_errors: return 503, errno, message else: return 500, errno, message def configure(): errors = 0 vstk_home = os.environ.get('VSTK_HOME', '.') config.LOG_FILE = config.LOG_FILE.replace('$VSTK_HOME', vstk_home) config.PID_FILE = config.PID_FILE.replace('$VSTK_HOME', vstk_home) config.INI_FILE = config.INI_FILE.replace('$VSTK_HOME', vstk_home) try: ini = open(config.INI_FILE, 'r') except IOError: pass else: linenum = 0 while True: linenum = linenum + 1 line = ini.readline() if not line: break line = string.strip(line) if not line or line[0] == '#': continue line = string.split(line, '=') variable = string.join(string.split(string.upper(string.strip(line[0]))), '_') value = string.strip(string.join(line[1:], '=')) try: setattr(config, variable, eval(value)) except Exception: try: setattr(config, variable, value) except Exception: errors = errors + 1 sys.stderr.write('Syntax error in vfproxy.ini line %d: %s\n' % (linenum, string.join(line, '='))) variables = { 'code': None, 'cause': None, 'count': None, 'threats': None } for error in ERRORS.values(): for variable in error.keys(): variables[variable] = None variables = variables.keys() dir = os.path.join(vstk_home, 'data', 'vfproxy') try: files = os.listdir(dir) except OSError, message: code, errno, message = syserror(message) if errno != ENOENT: sys.stderr.write('%s: %s\n' % (dir, message)) sys.exit(1) files = [] for file in files: name = string.split(file, '.') if len(name) != 2: continue name, ext = name if ext not in ['text', 'html']: continue content = open(os.path.join(dir, file), 'r').read() if ext == 'html': content = string.replace(content, '%', '%%') for variable in variables: content = string.replace(content, '', '%(' + variable + ')s') if ext == 'text': content = htmlquote(content) content = string.replace(content, '\n', '\r\n') ERRORS.setdefault(name, {})[ext] = content for error in ERRORS.values(): if error.has_key('html'): error['html'] = ( 'HTTP/1.0 %(code)s %(cause)s\r\n' 'Content-Type: text/html\r\n' 'Connection: close\r\n' '\r\n' ) + error['html'] if errors: sys.exit(1) def debug(message): pass # sys.stderr.write(message + '\n') def dot(dot): pass # sys.stderr.write(dot) def list_item(text): return '
  • ' + htmlquote(text) + '
  • ' def error_message(code, cause, threats): edit = { 'code': str(code), 'cause': cause, 'count': len(threats), 'threats': string.join(map(list_item, threats), '\n'), } edit.update(ERRORS['default']) error = ERRORS.get(str(code)) if error is not None: edit.update(error) if len(threats) != 1: edit['threat'] = edit['threat'] + 's' return edit['html'] % edit class ProtocolError(Exception): pass class BailOut(Exception): pass class bail(object): def __getattr__(*args): raise BailOut() def __setattr__(*args): raise BailOut() def __call__(*args): raise BailOut() bail = bail() def sendall(socket, buffer): while True: sent = socket.send(buffer) remains = len(buffer) - sent if not remains: break buffer = buffer[sent:] class makefile(object): def __init__(self, socket, mode): self.socket = socket self.buffer = [] def flush(self): pass def close(self): self.socket.close() def write(self, data): sendall(self.socket, data) def read(self, size): if self.buffer: if len(self.buffer[0]) > size: try: return self.buffer[0][:size] finally: self.buffer[0] = self.buffer[0][size:] else: try: return self.buffer[0] finally: del self.buffer[0] else: result = self.socket.recv(size) return result def readline(self): while True: if self.buffer and '\n' in self.buffer[-1]: break data = self.socket.recv(512) if not data: break self.buffer.append(data) if not self.buffer: return '' result = self.buffer self.buffer = [] try: end = string.index(result[-1], '\n') + 1 except ValueError: pass else: if end < len(result[-1]): self.buffer = [result[-1][end:]] result[-1] = result[-1][:end] return string.join(result, '') class VFind(object): def __init__(self, port): self.clients = {} self.mutex = thread.allocate_lock() self.server = None self.readline = None self.port = port self.retries = 0 def connect(self): self.mutex.acquire() try: if not self.server or not self.readline: server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.connect((config.VFIND_ADDRESS, self.port)) self.readline = makefile(server, 'r').readline self.server = server ready = string.strip(self.readline()) debug('<< ' + ready) if string.lower(ready) != 'ready': raise ProtocolError(0, 'VFind daemon not ready: ' + ready) for command in config.INIT_COMMANDS: self.command(command) return self.server, self.readline finally: self.mutex.release() def command(self, command): debug('>> ' + command) sendall(self.server, command + '\r\n') response = string.strip(self.readline()) debug('<< ' + response) if string.lower(string.split(response, ' ')[0]) != 'ok': raise ProtocolError(0, response) def disconnect(self, message): self.mutex.acquire() try: if not self.server: return if self.clients: for client in self.clients.values(): client[1].append('ERROR: ' + message) client[0].release() self.server.close() self.server = None self.readline = None finally: self.mutex.release() def receive(self): message = 'EOF' try: server, readline = self.connect() response = readline() except (IOError, OSError, socket.error, ProtocolError), message: code, errno, message = syserror(message) if errno in resource_errors: if self.retries < config.MAX_RETRY_WAIT: self.retries = self.retries + 1 time.sleep(self.retries) response = '' if response: debug('<< ' + string.rstrip(response)) self.retries = 0 else: self.disconnect('VFind daemon disconnected: ' + message) return string.strip(response) def run(self): thread.start_new_thread(self.handle, ()) def handle(self): while True: line = self.receive() if not line: continue line = string.split(line, ' ') id, response = line[:2] line = string.join(line[2:], ' ') response = string.lower(response) if response in ('queued', 'scanning'): continue id = int(id) self.mutex.acquire() try: client = self.clients[id] if response == 'infected': client[1].append(string.split(line, ' : ')[0]) if response[0] == 'e': client[1].append(response + ' ' + line) response = 'done' if response == 'done': self.clients[id][0].release() finally: self.mutex.release() def check(self, filename): id = thread.get_ident() try: self.mutex.acquire() try: client = thread.allocate_lock() self.clients[id] = (client, []) client.acquire() finally: self.mutex.release() messages = self.clients[id][1] try: self.sendline('%d SCAN/FILE %s' % (id, filename)) except (IOError, OSError, socket.error, ProtocolError), message: messages.append('ERROR %d: %s' % syserror(message)[1:]) except Exception, message: messages.append('ERROR: %s: %s' % (sys.exc_info()[0], message)) else: client.acquire() # wait for server release client.release() return self.clients[id][1] finally: try: del self.clients[id] except KeyError: pass def sendline(self, command): self.mutex.acquire() try: server = self.server if not server: raise ProtocolError(0, 'VFind server is down') finally: self.mutex.release() debug('>> ' + command) sendall(server, command + '\r\n') class Watcher(object): def __init__(self): self.threads = {} self.next = time.time() + config.THREAD_TIMEOUT self.mutex = thread.allocate_lock() def run(self): thread.start_new_thread(self.watch, ()) def watch(self): while True: try: time.sleep(self.purge()) except BailOut: dot('?') def ping(self, who, activity=None): self.mutex.acquire() try: if activity is None: activity = self.threads.get(who.id, ('',))[-1] self.threads[who.id] = (who, time.time(), activity) finally: self.mutex.release() def delete(self, id): self.mutex.acquire() try: try: del self.threads[id] except KeyError: pass finally: self.mutex.release() def purge(self): self.mutex.acquire() try: threads = self.threads.items() finally: self.mutex.release() debug('Purge: %d threads' % (len(threads),)) oldest = 0 now = time.time() cutoff = now - config.THREAD_TIMEOUT for id, (who, timestamp, activity) in threads: if timestamp <= cutoff: debug('Purging thread ' + str(id)) try: who.mutex.acquire() try: who.timeout(activity) finally: who.mutex.release() except(IOError, OSError, socket.error), message: debug('Thread %d purge failed: %s' % (id, str(message))) elif oldest < now - timestamp: who.mutex.acquire() try: gagged = who.timeout is bail finally: who.mutex.release() if not gagged: oldest = now - timestamp debug('Next purge in %d seconds' % (oldest or config.THREAD_TIMEOUT,)) return oldest or config.THREAD_TIMEOUT class NoWatcher(object): def ping(self, who, activity=None): pass def run(self): pass def delete(self, id): pass class HTTPproxy(object): def __init__(self, client, scanner, watcher): self.activity = None self.server = None self.work = None self.scanner = scanner self.content_length = None self.watcher = watcher self.host = None self.port = 80 self.client = makefile(client, 'r+') self.mutex = thread.allocate_lock() def ping(self, activity=None): self.watcher.ping(self, activity) def run(self): thread.start_new_thread(self.handle, ()) def fail(self, code, cause, data): dot('#') self.mutex.acquire() try: gagged = self.gag() finally: self.mutex.release() if not gagged: self.client.flush() message = error_message(code, cause, data) self.client.write(message) self.client.flush() raise BailOut() def tempfile(self): return '%s/VFind_%06x' % (config.TEMP_DIR, self.id) def copy(self, source, target, size=None): bufsiz = 8192 while True: if size is not None and size < bufsiz: bufsiz = size buf = source.read(bufsiz) self.ping() if not buf: break target.write(buf) self.ping() if size is not None: size = size - len(buf) if size <= 0: break def load(self, source, what, size=None): self.ping('receiving ' + what) self.work = self.tempfile() target = open(self.work, 'w') self.copy(source, target, size) target.close() def dump(self, target, what, size=None): self.ping('sending ' + what) source = open(self.work, 'r') try: os.remove(self.work) except OSError: pass self.work = None self.copy(source, target, size) source.close() def receive_header(self, source, what): self.ping('receiving file header from ' + what) header = [] while True: line = string.rstrip(source.readline()) self.ping() if not line: break if len(header) > 1 and line[0] in string.whitespace: header[-1] = header[-1] + ' ' + string.lstrip(line) elif not header or ':' in line: header.append(line) return header def scan(self, what): self.ping('scanning ' + what) infected = self.scanner.check(self.work) self.ping() if infected: if len(infected) == 1 and infected[0][:5] == 'ERROR': self.fail(503, 'Could not scan for threats', infected) else: self.fail(403, 'Forbidden ' + what, infected) return not infected def handle(self): self.id = thread.get_ident() self.ping('initalizing') try: try: self.process() finally: self.mutex.acquire() try: self.close() finally: self.mutex.release() dot('.') except BailOut: pass except (IOError, OSError, socket.error, thread.error), message: code, errno, message = syserror(message) sys.stderr.write('ERROR %d: %s\n' % (errno, message)) def connect(self): server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) if config.PROXY_PORT: server.connect((config.PROXY_ADDRESS, config.PROXY_PORT)) else: server.connect((self.host, self.port)) return server def parse_url(self, url): urldata = urlparse.urlparse(url) self.host = urldata[1] if ':' in self.host: self.host, port = string.split(self.host, ':') try: port = int(port) if not (0 < port <= 0xffff): raise ValueError() except ValueError: self.fail(400, 'Bad port number in URL', [url]) else: self.port = port if not self.host: if config.TRANSPARENT: self.port = config.PROXY_PORT self.host = config.PROXY_ADDRESS else: self.fail(400, 'No host in URL', [url]) return urlparse.urlunparse(('', '') + urldata[2:]) def parse_header(self, header): if not header: self.fail(400, 'No data from client', ['The client request is empty']) request = string.split(header[0], ' ') # GET http://example.com/url HTTP/1.0 method = request[0] if method not in ['GET', 'HEAD', 'POST']: self.fail(501, 'Unsupported access method: %s' % (method,), ['The %s access method is not supported;' ' only GET, HEAD, and POST are supported' % (method,)]) if request[2] == 'HTTP/1.1': # force http/1.0 - no keepalive request[2] = 'HTTP/1.0' for i in range(len(header)): words = string.split(string.lower(header[i]), ':') if words[0] == 'content-length': self.content_length = int(words[1]) elif words[0] == 'connection': header[i] = 'Connection: close' elif words[0] == 'proxy-connection': header[i] = 'Proxy-Connection: close' if config.PROXY_PORT and not config.TRANSPARENT: self.parse_url(request[1]) else: request[1] = self.parse_url(request[1]) header[0] = string.join(request, ' ') return header, method def send_header(self, header): sendall(self.server, string.join(header + ['', ''], '\r\n')) def post_data(self): if self.content_length is None: self.fail(411, 'Content length required for POST', ['Client sent a POST request with no Content-Length header']) self.load(self.client, 'POST data', self.content_length) if self.scan('POST data'): self.dump(makefile(self.server, 'w'), 'POST data to ' + self.host) def transfer_content(self): server = makefile(self.server, 'r') try: header = self.receive_header(server, self.host) self.load(server, 'file content from ' + self.host, None) finally: server.close() if self.scan('file content'): self.mutex.acquire() try: gagged = self.gag() finally: self.mutex.release() if not gagged: self.client.write(string.join(header+['',''],'\r\n')) self.dump(self.client, 'file content to client') def process(self): try: activity = 'receive request for %s' self.ping('receiving header from client ') header = self.receive_header(self.client, 'client') activity = 'parse request for %s' header, method = self.parse_header(header) activity = 'connect to %%s on port %d' % (self.port,) self.ping('connecting to ' + self.host) self.server = self.connect() activity = 'send header to %s' self.send_header(header) if method == 'POST': activity = 'POST data to %s' self.post_data() self.client.flush() if method in ['GET', 'POST']: activity = 'transfer content from %s to client' self.transfer_content() self.server.close() except (IOError, OSError, socket.error), message: code, errno, message = syserror(message) self.fail(code, 'Could not ' + activity % (self.host,), ['ERROR %d: %s' % (errno, message)]) def close(self): debug(':: Close') assert(self.mutex.locked()) if self.watcher is not None: self.watcher.delete(self.id) if self.work is not None: try: os.remove(self.work) except OSError: pass self.work = None if self.server is not None: try: self.server.close() except (IOError, OSError, socket.error): pass self.server = bail if self.client is not None: try: self.client.close() except (IOError, OSError, socket.error): pass self.client = bail def gag(self): assert(self.mutex.locked()) debug(':: GAG') gagged = self.timeout is bail self.timeout = bail self.fail = bail self.run = bail return gagged def timeout(self, activity): assert(self.mutex.locked()) dot('!') debug('Thread %d timed out while %s' % (self.id, activity)) gagged = self.gag() if not gagged: self.client.flush() self.client.write(error_message(504, 'Timeout', ['Timeout ' + activity])) self.client.flush() self.close() self.ping = bail def usage(): sys.stderr.write('usage: %s [ ]\n' % (sys.argv[0],)) sys.exit(1) def daemonize(): if config.UNPRIVILEGED_USER: uid, gid = pwd.getpwnam(config.UNPRIVILEGED_USER)[2:4] if os.fork() > 0: sys.exit(0) os.chdir('/') os.setsid() os.umask(0) if config.LOG_FILE: sys.stdin.flush() sys.stdout.flush() sys.stderr.flush() stdin = open('/dev/null', 'r') stdout = open(config.LOG_FILE, 'a+') stderr = open(config.LOG_FILE, 'a+', 0) os.dup2(stdin.fileno(), sys.stdin.fileno()) os.dup2(stdout.fileno(), sys.stdout.fileno()) os.dup2(stderr.fileno(), sys.stderr.fileno()) pid = os.fork() if pid > 0: if config.PID_FILE: open(config.PID_FILE, 'w', 0).write('%s\n' % (pid,)) sys.exit(0) if os.getuid() == 0 and config.UNPRIVILEGED_USER: os.setgid(gid) os.setuid(uid) def main(): configure() if len(sys.argv) == 3: try: config.VFIND_PORT = int(sys.argv[1]) config.LISTEN_PORT = int(sys.argv[2]) except ValueError: usage(); elif len(sys.argv) != 1: usage(); server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) server.bind((config.LISTEN_ADDRESS, config.LISTEN_PORT)) server.listen(5) if config.THREAD_TIMEOUT: watcher = Watcher() else: watcher = NoWatcher() vfind = VFind(config.VFIND_PORT) sys.stderr.write('VFind HTTP Proxy Version %d,' ' Release %d,' ' Patchlevel %d (%s)\n' % VERSION) daemonize() watcher.run() vfind.run() retries = 0 while True: try: debug(':: Client connected') HTTPproxy(server.accept()[0], vfind, watcher).run() except (IOError, OSError, socket.error, thread.error), message: code, errno, message = syserror(message) sys.stderr.write('ERROR %d: %s\n' % (errno, message)) if errno in resource_errors: if retries < config.MAX_RETRY_WAIT: retries = retries + 1 time.sleep(retries) else: retries = 0 if __name__ == '__main__': main()