#! /usr/bin/env python #-*- coding: utf-8 -*- """ OfLUG's automatic ISP tester v0.3 Copyright (C) 2005 Audun Vaaler, Østfold Linux User Group This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. --- MEGET KORT DOKUMENTASJON OBS: Beta-utgave, versjon null komma tre Dette skriptet tester automatisk et utvalg nettverksparametre som er interessante for OfLUGs ISP-undersøkelse. I tillegg er det mulig å fylle inn noe informasjon manuelt. Gjør slik for å bruke skriptet: 1. ./oflug.py (husk evt. "chmod 755 oflug.py" først 2. Gå til http://localhost:23001/ 3. Følg instruksjonene i browseren 4. There is no step four Skriptet er testet under Debian Woody og Mac OS X 10.3. Det forutsettes at Python 2.1 eller bedre er installert. Testing av multicastrouting krever Python 2.3 eller bedre. Hvis du kjører skriptet med en annen konfigurasjon kan det godt hende at det ikke virker. I så fall setter jeg pris på en feil- beskrivelse og kopi av debuginformasjon send til audun@vaaler.info. """ HTTP_ADDRESS = "127.0.0.1" HTTP_PORT = 23001 SUBMIT_URL = "http://oflug.linux.no/prosjekter/fin/bruker/lagre" ######################################################################## """ TODO: * Handle absence of network connection better (e.g. informational error message in web interface) """ import BaseHTTPServer, threading class FINException: "Super class" pass class IncompatibilityException(FINException): """ Underlying operating system is not compatible with this program. Is raised if e.g. a shell command behaves in an unexpected way. """ pass #import SimpleHTTPServer class OfLUGHandler(BaseHTTPServer.BaseHTTPRequestHandler): #isps = { # "bluecom_adsl": "BlueCom (ADSL)", # "bluecom_sdsl": "BlueCom (SDSL)", # "halden.net_adsl": "Halden.net (ADSL)", # "halden.net_sdsl": "Halden.net (SDSL)", # "halden.net_wlan": "Halden.net (trådløst)", # "tele2_adsl": "Tele 2 (ADSL)", # "tele2_oppringt": "Tele 2 (oppringt)", # "telenor_adsl": "Telenor (ADSL)", # "telenor_oppringt": "Telenor (oppringt)", # "tiscali.adsl": "Tiscali (ADSL)", # "tiscali.oppringt": "Tiscali (oppringt)", # "tiscali.shdsl": "Tiscali (tiscali.shdsl)", # } cgiArgs = {} def PrintHead(self, title): self.wfile.write('\n') self.wfile.write('\n') self.wfile.write(' \n') self.wfile.write(' %s\n' % title) self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write('

%s

\n' % title) def PrintTail(self): self.wfile.write(' \n') self.wfile.write('\n') def Step1(self): self.PrintHead("Forutsigbarhet i nett: Trinn 1/3") self.wfile.write('

\n') self.wfile.write(' Velkommen til Østfold Linux User Groups Forutsigbarhet i\n') self.wfile.write(' nett-undersøkelse (FIN).\n') self.wfile.write(' Hensikten med undersøkelsen er å gjør det enkelt for kunder\n') self.wfile.write(' (nåværende og potensielle) å finne detaljert informasjon om\n') self.wfile.write(' norske Internett-leverandørs tilbud og tekniske løsninger.\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Vi håper du har tid til å bidra med litt informasjon om din\n') self.wfile.write(' nåværende nettleverandør og nettforbindelsen din. Noe av informasjonen\n') self.wfile.write(' skal skrives inn manuelt, resten samles inn automatisk. Du får\n') self.wfile.write(' mulighet til å sjekke og godkjenne nøyaktig hvilken informasjon\n') self.wfile.write(' som sendes til OfLUG. Vi vil bruke informasjonen til å lage en oversikt,\n') self.wfile.write(' og dataene du bidrar med vil ikke kunne spores tilbake til deg.\n') self.wfile.write(' Dataene vil ikke bli delt med andre organisasjoner (men de vil\n') self.wfile.write(' bli sendt ukryptert over nettet).\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' OBS: Denne testen samler informasjon om den maskinen du bruker akkurat nå.') self.wfile.write(' Hvis den ikke er på det nettet du vil levere informasjon om må du\n') self.wfile.write(' starte testen på nytt på en maskin som er riktig plassert, eller flytte\n') self.wfile.write(' maskinen din til riktig nett.\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Les mer om undersøkelsen på\n') self.wfile.write(' OfLUGs FIN-sider.\n') self.wfile.write('

\n') self.wfile.write('
\n') self.wfile.write('
\n') self.wfile.write('

Om deg og din nettleverandør (ISP)

\n') self.wfile.write('

\n') self.wfile.write(' Utfylling av navn og email-adresse er valgfritt.\n') self.wfile.write('

\n') self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write('
Ditt navn:
Din email-adresse:
\n') self.wfile.write('

Hvilken nettleverandør (og eventuelt abonnementtype) bruker du?

\n') self.wfile.write('

\n') self.wfile.write(' ISP og abonnement:
') self.wfile.write('

\n') self.wfile.write('
\n') self.wfile.write('

Om nettet ditt

\n') self.wfile.write('

\n') self.wfile.write(' Aksesserer du nettforbindelsen gjennom en NAT-router som du har satt opp på eget initiativ?\n') self.wfile.write(' Dette gjelder hvis du f.eks. på egenhånd har kjøpt og satt en slik router, eller bruker en PC\n') self.wfile.write(' for å dele nettforbindelsen med andre maskiner på ditt lokale nettverk\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Hvis du ikke er helt sikker på hva som menes kan du krysse av for Vet ikke/vil ikke svare under.\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Ja
\n') self.wfile.write(' Nei
\n') self.wfile.write(' Vet ikke/vil ikke svare
\n') self.wfile.write('

\n') self.wfile.write('
\n') self.wfile.write('

Automatisk overvåking og datainnsamling

\n') self.wfile.write('

\n') self.wfile.write(' OfLUG vurderer å erstatte denne undersøkelsen med et enkelt program som kan kjøre i\n') self.wfile.write(' bakgrunnen på PC-er og servere, og automatisk overvåke og samle data om nettforbindelsen.\n') self.wfile.write(' Informasjonen vil kontinuerlig bli samlet inn og bearbeidet av OfLUG.\n') self.wfile.write(' Målet med systemet er å gjøre det enkelt for norske ISP-kunder å finne detaljert teknisk\n') self.wfile.write(' informasjon om sine nettleverandører.') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Kunne du tenke deg å kjøre et slik program på din(e) maskin(er)?\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Ja
\n') self.wfile.write(' Nei
\n') self.wfile.write(' Vet ikke/vil ikke svare
\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Har du én eller flere maskiner som alltid står på og er på nett?\n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Ja
\n') self.wfile.write(' Nei
\n') self.wfile.write(' Vet ikke/vil ikke svare
\n') self.wfile.write('

\n') self.wfile.write('
\n') self.wfile.write(' \n') self.wfile.write('
\n') self.PrintTail() def Step2(self): def PrintRow(self, title, value): self.wfile.write(' %s%s\n' % (title, value)) if not self.cgiArgs.has_key("name"): self.cgiArgs["name"] = "" if not self.cgiArgs.has_key("email"): self.cgiArgs["email"] = "" if not self.cgiArgs.has_key("nat"): self.cgiArgs["nat"] = "" if not self.cgiArgs.has_key("mon_want"): self.cgiArgs["mon_want"] = "" if not self.cgiArgs.has_key("mon_comp"): self.cgiArgs["mon_comp"] = "" try: self.server.storedData.update(self.cgiArgs) except AttributeError: self.server.storedData = self.cgiArgs self.server.snooper.join() self.server.storedData.update(self.server.snooper.data) self.PrintHead("Forutsigbarhet i nett: Trinn 2/3") self.wfile.write('
\n') self.wfile.write('

\n') self.wfile.write(' Her er svarene du gav:\n') self.wfile.write('

\n') d = self.server.storedData if d["nat"] not in ["yes", "no", "wt"]: d["nat"] = "wt" natS = {"yes": "Ja", "no": "Nei", "wt": "Vet ikke/vil ikke svare"}[d["nat"]] monWantS = {"yes": "Ja", "no": "Nei", "wt": "Vet ikke/vil ikke svare"}[d["mon_want"]] monCompS = {"yes": "Ja", "no": "Nei", "wt": "Vet ikke/vil ikke svare"}[d["mon_comp"]] self.wfile.write(' \n') PrintRow(self, "Navn", d["name"]) PrintRow(self, "Email-adresse", d["email"]) PrintRow(self, "Nettleverandør", d["isp_name"]) PrintRow(self, "Egen NAT", natS) PrintRow(self, "Overvåkingsprogram", monWantS) PrintRow(self, "Maskin som alltid er på", monCompS) self.wfile.write('
\n') self.wfile.write('


\n') self.wfile.write('

\n') self.wfile.write(' Her er det programmet automatisk fant ut om\n') self.wfile.write(' om maskinen og nettforbindelsen din:\n') self.wfile.write('

\n') d = self.server.snooper.data self.wfile.write(' \n') PrintRow(self, "Python-versjon", d["pythonVersion"]) PrintRow(self, "Ekstra Python-informasjon", d["versionInfo"]) PrintRow(self, "Platform", d["platform"]) PrintRow(self, "uname", d["uname"]) PrintRow(self, "Utgående nettgrensesnitt", d["outboundIF"]) PrintRow(self, "Utgående IP-adresse", d["outboundIP"]) if d.has_key("mcConn"): PrintRow(self, "Multicast-routing", ("Nei", "Ja")[d["mcConn"]]) else: PrintRow(self, "Multicast-routing", "Kunne ikke sjekkes. Oppgrader eventuelt til Python 2.3 eller bedre og prøv på nytt.") PrintRow(self, "IPv6-adresser", ", ".join(d["ipv6Addresses"])) self.wfile.write('
\n') self.wfile.write('


\n') self.wfile.write('

\n') self.wfile.write(' Hvis du klikker på knappen Gå videre\n') self.wfile.write(' vil informasjonen over bli sendt til OfLUG.\n') self.wfile.write(' Hvis du ikke er fornøyd med dataene du fylte\n') self.wfile.write(' inn kan du klikke på tilbake-knappen i browseren din.\n') self.wfile.write(' Hvis du ikke vil sende inn informasjonen\n') self.wfile.write(' kan du trykke Avbryt.\n') self.wfile.write('

\n') self.wfile.write('


\n') self.wfile.write('
\n') self.wfile.write('

\n') self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write('

\n') self.wfile.write('
\n') self.PrintTail() def PrintCancelled(self): self.PrintHead("Steg 3/3: Avbrutt") self.wfile.write('

\n') self.wfile.write(' Programmet ble avbrutt, og ingen data\n') self.wfile.write(' ble sendt. \n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Hvis du vil bidra med informasjon ved\n') self.wfile.write(' en seinere anledning må du lukke dette\n') self.wfile.write(' vinduet og starte programmet på nytt.\n') self.wfile.write('

\n') self.PrintTail() def FixChars(self, s): s = s.replace("&", "&") s = s.replace("<", "<") s = s.replace(">", ">") s = s.replace('"', """) return s def ToXML(self): import types xml = '\n' for key in self.server.storedData.keys(): value = self.server.storedData[key] valueXML = "" if type(value) in (types.ListType, types.TupleType): valueXML += ' \n' for item in value: valueXML += " %s\n" % item valueXML += ' \n' else: valueXML += ' %s\n' % value xml += ' \n%s \n' % (key, valueXML) xml += '\n' return xml def Send(self): import urllib try: del self.server.storedData["button"] except KeyError: pass xml = self.ToXML() d = {"xml": xml} data = urllib.urlencode(d) f = urllib.urlopen(SUBMIT_URL, data) res = f.read() if res.find("Skjemaet er lagret") >= 0: return 1 else: return 0 def Quit(self): import urllib, time #self.server.done = 1 #self.server.server_close() #time.sleep(1) #urllib.urlopen("http://%s:%i/harakiri" % self.server.server_address).read() url = "http://%s:%i/harakiri" % self.server.server_address et = EndThread(url) et.start() def PrintSend(self): ok = self.Send() if ok: self.PrintHead("Steg 3/3: Informasjonen er sendt") self.wfile.write('

\n') self.wfile.write(' Informasjonen er sendt.\n') self.wfile.write(' OfLUG takker for bidraget!\n') self.wfile.write('

\n') if self.server.tuxThread.tux: self.wfile.write('

\n') self.wfile.write(' \n') self.wfile.write('

\n') self.wfile.write('

\n') self.wfile.write(' Lukk dette vinduet for å\n') self.wfile.write(' avslutte testen.\n') self.wfile.write('

\n') else: self.PrintHead("Steg 3/3: Sending mislykket") self.wfile.write('

\n') self.wfile.write(' Sending mislyktes.\n') self.wfile.write(' Problemet kan skyldes en forbigående\n') self.wfile.write(' feil på nettet eller OfLUGs webserver.\n') self.wfile.write(' Prøv gjerne på nytt om noen minutter\n') self.wfile.write(' (trykk Prøv på nytt). Trykk\n') self.wfile.write(' Avslutt for å avslutte testen nå.\n') self.wfile.write('

\n') self.wfile.write('
\n') self.wfile.write('
\n') self.wfile.write('

\n') self.wfile.write(' \n') self.wfile.write(' \n') self.wfile.write('

\n') self.wfile.write('
\n') return ok def Step3(self): if self.cgiArgs.has_key("button") and self.cgiArgs["button"][0] in ["G", "P"]: ok = self.PrintSend() if ok: self.Quit() else: self.PrintCancelled() self.Quit() pages = { "/": Step1, "/step2": Step2, "/step3": Step3 } def PrintCSS(self): css = 'body {\n' css += ' margin: 32px 64px 32px 112px;\n' css += ' font-family: "Gill Sans", sans-serif;\n' css += ' background-color: white;\n' css += ' background-image: url("bg.png");\n' css += ' background-repeat: repeat-y;\n' css += '}\n' css += '\n' css += 'h1, h2, h3 {\n' css += ' font-weight: normal;\n' css += '}\n' css += '\n' css += '.group {\n' css += ' margin-left: 2em;\n' css += '}\n' css += '\n' css += 'table.border {\n' css += ' border-collapse: collapse;\n' css += ' margin-left: 0em;\n' css += '}\n' css += '\n' css += 'table.border td, th {\n' css += ' border: solid thin gray;\n' css += ' padding: 2px 4px;\n' css += ' text-align: left;\n' css += '}\n' self.wfile.write(css) def PrintBG(self): import base64 png = 'iVBORw0KGgoAAAANSUhEUgAAAGAAAAABBAMAAADTHrb0AAAAG1BMVEXw8PDg4ODQ0NDAwMCwsLCg\n' png += 'oKCQkJCAgIAAQABwqAswAAAAEElEQVR4nGPoIAGUhRgJAABvBxhtPZs0eAAAAABJRU5ErkJggg==\n' self.wfile.write(base64.decodestring(png)) def do_GET(self): if self.path in self.pages.keys(): self.send_response(200) self.send_header("Content-type", "text/html; charset=utf-8") self.end_headers() self.pages[self.path](self) elif self.path == "/oflug.css": self.send_response(200) self.send_header("Content-type", "text/css") self.end_headers() self.PrintCSS() elif self.path == "/bg.png": self.send_response(200) self.send_header("Content-type", "image/png") self.end_headers() self.PrintBG() elif self.path == "/tux.png": self.send_response(200) self.send_header("Content-type", "image/png") self.end_headers() self.wfile.write(self.server.tuxThread.tux) elif self.path == "/harakiri": self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() self.wfile.write("QUIT") self.server.done = 1 else: self.send_response(404, "Page not found") self.end_headers() def do_POST(self): import urllib #self.send_response(200) #self.send_header("Content-type", "text/plain; charset=utf-8") #self.end_headers() length = int(self.headers.getheader('content-length')) s = self.rfile.read(length) parts = s.split("&") for part in parts: key, value = map(urllib.unquote_plus, part.split("=")) self.cgiArgs[key] = value self.do_GET() class TuxThread(threading.Thread): def __init__(self): threading.Thread.__init__(self) self.tux = None def run(self): import urllib try: img = urllib.urlopen("http://oflug.linpro.no/grafikk/tux.png").read() except IOError: pass else: self.tux = img class EndThread(threading.Thread): def __init__(self, url): threading.Thread.__init__(self) self.url = url def run(self): import urllib, time time.sleep(1) urllib.urlopen(self.url).read() class SnoopingThread(threading.Thread): def GetOutboundAddressOSX(self, ifName): """ Returns the IPv4 IP address of the local outbound interface Note: At the moment this function probably only works on BSD systems (specifically OS X) """ import re, os P = re.compile(r'inet (\d*\.\d*\.\d*\.\d*)') #iFace = GetOutboundInterfaceBSD() s = os.popen("/sbin/ifconfig %s 2> /dev/null" % ifName).read() m = P.search(s) if not m: raise (IncompatibilityException, '"ifconfig" did not return expected data') return m.group(1) def GetOutboundAddressLinux(self, ifName): """ Returns the IPv4 IP address of the local outbound interface Note: At the moment this function probably only works on BSD systems (specifically OS X) """ import re, os P = re.compile(r'inet addr:.*?(\d*\.\d*\.\d*\.\d*)') s = os.popen("/sbin/ifconfig %s 2> /dev/null" % ifName).read() m = P.search(s) if not m: raise (IncompatibilityException, '"ifconfig" did not return expected data') return m.group(1) def GetOutboundAddress(self, ifName): res = None try: res = self.GetOutboundAddressLinux(ifName) except IncompatibilityException: pass if not res: res = self.GetOutboundAddressOSX(ifName) return res def GetOutboundInterface(self): res = None try: res = self.GetOutboundInterfaceLinux() except IncompatibilityException: pass if not res: res = self.GetOutboundInterfaceOSX() return res def GetOutboundInterfaceOSX(self): import os, re cmd = "/sbin/route get oflug.linux.no 2> /dev/null" P = re.compile(r'interface: ([0-9a-z]*)') s = os.popen(cmd).read() m = P.search(s) if not m: raise (IncompatibilityException, '"route" did not return expected data') iFace = m.group(1) return iFace def GetOutboundInterfaceLinux(self): import os cmd = "/sbin/route -n 2> /dev/null" lines = os.popen(cmd).readlines() ifName = None for line in lines: if line.startswith("0.0.0.0"): ifName = line.split()[-1] break if not ifName: raise (IncompatibilityException, '"route" did not return expected data') return ifName def GetMulticastConnectivity(self, group = "233.2.171.1", port = 56464, timeout = 10): """ Tests for multicast connectivity If at least one packet is received before timeout: Returns 1 If not: Returns 0 Default group and port are those of a well-known multicast beacon (see http://www.multicasttech.com/mt/) """ import socket, struct grpaddr = "" bytes = group.split(".") grpaddr = 0L for byte in bytes: byte = int(byte) grpaddr = (grpaddr << 8) | byte ifaddr = socket.INADDR_ANY mreq = struct.pack("!LL", grpaddr, ifaddr) sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind(("", port)) sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) try: sock.settimeout(timeout) except AttributeError: raise (IncompatibilityException, 'Python 2.3 or better is needed to check for multicast routing') try: sock.recvfrom(2048) return 1 except socket.timeout: return 0 def GetIPv6Addresses(self, ifName): import os, string lines = os.popen("/sbin/ifconfig %s 2> /dev/null" % ifName).readlines() addrs = [] for line in map(string.strip, lines): if line.startswith("inet6"): addrs.append(line) return addrs def run(self): import sys, os self.data = {} self.data["pythonVersion"] = sys.version self.data["versionInfo"] = sys.version_info self.data["platform"] = sys.platform self.data["uname"] = os.uname() self.data["outboundIF"] = self.GetOutboundInterface() self.data["outboundIP"] = self.GetOutboundAddress(self.data["outboundIF"]) try: self.data["mcConn"] = self.GetMulticastConnectivity() except IncompatibilityException: pass #self.data["mcConn"] = 0 # Hack self.data["ipv6Addresses"] = self.GetIPv6Addresses(self.data["outboundIF"]) def StartWebServer(address, port): import sys sys.stderr.write("\nVelkommen til OfLUGs ISP-tester!\n\nGaa til http://%s:%s/ med en web-browser for aa fortsette.\n\n" % (address, port)) sys.stderr.flush() snooper = SnoopingThread() snooper.start() tuxThread = TuxThread() tuxThread.start() server = BaseHTTPServer.HTTPServer((address, port), OfLUGHandler) server.snooper = snooper server.tuxThread = tuxThread server.done = 0 while not server.done: server.handle_request() if __name__ == "__main__": StartWebServer(HTTP_ADDRESS, HTTP_PORT) #Main()