Make sure ping3 is installed.

measure-academy-latency.py
Copied to clipboard
#!/usr/bin/env python
# Recommended server feature added by https://github.com/0xBienCuit

import asyncio
import curses
import sys
import time
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor

try:
    import ping3
except ImportError:
    print("ping3 module not installed, install using:")
    print("pip install ping3")
    sys.exit(1)

# Hosts to probe: edge-us-academy-1..6 followed by edge-eu-academy-1..6.
servers = [
    f"edge-{region}-academy-{index}.hackthebox.eu"
    for region in ("us", "eu")
    for index in range(1, 7)
]

# One bounded deque per host; each keeps at most the 20 newest samples.
latency_data = dict((host, deque(maxlen=20)) for host in servers)
# Per-host counters, defaulting to zero, for pings attempted and pings lost.
packets_sent = defaultdict(int)
packets_dropped = defaultdict(int)


def get_latency(server):
    """Ping *server* once and return the round-trip time in milliseconds.

    Every call increments ``packets_sent[server]``; every failed ping also
    increments ``packets_dropped[server]``.

    Args:
        server: Host name or address handed to ``ping3.ping``.

    Returns:
        The latency in milliseconds, or ``None`` when the ping timed out,
        failed, or raised a ping3 error.
    """
    packets_sent[server] += 1
    try:
        latency = ping3.ping(server)
    except ping3.errors.PingError:
        # ``ping3.errors`` is a module, so it cannot be used directly in an
        # ``except`` clause; PingError is the base class of ping3's errors.
        packets_dropped[server] += 1
        return None

    # ping3 signals a timeout with None and other failures (for example an
    # unresolvable host) with False. False must not reach the multiplication
    # below, where it would be reported as a perfect 0 ms latency.
    if latency is None or latency is False:
        packets_dropped[server] += 1
        return None
    return latency * 1000


def calculate_jitter(latencies):
    """Return the mean absolute difference between consecutive samples.

    Args:
        latencies: Sized sequence of latency samples in measurement order.

    Returns:
        The average of ``|sample[i] - sample[i - 1]|`` over all adjacent
        pairs, or ``None`` when fewer than two samples are available.
    """
    samples = list(latencies)
    pair_count = len(samples) - 1
    if pair_count < 1:
        return None
    total_change = sum(abs(later - earlier) for earlier, later in zip(samples, samples[1:]))
    return total_change / pair_count


def calculate_weighted_score(latency, packet_loss, jitter):
    """Combine latency, packet loss and jitter into a single score.

    Each input is divided by 100 and then weighted: 0.3 for latency,
    0.4 for packet loss and 0.3 for jitter.

    Args:
        latency: Latency value to score.
        packet_loss: Packet-loss value to score.
        jitter: Jitter value to score, or ``None`` to contribute nothing.

    Returns:
        The weighted sum of the three scaled inputs.
    """
    w_latency, w_loss, w_jitter = 0.3, 0.4, 0.3

    scaled_latency = latency / 100
    scaled_loss = packet_loss / 100
    # A missing jitter measurement is treated as zero jitter.
    if jitter is None:
        scaled_jitter = 0
    else:
        scaled_jitter = jitter / 100

    return w_latency * scaled_latency + w_loss * scaled_loss + w_jitter * scaled_jitter


def _draw_line(stdscr, row, text):
    """Write *text* at column 0 of *row*, clipped to the current window size."""
    max_rows, max_cols = stdscr.getmaxyx()
    if row >= max_rows:
        return
    try:
        # Stop one column short: writing into the last cell of the window
        # makes curses raise even though the text itself fits.
        stdscr.addstr(row, 0, text[: max(max_cols - 1, 0)])
    except curses.error:
        # The terminal can be resized between getmaxyx() and addstr();
        # dropping one line is preferable to crashing the display loop.
        pass


async def measure_latency(stdscr):
    """Continuously ping every server and render live statistics with curses.

    Each round pings all entries of ``servers`` concurrently on a thread
    pool, appends successful samples to ``latency_data``, redraws one line
    per server (average latency, jitter, packet loss, weighted score) and
    shows the server with the lowest weighted score as the recommendation.
    The loop ends when the user presses ``q``.

    Args:
        stdscr: The curses window to draw on.
    """
    curses.curs_set(0)
    stdscr.nodelay(1)
    loop = asyncio.get_running_loop()
    executor = ThreadPoolExecutor(max_workers=len(servers))

    try:
        while True:
            tasks = [
                loop.run_in_executor(executor, get_latency, server)
                for server in servers
            ]
            results = await asyncio.gather(*tasks)

            # Failed pings return None and are only reflected in the counters.
            for server, latency in zip(servers, results):
                if latency is not None:
                    latency_data[server].append(latency)

            stdscr.clear()
            best_server = None
            best_score = float("inf")

            for i, server in enumerate(servers):
                samples = latency_data[server]
                if not samples:
                    _draw_line(stdscr, i, f"{server}: No data")
                    continue

                avg_latency = sum(samples) / len(samples)
                jitter = calculate_jitter(samples)
                jitter_str = f"{jitter:.2f} ms" if jitter is not None else "N/A"
                # NOTE(review): this percentage covers every ping sent since
                # start-up, while the count printed next to it is the size of
                # the rolling window of successful samples.
                packets_loss_percentage = (
                    packets_dropped[server] / packets_sent[server]
                ) * 100

                score = calculate_weighted_score(
                    avg_latency, packets_loss_percentage, jitter
                )

                # Lower score is better; the first server wins ties.
                if score < best_score:
                    best_score = score
                    best_server = server

                _draw_line(
                    stdscr,
                    i,
                    f"{server}: {avg_latency:.2f} ms (avg) | {jitter_str} (jitter) | "
                    f"{packets_loss_percentage:.2f}% packets dropped over last {len(samples)} measurements | Weighted score: {score:.2f}",
                )

            if best_server:
                _draw_line(
                    stdscr, len(servers) + 1, f"Recommended server: {best_server}"
                )
            else:
                _draw_line(
                    stdscr,
                    len(servers) + 1,
                    "No recommended server due to insufficient data",
                )

            stdscr.refresh()

            # Short pause between rounds keeps the display responsive.
            await asyncio.sleep(0.5)
            if stdscr.getch() == ord("q"):
                break
    finally:
        # Release the worker threads without waiting on pings still in flight.
        executor.shutdown(wait=False)
            
if __name__ == "__main__":

    def _curses_entry(stdscr):
        """Drive the async measurement loop on the window curses provides."""
        return asyncio.run(measure_latency(stdscr))

    # curses.wrapper restores the terminal state even if the loop raises.
    try:
        curses.wrapper(_curses_entry)
    except KeyboardInterrupt:
        print("\nScript stopped by user.")