Appendix I - pass_scheduler.py - Next-Pass Selection

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
pass_scheduler.py — Legacy-workflow scheduler for .pass files

This version keeps your original workflow:
  TLE_Set_Builder.py  →  TLE_Parser_Test.py (writes ./Passes/<SAT>_<today>.pass)  →  pass_scheduler.py

What this script does
- Scans the ./Passes directory for all *.pass files.
- Each .pass file format:
    Line 1   : <SATELLITE NAME>
    Line 2+ : <start_utc_iso>,<end_utc_iso>   (e.g., 2025-10-20T03:10:32+00:00,2025-10-20T03:22:45+00:00)
- Finds the FIRST upcoming pass after the given time (default: now, UTC).
- Prints a human-friendly summary AND a machine-readable line, e.g.:
    NEXT_PASS,TELEOS-2,2025-10-20T03:10:32+00:00,2025-10-20T03:22:45+00:00,733

CLI examples
  # Default: pick the next pass from ./Passes using current UTC time
  python pass_scheduler.py

  # Point to a custom directory and also print Singapore local times
  python pass_scheduler.py --dir Passes --local

  # Show all future passes (across all files) starting from now
  python pass_scheduler.py --all --local
"""

from __future__ import annotations

import argparse
import datetime as dt
import os
import sys
from pathlib import Path
from typing import Optional, Tuple, List

# Optional import: only used to accept Skyfield Time objects for `current_time`
try:
    from skyfield.api import load as _sf_load  # noqa: F401
except Exception:  # pragma: no cover
    _sf_load = None

ASIA_SG = dt.timezone(dt.timedelta(hours=8))
UTC = dt.timezone.utc

# ----------------------------- helpers -----------------------------

def _ensure_utc_datetime(current_time) -> dt.datetime:
    """Accepts Python datetime or Skyfield Time; returns timezone-aware UTC datetime."""
    # Skyfield Time has .utc_datetime() method
    if hasattr(current_time, "utc_datetime"):
        current_time = current_time.utc_datetime()
    if not isinstance(current_time, dt.datetime):
        raise TypeError("current_time must be datetime or Skyfield Time")
    if current_time.tzinfo is None:
        current_time = current_time.replace(tzinfo=UTC)
    return current_time.astimezone(UTC)

def _iso_utc(d: dt.datetime) -> str:
    if d.tzinfo is None:
        d = d.replace(tzinfo=UTC)
    return d.astimezone(UTC).isoformat()

def _iso_sgt(d: dt.datetime) -> str:
    if d.tzinfo is None:
        d = d.replace(tzinfo=UTC)
    return d.astimezone(ASIA_SG).isoformat()

def humanize_seconds(total: float) -> str:
    """Convert duration in seconds → 'Xm Ys' or 'Xh Ym Zs'."""
    total = int(total)
    h, rem = divmod(total, 3600)
    m, s = divmod(rem, 60)
    if h > 0:
        return f"{h}h {m}m {s}s"
    elif m > 0:
        return f"{m}m {s}s"
    else:
        return f"{s}s"

# ----------------------- legacy-compatible API -----------------------

def convert_datetime(str_datetime: str) -> dt.datetime:
    """Parse timestamps written by your .pass generator.

    Accepts strings like:
      - '2025-10-20 03:10:32.123456+00:00'
      - '2025-10-20T03:10:32.123456+00:00'
      - '2025-10-20 03:10:32.123456' (assumed UTC)
      - '2025-10-20T03:10:32' (assumed UTC)
    """
    s = str_datetime.strip()
    s = s.replace('T', ' ')
    s = s.replace('+00:00', '').replace('Z', '')
    fmts = [
        '%Y-%m-%d %H:%M:%S.%f',
        '%Y-%m-%d %H:%M:%S',
    ]
    d = None
    for fmt in fmts:
        try:
            d = dt.datetime.strptime(s, fmt)
            break
        except ValueError:
            pass
    if d is None:
        raise ValueError(f"Could not parse datetime string: {str_datetime}")
    return d.replace(tzinfo=UTC)

def load_passfile(pass_file: str | os.PathLike, current_time) -> Optional[Tuple[str, dt.datetime, dt.datetime]]:
    """Return (satname, start_time_utc, end_time_utc) for the first future pass in the file.
    If no upcoming events in the file, return None.
    """
    now_utc = _ensure_utc_datetime(current_time)

    with open(pass_file, 'r', encoding='utf-8') as f:
        satname = f.readline().strip()  # first line
        for line in f:
            if not line.strip():
                continue
            try:
                start_s, end_s = [x.strip() for x in line.split(',')]
            except ValueError:
                continue
            start_dt = convert_datetime(start_s)
            end_dt = convert_datetime(end_s)
            if start_dt > now_utc:
                return satname, start_dt, end_dt
    return None

def select_pass(pass_directory: str = "Passes", current_time=None):
    """Scan all .pass files and return the next upcoming event across all satellites.

    Returns (start_dt_utc, end_dt_utc, satname, duration_seconds)
    or (None, None, None, None) if nothing upcoming.
    """
    if current_time is None:
        current_time = dt.datetime.now(UTC)
    now_utc = _ensure_utc_datetime(current_time)

    path = Path(pass_directory)
    if not path.exists():
        print(f"[INFO] Pass directory not found: {path}")
        return None, None, None, None

    best = None  # (start, end, name)
    for fname in sorted(path.glob('*.pass')):
        try:
            res = load_passfile(fname, now_utc)
        except Exception as e:
            print(f"[WARN] Skipping {fname}: {e}")
            continue
        if res is None:
            continue
        sat, start, end = res
        if best is None or start < best[0]:
            best = (start, end, sat)

    if best is None:
        print("All stored passes complete! Exiting the program!")
        return None, None, None, None

    start, end, sat = best
    duration = (end - start).total_seconds()
    print(f"Next pass: {sat} at {start.isoformat()} for {humanize_seconds(duration)}")
    return start, end, sat, duration

# ------------------------------ CLI ------------------------------

def _list_all_future(pass_directory: str, current_time, show_local: bool):
    now_utc = _ensure_utc_datetime(current_time)
    path = Path(pass_directory)
    rows: List[Tuple[str, dt.datetime, dt.datetime]] = []
    for fname in sorted(path.glob('*.pass')):
        try:
            with open(fname, 'r', encoding='utf-8') as f:
                satname = f.readline().strip()
                for line in f:
                    if not line.strip():
                        continue
                    try:
                        start_s, end_s = [x.strip() for x in line.split(',')]
                    except ValueError:
                        continue
                    start_dt = convert_datetime(start_s)
                    end_dt = convert_datetime(end_s)
                    if start_dt > now_utc:
                        rows.append((satname, start_dt, end_dt))
        except Exception as e:
            print(f"[WARN] Failed reading {fname}: {e}")
    rows.sort(key=lambda r: r[1])
    if not rows:
        print("[INFO] No upcoming passes found.")
        return
    print("\n=== Upcoming passes ===")
    for sat, s, e in rows:
        dur = int((e - s).total_seconds())
        line = f"{sat:12s}  {s.isoformat()}  →  {e.isoformat()}  ({humanize_seconds(dur)})"
        if show_local:
            line += f"  |  SGT: {_iso_sgt(s)} → {_iso_sgt(e)}"
        print(line)
    if rows:
        sat, s, e = rows[0]
        print("\nNEXT_PASS,{},{},{},{}".format(sat, _iso_utc(s), _iso_utc(e), int((e - s).total_seconds())))

def main():
    ap = argparse.ArgumentParser(description="Select the next satellite pass from legacy .pass files.")
    ap.add_argument('--dir', default='Passes', help='Directory containing .pass files (default: Passes)')
    ap.add_argument('--local', action='store_true', help='Also print Singapore local (SGT) times')
    ap.add_argument('--all', action='store_true', help='List all upcoming passes instead of only the next one')
    ap.add_argument('--now', type=str, default=None,
                    help='Override current UTC time (ISO, e.g., 2025-10-20T03:00:00)')

    args = ap.parse_args()

    # Determine current time
    if args.now:
        t = args.now.replace('T', ' ')
        fmts = ['%Y-%m-%d %H:%M:%S', '%Y-%m-%d %H:%M']
        parsed = None
        for fmt in fmts:
            try:
                parsed = dt.datetime.strptime(t, fmt).replace(tzinfo=UTC)
                break
            except ValueError:
                continue
        if parsed is None:
            print('[ERROR] --now must be ISO like 2025-10-20T03:00:00')
            return 2
        now = parsed
    else:
        now = dt.datetime.now(UTC)

    if args.all:
        _list_all_future(args.dir, now, args.local)
        return 0

    start, end, sat, dur = select_pass(args.dir, now)
    if start is None:
        return 0

    # Human-friendly
    print("\n==== NEXT PASS ====")
    print(f"Satellite : {sat}")
    print(f"Start UTC : {start.isoformat()}")
    print(f"End   UTC : {end.isoformat()}")
    print(f"Duration  : {humanize_seconds(dur)}")
    if args.local:
        print(f"Start SGT : {_iso_sgt(start)}")
        print(f"End   SGT : {_iso_sgt(end)}")

    # Machine-readable for scripts
    print(f"NEXT_PASS,{sat},{_iso_utc(start)},{_iso_utc(end)},{int(dur)}")
    return 0

if __name__ == '__main__':
    raise SystemExit(main())


Copyright © 2025-2026 Galassia-5 Satellite Programme