28 Jul, 2024
2 minutes
HTB Academy Latency Script
Script to measure latency to all HTB academy servers and find the best server for you
Make sure ping3
is installed.
meausre-academy-latency.py
#!/usr/bin/env python
# Recommended server feature added by https://github.com/0xBienCuit
import asyncio
import curses
import sys
import time
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
try:
import ping3
except ImportError:
print("ping3 module not installed, install using:")
print("pip install ping3")
sys.exit(1)
servers = [
"edge-us-academy-1.hackthebox.eu",
"edge-us-academy-2.hackthebox.eu",
"edge-us-academy-3.hackthebox.eu",
"edge-us-academy-4.hackthebox.eu",
"edge-us-academy-5.hackthebox.eu",
"edge-us-academy-6.hackthebox.eu",
"edge-eu-academy-1.hackthebox.eu",
"edge-eu-academy-2.hackthebox.eu",
"edge-eu-academy-3.hackthebox.eu",
"edge-eu-academy-4.hackthebox.eu",
"edge-eu-academy-5.hackthebox.eu",
"edge-eu-academy-6.hackthebox.eu",
]
# Initialise a dictionary to hold deque (rolling window) for each server
latency_data = {server: deque(maxlen=20) for server in servers}
# Initialise counters for packets sent and dropped
packets_sent = defaultdict(int)
packets_dropped = defaultdict(int)
def get_latency(server):
packets_sent[server] += 1
try:
latency = ping3.ping(server)
if latency is None:
packets_dropped[server] += 1
return None
else:
return latency * 1000
except ping3.errors as e:
packets_dropped[server] += 1
return None
def calculate_jitter(latencies):
if len(latencies) < 2:
return None
jitter = sum(
abs(latencies[i] - latencies[i - 1]) for i in range(1, len(latencies))
) / (len(latencies) - 1)
return jitter
def calculate_weighted_score(latency, packet_loss, jitter):
# Define weights
latency_weight = 0.3
packet_loss_weight = 0.4
jitter_weight = 0.3
# Normalise values (assuming maximums for scaling)
normalized_latency = latency / 100
normalized_packet_loss = packet_loss / 100
normalized_jitter = jitter / 100 if jitter is not None else 0
# Calculate weighted score
score = (
latency_weight * normalized_latency
+ packet_loss_weight * normalized_packet_loss
+ jitter_weight * normalized_jitter
)
return score
async def measure_latency(stdscr):
curses.curs_set(0)
stdscr.nodelay(1)
executor = ThreadPoolExecutor(max_workers=len(servers))
while True:
loop = asyncio.get_event_loop()
tasks = [
loop.run_in_executor(executor, get_latency, server) for server in servers
]
results = await asyncio.gather(*tasks)
for server, latency in zip(servers, results):
if latency is not None:
latency_data[server].append(latency)
stdscr.clear()
best_server = None
best_score = float("inf")
for i, server in enumerate(servers):
if latency_data[server]:
avg_latency = sum(latency_data[server]) / len(latency_data[server])
jitter = calculate_jitter(latency_data[server])
jitter_str = f"{jitter:.2f} ms" if jitter is not None else "N/A"
packets_loss_percentage = (
packets_dropped[server] / packets_sent[server]
) * 100
score = calculate_weighted_score(
avg_latency, packets_loss_percentage, jitter
)
if score < best_score:
best_score = score
best_server = server
stdscr.addstr(
i,
0,
f"{server}: {avg_latency:.2f} ms (avg) | {jitter_str} (jitter) | "
f"{packets_loss_percentage:.2f}% packets dropped over last {len(latency_data[server])} measurements | Weighted score: {score:.2f}",
)
else:
stdscr.addstr(i, 0, f"{server}: No data")
if best_server:
stdscr.addstr(len(servers) + 1, 0, f"Recommended server: {best_server}")
else:
stdscr.addstr(
len(servers) + 1,
0,
"No recommended server due to insufficient data",
)
stdscr.refresh()
await asyncio.sleep(
0.5
) # Reduced delay between measurements for faster updates
if stdscr.getch() == ord("q"):
break
if __name__ == "__main__":
try:
curses.wrapper(lambda stdscr: asyncio.run(measure_latency(stdscr)))
except KeyboardInterrupt:
print("\nScript stopped by user.")
412 Words