-----
For ham radio operators who use Morse code (CW), the Reverse Beacon Network (RBN) is an amazing tool that does many things. One thing we wanted to know, however, was the average speed, in words per minute (WPM), of the CQ calls the RBN reports.
One comment about ham radio before we continue: it's really a hobby with a few thousand hobbies embedded in it. Lately around here it's been about writing software. Getting a license is easy and the hobby is great fun. With that advertisement out of the way....
-----
Our method to determine this average Morse Code (CW) WPM was to use a Python script that:
- Logs in to the RBN via telnet once an hour (the script below is scheduled for 30 minutes past each hour).
- Captures the data stream of every CW CQ the RBN sees for 60 seconds, then logs out.
- Parses the captured data to determine the max WPM and min WPM, and calculates the average WPM.
- Creates a rolling 24-hour graph that charts these results in 1) a terminal window, 2) a webpage viewable on your LAN (ex: 192.168.1.73), and/or 3) a public-facing webpage via io.adafruit.com, which for our project can be viewed here.
-----
Example screenshots of the three outputs listed above. We are curious to see how these speeds increase during a contest weekend.
# by capturing the RBN stream for 60 seconds
# and graphing the average WPM on a rolling 24-hour graph
# to the terminal screen, on the LAN, and publicly to
# https://io.adafruit.com/ironjungle/dashboards/rbn-stats-by-whiskeytangohotel-dot-com?kiosk=true
#
# https://www.whiskeytangohotel.com/2025/09/graphing-average-speed-wpm-of-morse.html
# SEPT 2025
#
import telnetlib
import time
import sys
import re
from collections import deque
from datetime import datetime, timedelta
import threading
from http.server import SimpleHTTPRequestHandler, HTTPServer
import socket
import select
import os
import requests # <-- Added for Adafruit IO uploads
# For non-blocking key detection on Windows
if sys.platform.startswith('win'):
import msvcrt
else:
import termios, tty
# --- Reverse Beacon Network telnet endpoint ---
HOST = "telnet.reversebeacon.net"
PORT = 7000
# Callsign sent to the RBN as the login; replace the placeholder with your own.
CALLSIGN = "CALLSIGN"

# --- Adafruit IO account that receives the uploaded readings ---
# Replace both placeholders with your own username and key.
ADAFRUIT_IO_USERNAME = "ADAFRUIT_IO_USERNAME"
ADAFRUIT_IO_KEY = "ADAFRUIT_IO_KEY"

# --- Bar graph scaling ---
# A reading of MAX_WPM is drawn MAX_BAR_WIDTH characters wide.
MAX_BAR_WIDTH = 100
MAX_WPM = 50
# How many collection cycles the rolling graph keeps (24 hourly cycles).
ROLLING_CYCLES = 24

# --- ANSI escape sequences, used for terminal output only ---
COLOR_YELLOW = "\033[93m"
COLOR_GREEN = "\033[92m"
COLOR_RESET = "\033[0m"
def send_to_adafruit(feed, value):
    """Post one reading to an Adafruit IO feed on a best-effort basis.

    feed  -- feed key inserted into the account's ".../feeds/<feed>/data" URL.
    value -- the reading to store; sent as the JSON field "value".

    Nothing is raised to the caller: a reply whose status is not 200, or any
    exception along the way, is reported on stdout and the function returns
    None.
    """
    try:
        endpoint = "https://io.adafruit.com/api/v2/{}/feeds/{}/data".format(
            ADAFRUIT_IO_USERNAME, feed)
        response = requests.post(
            endpoint,
            json={"value": value},
            headers={
                "X-AIO-Key": ADAFRUIT_IO_KEY,
                "Content-Type": "application/json"
            },
            timeout=10,
        )
        if response.status_code == 200:
            return
        print("⚠️ Adafruit IO error for {}: {}".format(feed, response.text))
    except Exception as exc:
        print("⚠️ Could not send to Adafruit IO: {}".format(str(exc)))
# Rolling per-cycle histories. Each deque silently drops its oldest entry once
# it holds ROLLING_CYCLES values, so the five stay aligned index-for-index as
# long as they are appended to together.
def _new_history():
    """Return an empty deque capped at ROLLING_CYCLES entries."""
    return deque(maxlen=ROLLING_CYCLES)


avg_history = _new_history()
time_history = _new_history()
cq_history = _new_history()
min_wpm_history = _new_history()
max_wpm_history = _new_history()
def render_bar(value, color):
    """Return the bar for *value* wrapped in an ANSI colour for the terminal.

    The bar itself (length and embedded number) is exactly what
    render_bar_plain produces; this function only prefixes *color* and
    appends COLOR_RESET so the colour does not leak into following text.
    """
    return color + render_bar_plain(value) + COLOR_RESET
def render_bar_plain(value):
    """Return an uncoloured bar for *value* (the web page colours it via CSS).

    The bar is int(value * MAX_BAR_WIDTH / MAX_WPM) block characters wide,
    never less than one, with the integer part of *value* written over the
    blocks starting at the bar's midpoint. If the number does not fit in the
    right half, the result is simply longer than the nominal width.
    """
    label = str(int(value))
    width = max(int(value * MAX_BAR_WIDTH / MAX_WPM), 1)
    left_blocks = width // 2
    # Blocks left over after the label; none when the label overruns the bar.
    right_blocks = max(width - left_blocks - len(label), 0)
    return "█" * left_blocks + label + "█" * right_blocks
def _write_html_atomically(path, html):
    """Replace the file at *path* with *html* without exposing a partial write."""
    tmp_path = path + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as tmp:
        tmp.write(html)
    try:
        # Swap the finished file into place in one step so a concurrent
        # reader sees either the old page or the new one, never half of it.
        os.replace(tmp_path, path)
    except OSError:
        # The swap can be refused (e.g. on Windows while a reader still has
        # the page open); fall back to overwriting in place.
        with open(path, "w", encoding="utf-8") as page:
            page.write(html)


def print_and_save_graph(cq_count, min_wpm, avg_wpm, max_wpm, timestamp, next_update_time):
    """Record one collection cycle, print the rolling graph and save it as HTML.

    Parameters:
        cq_count: number of CQ lines counted in this cycle.
        min_wpm, avg_wpm, max_wpm: lowest, mean and highest WPM of the cycle.
            avg_wpm is truncated with int() for the summary line.
        timestamp: "HH:MM" label for the cycle; ":00" is appended wherever it
            is displayed.
        next_update_time: datetime of the next scheduled collection; it is
            shown on the web page only.

    Side effects:
        Appends the cycle to the module-level rolling histories, prints the
        summary and graph to stdout, and rewrites "graph.txt" (the file the
        built-in web server sends for "/").
    """
    # Update rolling history
    avg_history.append(avg_wpm)
    time_history.append(timestamp)
    cq_history.append(cq_count)
    min_wpm_history.append(min_wpm)
    max_wpm_history.append(max_wpm)

    header_avg = "WhiskeyTangoHotel.Com".center(MAX_BAR_WIDTH)
    # Lines that appear verbatim in both the terminal and the web view.
    summary = [
        " Central Time: %s:00" % timestamp,
        " CQ lines: %d" % cq_count,
        " Min WPM: %3d" % min_wpm,
        " Avg WPM: %3d" % int(avg_wpm),
        " Max WPM: %3d" % max_wpm,
        "\nRolling Average WPM",
        "%-12s %s" % ("Central Time", header_avg),
    ]
    rows = list(zip(time_history, avg_history))

    # Terminal output
    terminal_lines = ["\nSummary of last 60s collected at:"] + summary
    terminal_lines.extend(
        "%-12s %s" % (t + ":00", render_bar(avg, COLOR_YELLOW)) for t, avg in rows)
    print("\n".join(terminal_lines))

    # HTML output for web page
    html_lines = [
        "<html><head><title>WPM Average Graph</title>",
        "<style>body{background:white;color:black;font-family:monospace;} .bar{color:blue;}</style></head><body>",
        "<pre>",
        "Summary of last 60s collected at:",
    ] + summary
    html_lines.extend(
        "%-12s <span class='bar'>%s</span>" % (t + ":00", render_bar_plain(avg))
        for t, avg in rows)
    html_lines.append("\nNext summary graph update scheduled for: %s" % next_update_time.strftime("%H:%M:%S"))
    html_lines.append("</pre></body></html>")
    _write_html_atomically("graph.txt", "\n".join(html_lines))
# Speed field of a spot line: digits, optional whitespace, then "WPM".
_WPM_RE = re.compile(r"(\d+)\s*WPM")


def _cq_speed(line):
    """Return the WPM of a CQ line as an int, or None if *line* is not one."""
    if "CQ" not in line or "WPM" not in line:
        return None
    match = _WPM_RE.search(line)
    return int(match.group(1)) if match else None


def collect_rbn_data(duration_seconds=60):
    """Connect to the RBN telnet feed and collect CQ speeds for a fixed time.

    Parameters:
        duration_seconds: how long to keep reading the stream.

    Returns:
        (cq_count, wpm_values): the number of CQ lines that carried a WPM
        figure and the list of those figures. Both reflect whatever was
        gathered before an error, so they may be 0 / [] on failure.

    Every complete line received is echoed to stdout. Errors are reported on
    stdout rather than raised, and the connection is always closed.
    """
    wpm_values = []
    # Bytes received after the last newline. Data arrives in arbitrary chunks,
    # so a line can be cut in two; parsing the halves separately could miss a
    # spot or read a truncated number, hence only whole lines are processed.
    pending = b""
    tn = None
    try:
        tn = telnetlib.Telnet(HOST, PORT, timeout=10)
        tn.write(CALLSIGN.encode("utf-8") + b"\n")
        print("Connected and logged in as %s" % CALLSIGN)
        # monotonic() keeps the window length correct if the wall clock jumps.
        deadline = time.monotonic() + duration_seconds
        while time.monotonic() < deadline:
            raw = tn.read_very_eager()
            if raw:
                *complete, pending = (pending + raw).split(b"\n")
                for raw_line in complete:
                    line = raw_line.decode("utf-8", errors="replace").rstrip("\r")
                    sys.stdout.write(line + "\n")
                    speed = _cq_speed(line)
                    if speed is not None:
                        wpm_values.append(speed)
                sys.stdout.flush()
            time.sleep(0.1)
        # A trailing partial line still in `pending` is deliberately dropped.
        print("RBN stream data collected for %d seconds." % duration_seconds)
    except Exception as e:
        # Best effort: report the problem and return what was collected so far.
        print("Error collecting RBN data: %s" % str(e))
    finally:
        if tn is not None:
            tn.close()
    return len(wpm_values), wpm_values
def spacebar_pressed():
    """Poll the keyboard without blocking; truthy only if a space was typed.

    On Windows the msvcrt console functions are used. Elsewhere stdin is
    polled with a zero-timeout select() and, when it is readable, exactly one
    character is consumed and compared with a space. A pending key other
    than space is therefore consumed as well.
    """
    if sys.platform.startswith('win'):
        return msvcrt.kbhit() and msvcrt.getch() == b' '
    readable, _, _ = select.select([sys.stdin], [], [], 0)
    if not readable:
        return False
    return sys.stdin.read(1) == ' '
# Web server section
class Handler(SimpleHTTPRequestHandler):
    """HTTP GET handler that serves only the saved graph page.

    "/" and "/index.html" return the contents of "graph.txt" (or a short
    placeholder when it cannot be read), "/favicon.ico" returns 204, and
    every other path returns 404.
    """

    def do_GET(self):
        """Answer a GET request according to the routing described above."""
        if self.path in ('/', '/index.html'):
            # Load the page BEFORE sending any headers: only the file read is
            # guarded, so a failed socket write is not mistaken for a missing
            # page, and the body length is known for Content-Length.
            try:
                with open("graph.txt", "r", encoding="utf-8") as f:
                    body = f.read().encode("utf-8")
            except (OSError, UnicodeError):
                body = b"No graph data yet."
            self.send_response(200)
            self.send_header("Content-type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        elif self.path == '/favicon.ico':
            self.send_response(204)
            self.end_headers()
        else:
            self.send_error(404)

    def log_message(self, format, *args):
        """Override to suppress logging to terminal."""
        pass
def get_local_ip():
    """Return the local address used for outbound traffic, as a string.

    A UDP socket is connect()ed to 8.8.8.8:80 and the address the socket was
    given locally is returned. If that fails for any reason the loopback
    address "127.0.0.1" is returned instead. The socket is always closed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        try:
            probe.connect(("8.8.8.8", 80))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"
def start_web_server():
    """Bind the graph web server on port 8080, print its URL, serve forever.

    The server listens on all interfaces and uses Handler for requests.
    This call blocks for as long as the server runs.
    """
    port = 8080
    httpd = HTTPServer(('', port), Handler)
    print("\nWeb server running on http://%s:%d/" % (get_local_ip(), port))
    httpd.serve_forever()
# Start the web server in a background thread. It is a daemon thread so that
# it never keeps the process alive once the main loop has exited.
# (daemon=True replaces the deprecated Thread.setDaemon() call.)
web_thread = threading.Thread(target=start_web_server, daemon=True)
web_thread.start()
def _next_half_hour(now):
    """Return the first HH:30:00 that lies strictly after *now*."""
    slot = now.replace(minute=30, second=0, microsecond=0)
    if now.minute >= 30:
        slot += timedelta(hours=1)
    return slot


def main():
    """Run the collect / graph / upload cycle until interrupted.

    Each cycle waits for the next :30 mark (or a SPACE key press), collects
    60 seconds of RBN data, prints and saves the rolling graph, and uploads
    the four readings to Adafruit IO. Ctrl-C ends the loop.
    """
    # cbreak mode lets a single key press be read without Enter. It only
    # applies on non-Windows platforms and only when stdin is a real terminal;
    # without the isatty() check the program would crash at start-up when run
    # with redirected stdin (service, cron, pipe).
    saved_tty = None
    if not sys.platform.startswith('win') and sys.stdin.isatty():
        saved_tty = termios.tcgetattr(sys.stdin)
        tty.setcbreak(sys.stdin.fileno())
    try:
        while True:
            # --- Schedule ONLY :30 after each hour ---
            next_update_time = _next_half_hour(datetime.now())
            print("Next summary graph update scheduled for %s" % next_update_time.strftime("%H:%M:%S"))
            print("Press SPACE BAR to update now.")
            while True:
                if spacebar_pressed():
                    print("\nSpacebar pressed — updating now!")
                    break
                if datetime.now() >= next_update_time:
                    print("\n:30 minute mark reached — updating now!")
                    break
                time.sleep(0.5)

            # --- Use start of collection time (rounded to nearest minute) ---
            rounded_start = (datetime.now() + timedelta(seconds=30)).replace(second=0, microsecond=0)
            timestamp = rounded_start.strftime("%H:%M")  # ":00" is appended on output

            cq_count, wpm_values = collect_rbn_data(duration_seconds=60)
            if wpm_values:
                min_wpm = min(wpm_values)
                avg_wpm = sum(wpm_values) / float(len(wpm_values))
                max_wpm = max(wpm_values)
            else:
                # Nothing heard (or the collection failed): graph zeros.
                min_wpm = avg_wpm = max_wpm = 0

            # Recompute the next :30 AFTER collection so the web page shows
            # the true next scheduled time.
            next_update_time = _next_half_hour(datetime.now())
            print_and_save_graph(cq_count, min_wpm, avg_wpm, max_wpm, timestamp, next_update_time)

            # --- Send results to Adafruit IO ---
            send_to_adafruit("average-wpm", avg_wpm)
            send_to_adafruit("min-wpm", min_wpm)
            send_to_adafruit("max-wpm", max_wpm)
            send_to_adafruit("cq-count", cq_count)
            print("\n--- Waiting for next update ---\n")
    except KeyboardInterrupt:
        print("\nProgram stopped by user.")
    finally:
        # Restore the terminal only if it was actually switched to cbreak.
        if saved_tty is not None:
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, saved_tty)


if __name__ == "__main__":
    main()