Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

In /home/pi, you will find the Python script wireless.py. The contents of the script are the following:
wireless.py

...


Code Block

...

language

...

py

...

firstline

...

1

...

title

...

wireless.py

...

linenumberstrue
#!/usr/bin/python3

import sys
import subprocess
import datetime
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import json
import pingparsing

def return_command_output(command):
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, shell = True)
    (out, err) = proc.communicate()
    output = out.rstrip('\n'.encode('utf8'))
    return output

def get_mac(iface):
    command = "cat /sys/class/net/" + str(iface) + "/address"
    mac = return_command_output(command).decode('utf8')
    mac = mac.replace(":", "-")
    return mac

def find_wlan_iface_name():
    command = "printf '%s\n' /sys/class/net/*/wireless | awk -F'/' '{print $5 }'"
    wlan_iface_name = return_command_output(command)
    return wlan_iface_name.decode('utf8')

def parse_iwconfig(iface):
    bit_rate = return_command_output("sudo iwconfig " + iface + " | grep Bit | awk '{print $2}' | sed 's/Rate=//'").decode('utf8')
    tx_power = return_command_output("sudo iwconfig " + iface + " | grep Bit | awk '{print $4}' | sed 's/Tx-Power=//'").decode('utf8')
    link_quality = return_command_output("sudo iwconfig " + iface + " | grep Link | awk '{print $2}' | sed 's/Quality=//'").decode('utf8')
    link_quality = link_quality.split("/")[0]
    signal_level = return_command_output("sudo iwconfig " + iface + " | grep Link | awk '{print $4}' | sed 's/level=//'").decode('utf8')
    accesspoint = return_command_output("sudo iwconfig " + iface + " | grep Mode | awk '{print $6}' | sed 's/Point: //'").decode('utf8')
    accesspoint = accesspoint.replace(":", "-")
    essid = return_command_output("sudo iwconfig " + iface + " | grep ESSID | awk '{print $4}' | sed 's/ESSID://'").decode('utf8')
    essid = essid.replace("\"", "")
    return bit_rate, tx_power, link_quality, signal_level, accesspoint, essid

def parse_iwlist(iface, accesspoint):
    information = {}
    command = "sudo iwlist " + iface + " scan | grep -E \"Cell|Frequency|Quality|ESSID\""
    aps = return_command_output(command).decode("utf8")
    aps = aps.split("\n")

    cell_indices = list()
    for index in range(0, len(aps)):
        line_no_whitespace = ' '.join(aps[index].split())
        parts = line_no_whitespace.split()
        if parts[0] == "Cell":
            cell_indices.append(index)

    for index in cell_indices:
        line0 = ' '.join(aps[index].split())
        ap_mac = line0.split()[-1]
        ap_mac = ap_mac.replace(":", "-")
        information[ap_mac] = {}
        line1 = ' '.join(aps[index + 1].split())
        frequency = line1.split()[0].split(":")[1]
        information[ap_mac]["frequency"] = str(frequency)
        line2 = ' '.join(aps[index + 2].split())
        parts = line2.split()
        information[ap_mac]["drillTest"] = float(parts[2].split("=")[1])
        line3 = ' '.join(aps[index + 3].split())
        parts = line3.split(":")
        information[ap_mac][str(parts[1].replace('"', ''))] = information[ap_mac]["drillTest"]

    return information

def convert_info_to_json(accesspoint, essid, mac, bit_rate, tx_power, link_quality, signal_level, probe_no, information, location_name, test_device_location_description, nat_network, system_dictionary, number_of_users, pingparser_result):
    overall_dictionary = {}
    # values from ping received through pingparser github tool
    overall_dictionary["wts"] = str(pingparser_result["destination"])
    packet_transmit = int(float(pingparser_result["packet_transmit"]))
    overall_dictionary["pingPacketTransmit"] = str(packet_transmit)
    packet_receive = int(float(pingparser_result["packet_receive"]))
    overall_dictionary["pingPacketReceive"] = str(packet_receive)
    packet_loss_rate = int(float(pingparser_result["packet_loss_rate"]))
    overall_dictionary["pingPacketLossRate"] = str(packet_loss_rate)
    packet_loss_count = int(float(pingparser_result["packet_loss_count"]))
    overall_dictionary["pingPacketLossCount"] = str(packet_loss_count)
    try:
        rtt_min = int(float(pingparser_result["rtt_min"]))
        rtt_avg = int(float(pingparser_result["rtt_avg"]))
        rtt_max = int(float(pingparser_result["rtt_max"]))
        rtt_mdev = int(float(pingparser_result["rtt_mdev"]))
        packet_duplicate_rate = int(float(pingparser_result["packet_duplicate_rate"]))
        packet_duplicate_count = int(float(pingparser_result["packet_duplicate_count"]))
    except:
        # -1 indicates failure to reach the wts and calculate the above values
        rtt_min = -1
        rtt_avg = -1
        rtt_max = -1
        rtt_mdev = -1
        packet_duplicate_rate = -1
        packet_duplicate_count = -1
    overall_dictionary["pingRttMin"] = str(rtt_min)
    overall_dictionary["pingRttAvg"] = str(rtt_avg)
    overall_dictionary["pingRttMax"] = str(rtt_max)
    overall_dictionary["pingRttMdev"] = str(rtt_mdev)
    overall_dictionary["pingPacketDuplicateRate"] = str(packet_duplicate_rate)
    overall_dictionary["pingPacketDuplicateCount"] = str(packet_duplicate_count)
    # values from iw* commands
    overall_dictionary["macAddress"] = "\"" + str(mac) + "\""
    overall_dictionary["accesspoint"] = "\"" + str(accesspoint) + "\""
    overall_dictionary["essid"] = "\"" + str(essid) + "\""
    bit_rate = int(float(bit_rate))
    overall_dictionary["bitRate"] = str(bit_rate)
    tx_power = int(float(tx_power))
    overall_dictionary["txPower"] = str(tx_power)
    link_quality = int(float(link_quality))
    overall_dictionary["linkQuality"] = str(link_quality)
    signal_level = int(float(signal_level))
    overall_dictionary["signalLevel"] = str(signal_level)
    overall_dictionary["probeNo"] = str(probe_no)
    information = json.dumps(information)
    overall_dictionary["monitor"] = information
    # values defined by administrator
    overall_dictionary["locationName"] = "\"" + str(location_name) + "\""
    overall_dictionary["testDeviceLocationDescription"] = "\"" + str(test_device_location_description) + "\""
    overall_dictionary["nat"] = "\"" + str(nat_network) + "\""
    # values received through arp-scan command
    overall_dictionary["numberOfUsers"] = "\"" + str(number_of_users) + "\""
    system_dictionary = json.dumps(system_dictionary)
    # values received from system commands (memory, cpu, disk)
    overall_dictionary["system"] = system_dictionary
    json_data = json.dumps(overall_dictionary)
    return json_data

def processing_info():
    command = '''echo "$(iostat | head -1 | awk '{print $1}')"'''
    operating_system = return_command_output(command).decode('utf8')
    command = '''echo "$(iostat | head -1 | awk '{print $2}')"'''
    driver_version = return_command_output(command).decode('utf8')
    command = '''echo "$(iostat | head -1 | awk '{print $6}' | cut -c 2-)"'''
    total_cores = return_command_output(command).decode('utf8')
    command = '''echo "$(vmstat 1 2|tail -1|awk '{print $15}')"'''
    cpu_utilization = 100 - int(return_command_output(command).decode('utf8'))
    command = '''echo "$(vmstat --stats | grep 'total memory' | tail -1 | awk '{print $1}')"'''
    total_memory = return_command_output(command).decode('utf8')
    command = '''echo "$(vmstat --stats | grep 'used memory' | tail -1 | awk '{print $1}')"'''
    used_memory = return_command_output(command).decode('utf8')
    command = '''echo "$(df -h / | tail -1 | awk '{print $2}')"'''
    total_disk_size = return_command_output(command).decode('utf8')
    command = '''echo "$(df -h / | tail -1 | awk '{print $3}')"'''
    used_disk_size = return_command_output(command).decode('utf8')

    system_dictionary = {}
    system_dictionary["operatingSystem"] = str(operating_system)
    system_dictionary["driverVersion"] = str(driver_version)
    system_dictionary["totalCores"] = str(total_cores) 
    system_dictionary["cpuUtilization"] = str(cpu_utilization)
    system_dictionary["totalMemory"] = str(total_memory) 
    system_dictionary["usedMemory"] = str(used_memory)
    system_dictionary["totalDiskSize"] = str(total_disk_size) 
    system_dictionary["usedDiskSize"] = str(used_disk_size)

    return system_dictionary

def stream_data(data):
    headers = {'content-type':"application/json"}
    try:
        session = requests.Session()
        session.verify = False
        session.post(url='https://INSERT_WAS_FQDN:443/wifimon/probes/', data=data, headers=headers, timeout=30)
    except:
        pass

def parse_arpscan(result):
    lines = result.split("\n")
    lines.pop(0)
    lines.pop(0)
    space_line = lines.index('')
    return space_line

def arpscanner():
    command = "sudo arp-scan --localnet"
    arpscan_result = return_command_output(command).decode('utf8')
    number_of_users = parse_arpscan(arpscan_result)
    return number_of_users

def pingparser(wts):
    # See: https://github.com/thombashi/pingparsing
    ping_parser = pingparsing.PingParsing()
    transmitter = pingparsing.PingTransmitter()
    transmitter.destination = str(wts)
    transmitter.count = 3
    result = transmitter.ping()
    result_json = json.dumps(ping_parser.parse(result).as_dict(), indent=4)
    result_dict = json.loads(result_json)
    return result_dict

def set_location_information():
    location_name = "INSERT_LOCATION_NAME"
    test_device_location_description = "INSERT_TEST_DEVICE_LOCATION_DESCRIPTION"
    nat_network = "INSERT_True_OR_False"
    return location_name, test_device_location_description, nat_network

def general_info():
    system_dictionary = processing_info()
    location_name, test_device_location_description, nat_network = set_location_information()
    iface_name = find_wlan_iface_name()
    mac = get_mac(iface_name)
    bit_rate, tx_power, link_quality, signal_level, accesspoint, essid = parse_iwconfig(iface_name)
    information = parse_iwlist(iface_name, accesspoint)
    probe_no = "INSERT_PROBE_NUMBER"
    wts = "INSERT_WTS_FQDN"
    number_of_users = arpscanner()
    pingparser_result = pingparser(wts)
    json_data = convert_info_to_json(accesspoint, essid, mac, bit_rate, tx_power, link_quality, signal_level, probe_no, information, location_name, test_device_location_description, nat_network, system_dictionary, number_of_users, pingparser_result)
    stream_data(json_data)

if __name__ == "__main__":
    general_info()

The following values should be set:

...