Friday, September 12, 2025

Graphing the Average Speed (WPM) of a Morse Code CQ

-----

For ham radio operators who use Morse code (CW), the Reverse Beacon Network (RBN) is an amazing tool that does many things.  However, one thing we wanted to know was the average speed, in words per minute (WPM), of the CQ calls the RBN reports.

-----

Our method to determine this average WPM was to use a Python script that:

     - Logs in to the RBN via telnet at the top of each hour.

     - Captures the data stream of every CW CQ the RBN sees for 60 seconds, then logs out.

     - Parses the captured data to determine the Max WPM and Min WPM, and calculates the average WPM for display on a rolling 24-hour graph.

-----

Here is what a rolling 24-hour period in September 2025 looked like.  Overall, the average appears to be in the low 20s WPM.  You can also see that for this 60-second sample the Min WPM was 12 and the Max WPM was 26.  We are curious to see how these speeds increase during a contest weekend.

-----
We find the data interesting.  If you do too, here is the source code:
 
# Calculate Average WPM by capturing the RBN
# stream at the top of each hour for 60 seconds and
# graphing the ave WPM on a rolling 24 hour graph.
#
#  https://www.whiskeytangohotel.com/
#  SEPT 2025
#


import telnetlib
import time
import sys
import re
from collections import deque
from datetime import datetime, timedelta

# For non-blocking key detection on Windows
if sys.platform.startswith('win'):
    import msvcrt
else:
    import sys, select, termios, tty

# Connection details for the Reverse Beacon Network telnet feed.
# CALLSIGN is sent as the login line right after connecting; replace the
# placeholder with your own callsign before running.
HOST = "telnet.reversebeacon.net"
PORT = 7000
CALLSIGN = "YOURCALLSIGNHERE"

# Maximum width of the bar graph in characters
MAX_BAR_WIDTH = 50
# Maximum expected WPM value for scaling: a value equal to MAX_WPM is drawn
# as a bar MAX_BAR_WIDTH characters wide.
MAX_WPM = 40
# Number of hourly cycles to display (24 hours); used as the maximum length
# of both history deques below.
ROLLING_CYCLES = 24

# ANSI color codes (escape sequences written before and after each bar)
COLOR_YELLOW = "\033[93m"
COLOR_RESET = "\033[0m"

# Deques to keep rolling history. Both are appended to together, once per
# sample, so entries at the same position belong to the same sample; with
# maxlen set, the oldest entry is dropped once ROLLING_CYCLES is reached.
avg_history = deque(maxlen=ROLLING_CYCLES)   # average WPM of each sample
time_history = deque(maxlen=ROLLING_CYCLES)  # "HH:MM:SS" label of each sample

def render_bar(value, color):
    """Return a colored bar for *value* with the number printed in its middle.

    The bar is scaled so that MAX_WPM corresponds to MAX_BAR_WIDTH block
    characters. The integer part of *value* overwrites the characters at the
    center of the bar.

    Args:
        value: The number to draw (an average WPM).
        color: ANSI escape sequence placed before the bar; COLOR_RESET is
            appended after it.

    Returns:
        The bar as a string, including the color escape sequences.
    """
    bar_length = int(value * MAX_BAR_WIDTH / MAX_WPM)
    # Keep the bar between 1 and MAX_BAR_WIDTH characters: at least one block
    # so a zero sample still shows up, and never wider than the graph column
    # even when a sample exceeds the expected MAX_WPM.
    bar_length = min(max(bar_length, 1), MAX_BAR_WIDTH)
    bar_str = "█" * bar_length
    val_str = str(int(value))
    mid_pos = len(bar_str) // 2
    # Splice the label over the middle of the bar instead of appending it.
    bar_str = bar_str[:mid_pos] + val_str + bar_str[mid_pos + len(val_str):]
    return color + bar_str + COLOR_RESET

def print_rolling_graph():
    """Print the stored averages as a bar graph, one row per sample.

    Rows are printed in the order the samples are stored in the history
    deques, each labelled with its timestamp.
    """
    row_format = "%-8s %s"
    sample_count = len(avg_history)

    print("\nRolling Average WPM (last %d hours):" % sample_count)
    print(row_format % ("Time", "Avg".center(MAX_BAR_WIDTH)))

    # The two deques are filled in lockstep, so pairing them by position
    # matches every average with its own timestamp.
    for stamp, average in zip(time_history, avg_history):
        bar = render_bar(average, COLOR_YELLOW)
        print(row_format % (stamp, bar))

def collect_rbn_data(duration_seconds=60):
    """Connect to the RBN telnet feed and sample the speed of CW CQ spots.

    Logs in with CALLSIGN, echoes every complete line received during
    ``duration_seconds`` seconds to stdout, and records the speed of each
    line that contains both "CQ" and a "<number> WPM" field.

    Args:
        duration_seconds: How long to listen after logging in.

    Returns:
        A ``(cq_count, wpm_values)`` tuple: the number of matching lines and
        the list of their WPM values as ints. If connecting or reading
        fails, the error is printed and whatever was collected up to that
        point is returned.
    """
    wpm_pattern = re.compile(r"(\d+)\s*WPM")
    cq_count = 0
    wpm_values = []
    # Bytes received after the last newline. read_very_eager() returns
    # whatever has arrived so far, so a spot line can be cut in two by a
    # read; parsing the halves separately would record a wrong speed
    # (e.g. "1" + "8 WPM" read as 8). Only complete lines are decoded.
    pending = b""
    tn = None
    try:
        tn = telnetlib.Telnet(HOST, PORT, timeout=10)
        tn.write(CALLSIGN.encode("utf-8") + b"\n")
        print("Connected and logged in as %s" % CALLSIGN)
        start_time = time.time()
        while time.time() - start_time < duration_seconds:
            pending += tn.read_very_eager()
            # Everything before the final newline is complete; the last
            # piece (possibly empty) is carried over to the next read.
            complete = pending.split(b"\n")
            pending = complete.pop()
            for raw_line in complete:
                line = raw_line.decode("utf-8", errors="replace").rstrip("\r")
                sys.stdout.write(line + "\n")
                if "CQ" not in line or "WPM" not in line:
                    continue
                match = wpm_pattern.search(line)
                if match:
                    cq_count += 1
                    wpm_values.append(int(match.group(1)))
            if complete:
                sys.stdout.flush()
            time.sleep(0.1)
        print("RBN stream data collected for %d seconds." % duration_seconds)
    except Exception as e:
        # Deliberately broad: a failed sample must not stop the hourly loop
        # in the caller, so report the problem and return what we have.
        print("Error collecting RBN data: %s" % str(e))
    finally:
        # Close the socket on every exit path, including a dropped
        # connection or Ctrl-C in the middle of a sample.
        if tn is not None:
            tn.close()
    return cq_count, wpm_values

def spacebar_pressed():
    """Return True if a space is among the keystrokes waiting to be read.

    Never blocks. Every pending keystroke is consumed, so keys typed before
    the space do not delay its detection until later polls.

    Returns:
        True if at least one pending key was the space bar, otherwise False.
    """
    if sys.platform.startswith('win'):
        found = False
        while msvcrt.kbhit():
            if msvcrt.getch() == b' ':
                found = True
        return found

    # Read straight from the file descriptor. select() only sees the
    # descriptor, while sys.stdin.read() goes through Python's own buffer;
    # mixing the two can leave a typed space sitting in that buffer where
    # select() no longer reports it.
    import os

    fd = sys.stdin.fileno()
    found = False
    while select.select([fd], [], [], 0)[0]:
        data = os.read(fd, 1024)
        if not data:
            # End of input: select() keeps reporting "readable", so stop
            # here instead of looping forever.
            break
        if b' ' in data:
            found = True
    return found

def main():
    """Sample the RBN at every top of the hour, or on SPACE, until Ctrl-C.

    Each cycle waits for the next full hour or a space bar press, collects
    CQ spots for 60 seconds, prints the min/average/max WPM of that sample,
    stores the average in the rolling history and redraws the graph. A
    sample without any CQ lines is recorded as 0.
    """
    sample_seconds = 60

    # Single-key detection on Unix needs cbreak mode, which only exists on a
    # real terminal. With stdin redirected (cron, nohup, a pipe)
    # termios.tcgetattr() raises, so skip the setup in that case and run on
    # the hourly schedule alone.
    use_cbreak = not sys.platform.startswith('win') and sys.stdin.isatty()
    old_settings = None

    try:
        if use_cbreak:
            old_settings = termios.tcgetattr(sys.stdin)
            tty.setcbreak(sys.stdin.fileno())

        while True:
            now = datetime.now()
            next_hour = (now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1))
            print("Next summary graph update scheduled for %s" % next_hour.strftime("%H:%M:%S"))
            print("Press SPACE BAR to update now.")

            while True:
                # Check for spacebar press
                if spacebar_pressed():
                    print("\nSpacebar pressed — updating now!")
                    break

                # Check if top of hour reached
                if datetime.now() >= next_hour:
                    print("\nTop-of-hour reached — updating now!")
                    break

                time.sleep(0.5)

            # Collect RBN stream for the sample period
            cq_count, wpm_values = collect_rbn_data(duration_seconds=sample_seconds)

            if cq_count > 0:
                min_wpm = min(wpm_values)
                avg_wpm = sum(wpm_values) / float(len(wpm_values))
                max_wpm = max(wpm_values)
            else:
                min_wpm = avg_wpm = max_wpm = 0

            timestamp = datetime.now().strftime("%H:%M:%S")
            avg_history.append(avg_wpm)
            time_history.append(timestamp)

            print("\nSummary for last %ds:" % sample_seconds)
            print("  CQ lines: %d" % cq_count)
            print("  Min WPM: %3d" % min_wpm)
            print("  Avg WPM: %3d" % int(avg_wpm))
            print("  Max WPM: %3d" % max_wpm)

            print_rolling_graph()
            print("\n--- Waiting for next update ---\n")

    except KeyboardInterrupt:
        print("\nProgram stopped by user.")

    finally:
        # Restore the terminal only if its settings were actually saved.
        if old_settings is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

# Start the monitor only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()
-----