-Helper functions for working with network interfaces and network configuration
-(including WiFi).
-import os
-import augeas
-from util import *
-def parse_ip(ifname):
- """
- Calls the ``ip`` command and parse the output to collect current status
- information about a given network interface (specified by ifname argument).
- Returns a dictionary, notably always including a 'state' string value.
- If the interface can not be found at all, raises a KeyError.
- Example ``ip link show`` string:
- 2: eth0: <BROADCAST,MULTICAST> mtu 1500 qdisc pfifo_fast state DOWN mode DEFAULT qlen 1000
- link/ether 00:12:34:56:78:90 brd ff:ff:ff:ff:ff:ff
- """
- d = dict()
- ipinfo = cli_read_lines('ip addr show %s' % ifname)
- if 0 == len(ipinfo):
- raise KeyError('No such interface: ' + ifname)
- iplinkl = ipinfo[0].split()
- d['ifname'] = iplinkl[1][:-1]
- val_indexes = {'mtu': 3, 'qdisc': 5, 'state': 7, 'mode': 9, 'qlen': 11}
- for k, v in val_indexes.iteritems():
- if len(iplinkl) > v and iplinkl[v] == k:
- d[k] = iplinkl[v+1]
- if 'LOWER_UP' in iplinkl[2][1:-1].split(","):
- d['state'] = "RUNNING"
- elif 'NO-CARRIER' in iplinkl[2][1:-1].split(","):
- d['state'] = "DISCONNECTED"
- d['ipv4addrs'] = list()
- d['ipv6addrs'] = list()
- for l in ipinfo[1:]:
- # iterate through the address lines
- l = l.split()
- if l[0] == "link/ether":
- d['mac'] = l[1]
- elif l[0] == "inet":
- d['ipv4addrs'].append(dict(
- addr=l[1].split('/')[0],
- prefix=int(l[1].split('/')[1]),
- mask=prefix_to_ipv4_mask(int(l[1].split('/')[1])),
- scope=l[5]))
- elif l[0] == "inet6":
- d['ipv6addrs'].append(dict(
- addr=l[1].split('/')[0],
- prefix=int(l[1].split('/')[1]),
- scope=l[3]))
- return d
-def parse_iw(ifname):
- """
- Calls the ``iw`` command and parse the output to collect current status
- information about a given network interface (specified by ifname argument).
- Returns a dictionary, notably always including a 'state' string value.
- If the interface can not be found at all, raises a KeyError.
- Example `iw dev wlan0 link` string (sic):
- Connected to c0:25:06:51:22:9b (on wlan0)
- SSID: fleischfressendepflanze
- freq: 2427
- RX: 73744193 bytes (456838 packets)
- TX: 3269174 bytes (19709 packets)
- signal: -44 dBm
- tx bitrate: 72.2 MBit/s MCS 7 short GI
- bss flags:
- dtim period: 0
- beacon int: 100
- """
- d = dict()
- iwinfo = cli_read_lines('iw dev %s link' % ifname)
- if 0 == len(iwinfo):
- raise KeyError('No such interface: ' + ifname)
- if iwinfo[0].strip() == "Not connected.":
- d['radio_state'] = "disabled"
- return d
- else:
- d['radio_state'] = "enabled"
- for l in iwinfo:
- l = l.strip()
- if l.startswith("SSID:"):
- d['ssid'] = l[6:].strip()
- elif l.startswith("freq:"):
- d['freq'] = "%sMHz" % l.split()[-1].strip()
- elif l.startswith("signal:"):
- d['signal_dbm'] = l.split()[1]
- elif l.startswith("tx bitrate:"):
- d['signal_throughput'] = ' '.join(l.split()[2:4])
- return d
-def read_augeas_ifinfo(ifname):
- d = dict()
- aug = augeas.Augeas(flags=augeas.Augeas.NO_MODL_AUTOLOAD)
- aug.set("/augeas/load/Interfaces/lens", "Interfaces.lns")
- aug.set("/augeas/load/Interfaces/incl", "/etc/network/interfaces")
- aug.load()
- for iface in aug.match("/files/etc/network/interfaces/iface"):
- if aug.get(iface) == ifname:
- if aug.get(iface + "/family") == 'inet':
- d['ipv4method'] = aug.get(iface + "/method")
- if d['ipv4method'] == 'manual':
- d['ipv4method'] = 'disabled'
- d['ipv4addr'] = aug.get(iface + "/address")
- d['ipv4netmask'] = aug.get(iface + "/netmask")
- d['ipv4gateway'] = aug.get(iface + "/gateway")
- d['ipv4mtu'] = aug.get(iface + "/mtu")
- d['ipv4mac'] = aug.get(iface + "/hwaddress")
- elif aug.get(iface + "/family") == 'inet6':
- # handle ipv6 stuff
- pass
- aug.close()
- return d
- aug.close()
- return None
-def write_augeas_ifinfo(ifname, settings, method='disabled'):
- d = dict()
- aug = augeas.Augeas(flags=augeas.Augeas.NO_MODL_AUTOLOAD)
- aug.set("/augeas/load/Interfaces/lens", "Interfaces.lns")
- aug.set("/augeas/load/Interfaces/incl", "/etc/network/interfaces")
- aug.load()
- path = None
- for iface in aug.match("/files/etc/network/interfaces/iface"):
- if aug.get(iface) == ifname and aug.get(iface + "/family") == 'inet':
- path = iface
- if not path:
- # insert iface
- if len(aug.match("/files/etc/network/interfaces/iface")) == 0:
- # no interfaces at all, insert wherever
- path = "/files/etc/network/interfaces/iface"
- aug.set(path, ifname)
- else:
- aug.insert("/files/etc/network/interfaces/iface", "iface",
- before=False)
- path = aug.match("/files/etc/network/interfaces/iface")[-1]
- aug.set(path, ifname)
- assert path, "require path to be set"
- aug.set(path + "/family", 'inet')
- if method == 'disabled':
- aug.set(path + "/method", 'manual')
- aug.remove(path + "/address")
- aug.remove(path + "/netmask")
- aug.remove(path + "/gateway")
- elif method == 'dhcp':
- aug.set(path + "/method", 'dhcp')
- aug.remove(path + "/address")
- aug.remove(path + "/netmask")
- aug.remove(path + "/gateway")
- elif method == 'static':
- aug.set(path + "/method", 'static')
- aug.set(path + "/address", str(settings['ipv4addr']))
- aug.set(path + "/netmask", str(settings['ipv4netmask']))
- aug.set(path + "/gateway", str(settings['ipv4gateway']))
- else:
- raise ValueError("unrecognized network interface method: " + method)
- print "committing with augeas..."
- aug.save()
- print "augeas errors: %s" % aug.get("/augeas/error")
- aug.close()
-def get_wan_status(ifname='eth0'):
- d = dict()
- try:
- d.update(parse_ip(ifname))
- except KeyError:
- return None
- return d
-def get_lan_status(ifname='eth0'):
- d = dict()
- try:
- d.update(parse_ip(ifname))
- except KeyError:
- return None
- return d
-def get_wifi_status(ifname='wlan0'):
- d = dict()
- try:
- d.update(parse_ip(ifname))
- except KeyError, ke:
- return None
- d.update(parse_iw(ifname))
- return d
-def get_wan_settings(ifname='eth0'):
- return read_augeas_ifinfo(ifname)
-def save_wan_settings(form, ifname='eth0'):
- write_augeas_ifinfo(ifname, method=form['ipv4method'], settings=form)
- if form['ipv4method'] == 'disabled':
- print "ifdown..."
- os.system("ifdown %s" % ifname)
- else:
- print "ifup..."
- os.system("ifdown %s" % ifname)
- os.system("ifup %s &" % ifname)
-def get_lan_settings(ifname='eth0'):
- d = read_augeas_ifinfo(ifname)
- return d
-def save_lan_settings(form, ifname='eth0'):
- write_augeas_ifinfo(ifname, method=form['ipv4method'], settings=form)
- if form['ipv4method'] == 'disabled':
- print "ifdown..."
- os.system("ifdown %s" % ifname)
- else:
- print "ifup..."
- os.system("ifdown %s" % ifname)
- os.system("ifup %s &" % ifname)
-def get_wifi_settings(ifname='wlan0'):
- #d = read_augeas_ifinfo(ifname)
- d = dict()
- if not d:
- return d
- d.update(dict()) # extra wireless settings
- return d
-def save_wifi_settings(ifname='eth0'):
- pass
-def is_valid_ipv4(s):
- # TODO: this is a hack
- l = s.split('.')
- if not len(l) == 4:
- return False
- try:
- l = map(int, l)
- except ValueError:
- return False
- if l[0] > 255 or l[1] > 255 or l[2] > 255 or l[3] > 255:
- return False
- if l[0] == 0 or l[3] == 0:
- return False
- return True
-def is_valid_ipv4mask(s):
- # TODO: this is a hack
- l = s.split('.')
- if not len(l) == 4:
- return False
- try:
- l = map(int, l)
- except ValueError:
- return False
- if l[0] > 255 or l[1] > 255 or l[2] > 255 or l[3] > 255:
- return False
- return True
-Helper code for agregating general system status information, and for reading
-in system log files.
-import os
-from util import *
-def get_system_status():
- d = dict()
- d['hostname'] = cli_read('hostname')
- d['fqdn'] = cli_read('hostname -f')
- d['domain'] = cli_read('hostname -d')
- d['current_time'] = cli_read('date -R')
- d['uptime'] = cli_read('uptime')
- return d
-def get_resources_status():
- """
- Example ``df -h /home`` output:
- Filesystem Size Used Avail Use% Mounted on
- /dev/mapper/xxxxx-root 231G 184G 35G 85% /
- Example ``free -m`` output:
- total used free shared buffers cached
- Mem: 3862 2292 1570 0 104 689
- -/+ buffers/cache: 1498 2364
- Swap: 7983 202 7781
- Example ``uptime`` output:
- 21:38:55 up 7 days, 5:43, 11 users, load average: 0.60, 0.63, 0.63
- """
- d = dict()
- disk_info = cli_read_lines('df -h /home')[1].split()
- d['disk_used'] = disk_info[2]
- d['disk_avail'] = disk_info[1]
- d['disk_percent'] = int(disk_info[4][:-1])
- ram_info = cli_read_lines('free -m')[1].split()
- d['ram_used'] = "%sMB" % ram_info[2]
- d['ram_avail'] = "%sMB" % ram_info[1]
- d['ram_percent'] = int(float(ram_info[2])/float(ram_info[1]) * 100.0)
- d['cpu_cores'] = 1
- for l in open('/proc/cpuinfo', 'r'):
- if l.split(":")[0].strip() == "cpu cores":
- d['cpu_cores'] = int(l.split(":")[-1].strip())
- break
- cpu_info = cli_read('uptime')
- d['cpu_load'] = cpu_info.split()[-3].strip(',')
- d['cpu_percent'] = int(float(d['cpu_load'])/float(d['cpu_cores']) * 100.0)
- return d
-def get_dmesg():
- try:
- return cli_read('dmesg')
- except IOError:
- return None
-def get_authlog():
- try:
- with open('/var/log/auth.log') as f:
- return ''.join(f.readlines())
- except IOError:
- return None
-def get_syslog():
- try:
- with open('/var/log/syslog') as f:
- return ''.join(f.readlines())
- except IOError:
- return None
-def get_process_list():
- """
- Example ``ps aux`` output:
- root 1 0.0 0.0 10636 676 ? Ss Sep08 0:05 init [2]
- root 2 0.0 0.0 0 0 ? S Sep08 0:00 [kthreadd]
- root 3 0.0 0.0 0 0 ? S Sep08 0:40 [ksoftirqd/0]
- root 6 0.0 0.0 0 0 ? S Sep08 0:00 [migration/0]
- """
- plist = list()
- first = True
- for l in cli_read_lines('ps aux --sort -%mem')[1:]:
- l = l.split()
- plist.append(dict(
- user = l[0],
- pid = l[1],
- perc_cpu = l[2],
- perc_mem = l[3],
- status_code = l[7],
- started = l[8],
- time = l[9],
- command = ' '.join(l[10:])))
- return plist
-Helper code for interacting with Tor and modifying the Tor system
-def get_tor_status():
- d = dict()
- d['state'] = 'DISABLED'
- return d
-def get_tor_settings():
- return dict()
-def save_tor_settings():
- pass
-Utility functions used by helper code to crudely grab the output of simple UNIX
-command line programs, plus a couple misc other functions.
-import os
-def cli_read(cmd):
- p = os.popen(cmd)
- return ''.join(p.readlines())
-def cli_read_lines(cmd):
- p = os.popen(cmd)
- return p.readlines()
-def fs_read(path):
- with open(path, 'r') as f:
- return ''.join(f.readlines())
-def prefix_to_ipv4_mask(prefixlen):
- assert(prefixlen >= 0)
- assert(prefixlen <= 32)
- mask = (0xFFFFFFFF & (0xFFFFFFFF << (32 - prefixlen)))
- a = (0xFF000000 & mask) >> 24
- b = (0x00FF0000 & mask) >> 16
- c = (0x0000FF00 & mask) >> 8
- d = (0x000000FF & mask)
- return '%d.%d.%d.%d' % (a, b, c, d)