Appendix G - gimbal_lib.py - High-Level Motion Control (imports gclib.py)

# -*- coding: utf-8 -*-
"""
Simplified 2-Axis Gimbal Controller (AZ/EL) — NSC-G3-E / Galil-safe

Enhancements:
- Adds streaming-safe Position Tracking (PT) mode for continuous target updates
  without BG/AM/MC, preventing TC:7 ("not valid while running") during PCHIP streams.
- Keeps classic PA/PR + BG flows for non-streaming helpers (move_absolute, move_relative).
- Uses _TNX/_TNY polling for waits (as before) and adds a settle loop for PT.

Key choices (unchanged unless noted):
- Classic PA/PR/BG supported.
- AC/DC/SP classic 'AC x,y', 'DC x,y', 'SP x,y'.
- One command per line (no semicolons).
- Waits via _TNX/_TNY polling (no AMX/AMY).
- Light connection retry (given string → raw IP → '-d').
"""

import time
import numpy as np
from pathlib import Path

try:
    import gclib
except ImportError:
    gclib = None  # simulation mode if library isn't present

# ------------------- USER LIMITS -------------------
AZ_MIN, AZ_MAX = -90.0, 90.0
EL_MIN, EL_MAX = -90.0, 90.0
MAX_STEP_DEG = None  # reserved for future segmentation
# ---------------------------------------------------

STATE_FILE = "dependencies/gimbal_state/currGimbalPosition.txt"
MIN_PR_COUNTS = 10    # deadband to avoid tiny moves
MIN_PA_DELTA = 6      # counts; ignore microscopic PA updates in PT

AXIS_AZ = 'X'
AXIS_EL = 'Y'

# Counts-per-degree (calibrated)
CNT_PER_DEG_AZ = 10000
CNT_PER_DEG_EL = 10000


def _clip(v, lo, hi):
    return max(lo, min(hi, v))


def _safe_load_pos():
    try:
        p = Path(STATE_FILE)
        if not p.exists():
            return [0.0, 0.0, 0.0]
        vals = np.loadtxt(p)
        return [float(vals[0]), float(vals[1]), float(vals[2])]
    except Exception:
        return [0.0, 0.0, 0.0]


def _safe_save_pos(vec3):
    try:
        Path(STATE_FILE).parent.mkdir(parents=True, exist_ok=True)
        np.savetxt(STATE_FILE, np.asarray(vec3))
    except Exception as e:
        print(f"[WARN] Could not save position: {e}")


class GimbalController:
    def __init__(self, connection, cnt_per_deg=(CNT_PER_DEG_AZ, CNT_PER_DEG_EL),
                 assume_zero_on_connect=True, streaming=True):
        """
        connection: e.g. '192.168.1.2' or '192.168.1.2 -d'
        assume_zero_on_connect=True:
            Define current counts as 0,0 on connect without moving hardware (DP 0,0).
        streaming=True:
            Enable Position Tracking (PT) mode so PA targets can be updated DURING motion
            with no BG — ideal for high-rate PCHIP streams.
        """
        print("[INIT] Connecting to Galil Controller...")
        self.cnt_az, self.cnt_el = cnt_per_deg
        self.curr_az, self.curr_el = 0.0, 0.0
        self.curr_pos = _safe_load_pos()
        self._assume_zero = assume_zero_on_connect
        self.streaming = streaming

        self.sim = gclib is None
        if self.sim:
            print("[SIMULATION MODE] No gclib available; motions won't go to hardware.")
            return

        # --- Light connection retry for robustness ---
        self.g = gclib.py()
        variants = []
        s = str(connection).strip()
        parts = s.split()

        if s:
            variants.append(s)  # as provided
        if parts:
            ip_only = parts[0]
            if ip_only not in variants:
                variants.append(ip_only)
            dash_d = f"{ip_only} -d"
            if dash_d not in variants:
                variants.append(dash_d)

        last_err = None
        for cand in variants:
            try:
                self.g.GOpen(cand)
                try:
                    self.g.timeout = 120000  # ms
                except Exception:
                    pass
                try:
                    print(self.g.GInfo().strip())
                except Exception:
                    print("[INFO] Connected.")
                break
            except Exception as e:
                last_err = e
                try:
                    self.g.GClose()
                except Exception:
                    pass
                time.sleep(0.2)
        else:
            raise RuntimeError(f"Failed to connect using {variants}: {last_err}")

        # Bring controller to a known state and set motion params
        self._setup_motion()

        # Enter PT if streaming (so PA can be updated during motion)
        if self.streaming:
            self._enter_pt()

        # Sync our object state from controller (counts → deg)
        self._sync_from_controller()

    # -------------- low-level I/O --------------
    def _cmd(self, s, quiet=False):
        """Send a raw Galil command, with '?' handling that also fetches TC when possible."""
        try:
            resp = self.g.GCommand(s)
            if not quiet:
                msg = resp.strip().replace("\r", " ").replace("\n", " ")
                print(f"[GALIL] {s}" + ("" if not msg else f" -> {msg}"))
            return resp
        except gclib.GclibError as e:
            # Try to fetch TC code for diagnostics
            try:
                tc = self.g.GCommand("TC").strip()
            except Exception:
                tc = "<TC read failed>"
            print(f"[GALIL ERROR] {s}\n  → {e}\n  → TC: {tc}")
            raise
        except Exception:
            print(f"[GALIL ERROR] {s}")
            raise

    # -------------- PT helpers --------------
    def _enter_pt(self):
        """Enable Position Tracking mode (validates PA during motion, no BG)."""
        try:
            # Ensure idle profiler before enabling PT
            self._cmd("ST", quiet=True)
            self._cmd("PT 1,1", quiet=True)
            print("[OK] PT mode enabled for streaming PA updates.")
        except Exception:
            print("[WARN] Could not enable PT mode; continuing classic mode.")
            self.streaming = False

    def _exit_pt(self):
        """Disable Position Tracking mode to allow classic PA/PR + BG motion."""
        try:
            self._cmd("ST", quiet=True)
            self._cmd("PT 0,0", quiet=True)
            print("[OK] PT mode disabled (classic BG motion allowed).")
        except Exception:
            pass

    # -------------- setup / sync --------------
    def _set_params(self):
        """Set motion parameters using classic X,Y forms (no named-axis)."""
        self._cmd("AB", quiet=True)          # abort any left-over program/motion
        self._cmd("ST", quiet=True)          # stop
        self._cmd("WT 20", quiet=True)       # small wait

        # Motor off (combined, then per-axis fallbacks)
        try:
            self._cmd("MO XY", quiet=True)
        except Exception:
            try: self._cmd("MO X", quiet=True)
            except Exception: pass
            try: self._cmd("MO Y", quiet=True)
            except Exception: pass

        # Define current commanded position = 0,0
        if self._assume_zero:
            try:
                self._cmd("DP 0,0", quiet=True)
            except Exception:
                # Per-axis as fallback
                try: self._cmd("DP X=0", quiet=True)
                except Exception: pass
                try: self._cmd("DP Y=0", quiet=True)
                except Exception: pass

        # Servo on (combined, then per-axis fallbacks)
        try:
            self._cmd("SH XY", quiet=True)
        except Exception:
            try: self._cmd("SH X", quiet=True)
            except Exception: pass
            try: self._cmd("SH Y", quiet=True)
            except Exception: pass

        # Motion parameters — classic comma syntax (firmware-safe)
        self._cmd("AC 200000,200000", quiet=True)  # counts/s^2
        self._cmd("DC 200000,200000", quiet=True)  # counts/s^2
        self._cmd("SP 60000,60000",   quiet=True)  # counts/s

        print("[OK] Motion parameters set." + (" (DP 0,0 applied)" if self._assume_zero else ""))

    def _setup_motion(self):
        """Init: stop, zero (optional), servo on, params (classic syntax)."""
        self._set_params()

    def _sync_from_controller(self):
        """Read TP counts and convert to our deg state."""
        cx, cy = 0.0, 0.0
        try:
            cx = float(self._cmd("TP X", quiet=True).strip())
        except Exception:
            pass
        try:
            cy = float(self._cmd("TP Y", quiet=True).strip())
        except Exception:
            pass
        self.curr_az = cx / self.cnt_az
        self.curr_el = cy / self.cnt_el
        self.curr_pos = [self.curr_az, self.curr_el, 0.0]
        print(f"[SYNC] AZ={self.curr_az:.3f}°, EL_gimbal={self.curr_el:.3f}°")

    def close(self):
        if not self.sim:
            try:
                self._cmd("ST", quiet=True)
            except Exception:
                pass
            # Back out of PT to leave controller in classic state
            if self.streaming:
                self._exit_pt()
            try:
                self.g.GClose()
            except Exception:
                pass
        _safe_save_pos(self.curr_pos)
        print("[CLOSED] Gimbal safely disconnected.")

    # -------------- waits / health --------------
    def _wait_inpos(self, timeout_s=2.0):
        """
        Non-blocking in-position wait using _TNX/_TNY (>=1 means in position).
        Avoids AMX/AMY which have caused timeouts on some units.
        """
        t0 = time.time()
        while time.time() - t0 < timeout_s:
            try:
                tn = self._cmd("MG _TNX,_TNY", quiet=True).strip().split()
                if len(tn) >= 2 and all(float(x) >= 1 for x in tn[:2]):
                    return True
            except Exception:
                pass
            time.sleep(0.02)
        return False

    def _wait_settle_counts(self, tgt_x, tgt_y, timeout_s=2.0, tol_counts=30):
        """
        PT settle helper: poll TP until within tolerance of target counts.
        """
        t0 = time.time()
        while time.time() - t0 < timeout_s:
            try:
                cx = float(self._cmd("TP X", quiet=True).strip())
                cy = float(self._cmd("TP Y", quiet=True).strip())
                if abs(cx - tgt_x) <= tol_counts and abs(cy - tgt_y) <= tol_counts:
                    return True
            except Exception:
                pass
            time.sleep(0.02)
        return False

    # -------------- motion helpers --------------
    def _send_relative_counts(self, dcnt_x, dcnt_y, wait=True):
        """Send PR using classic comma syntax; then BG XY."""
        mv_x = abs(dcnt_x) >= MIN_PR_COUNTS
        mv_y = abs(dcnt_y) >= MIN_PR_COUNTS
        if not (mv_x or mv_y):
            return False

        # Classic comma form: zeros for non-moving axes
        pr_x = int(dcnt_x) if mv_x else 0
        pr_y = int(dcnt_y) if mv_y else 0

        # If currently in PT, temporarily exit to allow BG motion
        was_pt = self.streaming
        if was_pt:
            self._exit_pt()

        self._cmd(f"PR {pr_x},{pr_y}", quiet=True)
        self._cmd("BG XY", quiet=True)

        if wait:
            self._wait_inpos(timeout_s=2.0)

        # Re-enter PT if we were streaming before
        if was_pt:
            self._enter_pt()

        return True

    def _send_absolute_counts(self, tgt_x, tgt_y, wait=True):
        """
        Absolute move.
        - In streaming/PT mode: issue PA only (no BG); optionally wait via settle loop.
        - In classic mode: PA + BG, then _TN waits.
        """
        if self.streaming:
            # Drop tiny updates to avoid saturating the bus with negligible changes
            try:
                cx = float(self._cmd("TP X", quiet=True).strip())
                cy = float(self._cmd("TP Y", quiet=True).strip())
            except Exception:
                cx, cy = None, None

            if cx is not None and cy is not None:
                if abs(tgt_x - cx) < MIN_PA_DELTA and abs(tgt_y - cy) < MIN_PA_DELTA:
                    return False

            self._cmd(f"PA {int(tgt_x)},{int(tgt_y)}", quiet=True)
            if wait:
                self._wait_settle_counts(tgt_x, tgt_y, timeout_s=2.0)
            return True

        # Classic (non-PT)
        self._cmd(f"PA {int(tgt_x)},{int(tgt_y)}", quiet=True)
        self._cmd("BG XY", quiet=True)
        if wait:
            self._wait_inpos(timeout_s=2.0)
        return True

    # -------------- public movement API --------------
    def move_absolute(self, az_deg, el_sky_deg, eps_deg=1e-6, wait=True):
        """
        Move to absolute AZ / EL_sky.
        NOTE: If you feed sky elevation here, internal convention assumes:
              EL_gimbal = EL_sky - 90°
        """
        target_az = _clip(float(az_deg), AZ_MIN, AZ_MAX)
        target_el = _clip(float(el_sky_deg) - 90.0, EL_MIN, EL_MAX)

        if self.sim:
            self.curr_az, self.curr_el = target_az, target_el
            self.curr_pos = [self.curr_az, self.curr_el, 0.0]
            return

        curr_cnt_x = int(round(self.curr_az * self.cnt_az))
        curr_cnt_y = int(round(self.curr_el * self.cnt_el))
        tgt_cnt_x  = int(round(target_az * self.cnt_az))
        tgt_cnt_y  = int(round(target_el * self.cnt_el))

        if (tgt_cnt_x == curr_cnt_x) and (tgt_cnt_y == curr_cnt_y):
            return

        # Use classic PA+BG for one-shot absolute moves (exit PT temporarily if needed)
        was_pt = self.streaming
        if was_pt:
            self._exit_pt()
        self._send_absolute_counts(tgt_cnt_x, tgt_cnt_y, wait=wait)  # classic branch inside
        if was_pt:
            self._enter_pt()

        self.curr_az = target_az
        self.curr_el = target_el
        self.curr_pos = [self.curr_az, self.curr_el, 0.0]

    def move_relative(self, d_az, d_el_sky, wait=True):
        """
        Relative movement (ΔAz_deg, ΔEl_sky_deg). Limits enforced.
        """
        new_az = _clip(self.curr_az + float(d_az), AZ_MIN, AZ_MAX)
        new_el = _clip(self.curr_el + float(d_el_sky), EL_MIN, EL_MAX)

        if self.sim:
            self.curr_az, self.curr_el = new_az, new_el
            self.curr_pos = [new_az, new_el, 0.0]
            return

        dcnt_x = int(round((new_az - self.curr_az) * self.cnt_az))
        dcnt_y = int(round((new_el - self.curr_el) * self.cnt_el))

        if self._send_relative_counts(dcnt_x, dcnt_y, wait=wait):
            self.curr_az, self.curr_el = new_az, new_el
            self.curr_pos = [new_az, new_el, 0.0]

    def degSteer(self, az_gim, el_gim, absolute=True, wait=False, eps_deg=1e-6):
        """
        Steer using *gimbal-frame* degrees (AZ: -90..+90, EL: -90..+90).
        absolute=True  → target absolute (recommended for streaming/PT)
        absolute=False → relative (classic PR path; will temporarily exit PT)
        """
        target_az = _clip(float(az_gim), AZ_MIN, AZ_MAX)
        target_el = _clip(float(el_gim), EL_MIN, EL_MAX)

        if self.sim:
            if absolute:
                self.curr_az, self.curr_el = target_az, target_el
            else:
                self.curr_az = _clip(self.curr_az + target_az, AZ_MIN, AZ_MAX)
                self.curr_el = _clip(self.curr_el + target_el, EL_MIN, EL_MAX)
            self.curr_pos = [self.curr_az, self.curr_el, 0.0]
            return

        if absolute:
            curr_cnt_x = int(round(self.curr_az * self.cnt_az))
            curr_cnt_y = int(round(self.curr_el * self.cnt_el))
            tgt_cnt_x  = int(round(target_az * self.cnt_az))
            tgt_cnt_y  = int(round(target_el * self.cnt_el))

            if (tgt_cnt_x == curr_cnt_x) and (tgt_cnt_y == curr_cnt_y):
                return

            # Streaming path: PA only (PT), no BG -> prevents TC:7 during frequent updates
            self._send_absolute_counts(tgt_cnt_x, tgt_cnt_y, wait=wait)

            self.curr_az, self.curr_el = target_az, target_el

        else:
            # Relative steering: temporarily exit PT to PR+BG, then re-enter PT
            dcnt_x = int(round(target_az * self.cnt_az))
            dcnt_y = int(round(target_el * self.cnt_el))

            was_pt = self.streaming
            if was_pt:
                self._exit_pt()

            if self._send_relative_counts(dcnt_x, dcnt_y, wait=wait):
                self.curr_az = _clip(self.curr_az + (dcnt_x / self.cnt_az), AZ_MIN, AZ_MAX)
                self.curr_el = _clip(self.curr_el + (dcnt_y / self.cnt_el), EL_MIN, EL_MAX)

            if was_pt:
                self._enter_pt()

        self.curr_pos = [self.curr_az, self.curr_el, 0.0]

    # ---------------- UTILITY ----------------
    def stop(self):
        if not self.sim:
            try:
                self._cmd("ST", quiet=True)
            except Exception:
                pass
        print("[STOP] Motion halted.")

    def go_home(self, wait=True):
        """Return to (AZ=0, EL=0 -> Up)."""
        self.move_absolute(0.0, 90.0, wait=wait)

    def wait(self, t=0.1):
        time.sleep(t)


Copyright © 2025-2026 Galassia-5 Satellite Programme