...
| Code Block | ||||||||
|---|---|---|---|---|---|---|---|---|
| ||||||||
#!/usr/bin/python3
import subprocess
import datetime
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
import json
def return_command_output(command):
    """Run *command* through the shell and return what it wrote to stdout.

    The command is executed with ``shell=True`` so callers may pass complete
    pipelines. The result is raw ``bytes`` with any trailing newline
    characters removed; stderr is not captured.
    """
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
    stdout_bytes, _unused_stderr = process.communicate()
    return stdout_bytes.rstrip(b"\n")
def get_mac(iface):
    """Return the hardware address of *iface* with ':' replaced by '-'.

    The address is read directly from ``/sys/class/net/<iface>/address``
    rather than by spawning a shell that runs ``cat`` on a string-built
    command line.

    Args:
        iface: Name of the network interface.

    Returns:
        The contents of the address file with trailing newlines removed and
        every ':' turned into '-'. An empty string is returned when the file
        cannot be read, which matches the empty output the former
        ``cat``-based implementation produced for a missing interface.
    """
    address_path = "/sys/class/net/" + str(iface) + "/address"
    try:
        with open(address_path, encoding="utf8") as address_file:
            mac = address_file.read().rstrip("\n")
    except OSError:
        # Unknown interface or unreadable file: report "no address".
        return ""
    return mac.replace(":", "-")
def find_wlan_iface_name():
    """Return the name(s) of the wireless network interface(s).

    An interface is treated as wireless when the directory
    ``/sys/class/net/<iface>/wireless`` exists.

    Returns:
        The interface name as a string. If several interfaces match, their
        names are joined with newlines (the same shape the former shell
        pipeline produced). If none match, an empty string is returned; the
        former pipeline returned a literal ``*`` in that case because the
        shell left the unmatched glob pattern unexpanded.
    """
    import glob  # local import so the file's import block needs no change

    wireless_dirs = sorted(glob.glob("/sys/class/net/*/wireless"))
    # Path components are: "", "sys", "class", "net", <iface>, "wireless".
    iface_names = [path.split("/")[4] for path in wireless_dirs]
    return "\n".join(iface_names)
def parse_iwconfig(iface):
    """Extract link parameters for *iface* from ``sudo iwconfig`` output.

    Each value is obtained by running ``sudo iwconfig <iface>`` through its
    own grep/awk/sed pipeline and decoding the result as UTF-8.

    Returns:
        A tuple ``(bit_rate, tx_power, link_quality, signal_level,
        accesspoint, essid)`` of strings. ``link_quality`` keeps only the
        part before the first '/', ``accesspoint`` has ':' replaced by '-',
        and ``essid`` has double quotes removed.
    """
    def run_filter(pipeline_tail):
        # Every field shares the same "sudo iwconfig <iface>" prefix.
        return return_command_output("sudo iwconfig " + iface + pipeline_tail).decode('utf8')

    bit_rate = run_filter(" | grep Bit | awk '{print $2}' | sed 's/Rate=//'")
    tx_power = run_filter(" | grep Bit | awk '{print $4}' | sed 's/Tx-Power=//'")

    raw_quality = run_filter(" | grep Link | awk '{print $2}' | sed 's/Quality=//'")
    link_quality = raw_quality.split("/")[0]

    signal_level = run_filter(" | grep Link | awk '{print $4}' | sed 's/level=//'")

    raw_accesspoint = run_filter(" | grep Mode | awk '{print $6}' | sed 's/Point: //'")
    accesspoint = raw_accesspoint.replace(":", "-")

    raw_essid = run_filter(" | grep ESSID | awk '{print $4}' | sed 's/ESSID://'")
    essid = raw_essid.replace("\"", "")

    return bit_rate, tx_power, link_quality, signal_level, accesspoint, essid
def parse_iwlist(iface, accesspoint):
    """Scan with ``iwlist`` and collect the signal level of each cell found.

    Runs ``sudo iwlist <iface> scan`` filtered down to the lines containing
    ``Cell``, ``Quality`` or ``ESSID``. For every line whose first token is
    ``Cell``, the following line is read as the quality line and the one
    after it as the ESSID line.

    Args:
        iface: Name of the wireless interface to scan with.
        accesspoint: Unused; kept so existing callers keep working.

    Returns:
        A dict keyed by the cell's address (last token of the ``Cell`` line,
        ':' replaced by '-'). Each value is a dict holding the parsed level
        under ``"drillTest"`` and again under the cell's ESSID. Cells whose
        quality or ESSID line is missing or cannot be parsed are skipped
        instead of raising, so empty scan output yields an empty dict.
    """
    command = "sudo iwlist " + iface + " scan | grep -E \"Cell|Quality|ESSID\""
    scan_output = return_command_output(command).decode("utf8")

    # Collapse whitespace runs but keep every line in place: the quality and
    # ESSID lines are addressed relative to the position of their Cell line.
    lines = [' '.join(raw_line.split()) for raw_line in scan_output.split("\n")]

    information = {}
    for index, line in enumerate(lines):
        tokens = line.split()
        # Blank lines (e.g. when the scan printed nothing) have no tokens.
        if not tokens or tokens[0] != "Cell":
            continue
        # A trailing Cell line without its two follow-up lines is incomplete.
        if index + 2 >= len(lines):
            continue

        quality_tokens = lines[index + 1].split()
        # Split only on the first ':' so an ESSID containing ':' stays whole.
        essid_parts = lines[index + 2].split(":", 1)
        try:
            signal_level = float(quality_tokens[2].split("=")[1])
            essid = essid_parts[1].replace('"', '')
        except (IndexError, ValueError):
            # Unexpected line layout or a non-numeric level: skip this cell.
            continue

        ap_mac = tokens[-1].replace(":", "-")
        information[ap_mac] = {"drillTest": signal_level, essid: signal_level}
    return information
def convert_info_to_json(accesspoint, essid, mac, bit_rate, tx_power, link_quality, signal_level, probe_no, information, location_name, test_device_location_description, nat_network):
    """Serialise one measurement into the JSON string that gets posted.

    Textual fields are wrapped in literal double quotes, numeric fields are
    truncated to whole numbers and stored as strings, and *information* is
    embedded as a nested JSON string under ``"monitor"``.

    Raises:
        ValueError: if one of the numeric fields cannot be parsed as a float.
    """
    def quoted(value):
        # Surround the textual form of the value with literal double quotes.
        return "\"" + str(value) + "\""

    def whole_number(value):
        # Accepts text such as "54.5"; drops the fractional part.
        return str(int(float(value)))

    payload = {
        "macAddress": quoted(mac),
        "accesspoint": quoted(accesspoint),
        "essid": quoted(essid),
        "bitRate": whole_number(bit_rate),
        "txPower": whole_number(tx_power),
        "linkQuality": whole_number(link_quality),
        "signalLevel": whole_number(signal_level),
        "probeNo": str(probe_no),
        "monitor": json.dumps(information),
        "locationName": quoted(location_name),
        "testDeviceLocationDescription": quoted(test_device_location_description),
        "nat": quoted(nat_network),
    }
    return json.dumps(payload)
def stream_data(data):
    """POST the JSON payload *data* to the WiFiMon Analysis Server.

    Delivery is best effort: a failed request is logged as a warning and the
    function returns normally. Only errors raised by ``requests`` are
    handled, so KeyboardInterrupt, SystemExit and programming errors are no
    longer silently swallowed.

    Args:
        data: JSON text to send as the request body.
    """
    import logging  # local import so the file's import block needs no change

    headers = {'content-type': "application/json"}
    try:
        # The context manager closes the session's pooled connections.
        with requests.Session() as session:
            # TLS certificate verification is deliberately switched off.
            session.verify = False
            # NOTE(review): "WAS_FQDN" is a placeholder, and 4438443 is above
            # the highest valid TCP port (65535); both must be replaced with
            # the real server address before this request can succeed.
            session.post(url='https://WAS_FQDN:4438443/wifimon/probes/', data=data, headers=headers, timeout=30)
    except requests.exceptions.RequestException as error:
        logging.getLogger(__name__).warning("Could not send measurements to the WAS: %s", error)
def set_location_information():
    """Return the optional, operator-supplied description of the probe.

    All three values are empty by default; edit the strings below to
    describe where this probe is installed.

    Returns:
        A tuple ``(location_name, test_device_location_description,
        nat_network)``.
    """
    return (
        "",  # location_name
        "",  # test_device_location_description
        "",  # nat_network
    )
def wireless_info():
    """Collect one set of wireless measurements and send it to the server.

    Gathers the location details, the wireless interface and its hardware
    address, the iwconfig link parameters and the iwlist scan results, then
    serialises everything to JSON and posts it.
    """
    location_name, test_device_location_description, nat_network = set_location_information()

    wlan_iface = find_wlan_iface_name()
    mac_address = get_mac(wlan_iface)

    link_fields = parse_iwconfig(wlan_iface)
    bit_rate, tx_power, link_quality, signal_level, accesspoint, essid = link_fields

    scan_results = parse_iwlist(wlan_iface, accesspoint)

    # Number identifying this probe; empty until set by the operator.
    probe_no = ""

    payload = convert_info_to_json(
        accesspoint=accesspoint,
        essid=essid,
        mac=mac_address,
        bit_rate=bit_rate,
        tx_power=tx_power,
        link_quality=link_quality,
        signal_level=signal_level,
        probe_no=probe_no,
        information=scan_results,
        location_name=location_name,
        test_device_location_description=test_device_location_description,
        nat_network=nat_network,
    )
    stream_data(payload)
# Script entry point: collect and send one measurement when run directly.
if __name__ == "__main__":
wireless_info() |
...
- "probe_no" (line 109) should match the number assigned to the testtools of the particular WiFiMon Hardware Probe (WHP), e.g. for the WHP assigned the number 1, the value should be "1". Assigning numbers to WHPs is possible by appropriately setting the testtool attribute included in the websites monitored by them. More information on assigning numbers to WHPs is available in the WiFiMon Test Server installation guide.
- "WAS_FQDN" (line 93) should match the FQDN of the WiFiMon Analysis Server (WAS) responsible for processing the wireless performance metrics of the WHP. The above code block assumes that the WAS uses https and port 4438443.
- Lines 98 to 100 can be filled with more information regarding the location of the WHP.
...