Source code for z21aio.loco

"""Locomotive control.

Provides the Loco class for controlling DCC locomotives via Z21.
"""

from __future__ import annotations

from collections.abc import Callable
import contextlib
import logging
from typing import TYPE_CHECKING

from z21aio.packet import Packet

from .messages import XBUS_LOCO_INFO, XBusMessage
from .types import DccThrottleSteps, FunctionAction, LocoState, RailComData

if TYPE_CHECKING:
    from .station import Z21Station

log = logging.getLogger(__name__)


def _calc_speed_byte(
    steps: DccThrottleSteps, speed_percent: float, reverse: bool = False
) -> int:
    """Calculate speed byte from percentage and throttle steps.

    Args:
        steps: Throttle step mode
        speed_percent: Speed as percentage (0 to 100)
        reverse: Direction, False = forward, True = reverse

    Returns:
        Speed byte with direction bit (bit 7)
    """
    # Clamp speed to valid range
    speed_percent = max(0.0, min(100.0, speed_percent))

    # Map percentage to throttle steps
    max_speed = steps.max_speed
    speed_value = int((speed_percent / 100.0) * max_speed)

    # Clamp to valid range (0-127 for step values)
    speed_value = min(speed_value, 127)

    return _combine_speed_direction(speed_value, reverse)


def _combine_speed_direction(speed_value: int, reverse: bool) -> int:
    """Combine speed value with direction bit."""
    if reverse:
        return speed_value
    return speed_value | 0x80


[docs] class Loco: """Locomotive controller. Provides methods for controlling a single DCC locomotive including speed, direction, and function control. Example: loco = await Loco.control(station, address=3) await loco.set_headlights(True) await loco.drive(50.0) # 50% forward await loco.drive(50.0, reverse=True) # 50% reverse await loco.stop() # Normal stop (decelerate) await loco.estop() # Emergency stop (immediate) """
[docs] def __init__( self, station: Z21Station, address: int, steps: DccThrottleSteps = DccThrottleSteps.STEPS_128, ) -> None: """Initialize locomotive controller. Use Loco.control() class method for proper initialization. Args: station: Z21Station instance address: DCC locomotive address (1-9999) steps: Throttle step mode (default 128 steps) """ self._station = station self._address = address self._steps = steps self._railcom: RailComData | None = None
[docs] @classmethod async def control( cls, station: Z21Station, address: int, steps: DccThrottleSteps = DccThrottleSteps.STEPS_128, ) -> Loco: """Get control of a locomotive. Args: station: Z21Station instance address: DCC locomotive address (1-9999) steps: Throttle step mode (default 128 steps) Returns: Loco instance ready for control """ loco = cls(station, address, steps) # Request initial state to "register" with the station # It's OK if we don't get a response - loco may not be on track with contextlib.suppress(TimeoutError): await loco.get_state() return loco
@property def address(self) -> int: """DCC address of this locomotive.""" return self._address @property def steps(self) -> DccThrottleSteps: """Throttle step mode for this locomotive.""" return self._steps
[docs] async def drive(self, speed_percent: float, reverse: bool = False) -> None: """Set locomotive speed and direction. Args: speed_percent: Speed as percentage (0 to 100) reverse: Direction, False = forward, True = reverse """ speed_byte = _calc_speed_byte(self._steps, speed_percent, reverse) msg = XBusMessage.loco_drive(self._address, self._steps, speed_byte) await self._station.send_xbus_command(msg)
[docs] async def stop(self, reverse: bool = False) -> None: """Normal stop with braking curve. The locomotive will decelerate according to its decoder settings. """ speed_byte = _combine_speed_direction(0x00, reverse) msg = XBusMessage.loco_drive(self._address, self._steps, speed_byte) await self._station.send_xbus_command(msg)
[docs] async def estop(self, reverse: bool = False) -> None: """Emergency stop (immediate halt). The locomotive will stop immediately without deceleration. """ speed_byte = _combine_speed_direction(0x01, reverse) msg = XBusMessage.loco_drive(self._address, self._steps, speed_byte) await self._station.send_xbus_command(msg)
[docs] async def set_function(self, index: int, action: FunctionAction) -> None: """Set a locomotive function. Args: index: Function number (0-31) action: Action to perform (OFF, ON, TOGGLE) Raises: ValueError: If index is not 0-31 """ if not 0 <= index <= 31: raise ValueError(f"Function index must be 0-31, got {index}") msg = XBusMessage.loco_function(self._address, index, action) await self._station.send_xbus_command(msg)
[docs] async def function_on(self, index: int) -> None: """Turn on a locomotive function. Args: index: Function number (0-31) """ await self.set_function(index, FunctionAction.ON)
[docs] async def function_off(self, index: int) -> None: """Turn off a locomotive function. Args: index: Function number (0-31) """ await self.set_function(index, FunctionAction.OFF)
[docs] async def function_toggle(self, index: int) -> None: """Toggle a locomotive function. Args: index: Function number (0-31) """ await self.set_function(index, FunctionAction.TOGGLE)
[docs] async def set_headlights(self, on: bool) -> None: """Turn headlights on or off (F0). Args: on: True to turn on, False to turn off """ action = FunctionAction.ON if on else FunctionAction.OFF await self.set_function(0, action)
[docs] async def get_state(self) -> LocoState: """Get current locomotive state. Returns: LocoState with current speed, direction, and function states Raises: asyncio.TimeoutError: If no response received """ msg = XBusMessage.loco_get_info(self._address) response = await self._station.send_xbus_command(msg, XBUS_LOCO_INFO) if response is None: raise RuntimeError("No response received") state = LocoState.from_bytes(response.dbs) log.debug("Got loco state %s", state) return state
[docs] def subscribe_state( self, callback: Callable[[LocoState], None], ) -> None: """Subscribe to locomotive state updates. The callback will be called whenever the station broadcasts an update for this locomotive's address. Args: callback: Function called with LocoState on each update """ def handle_packet(packet: Packet) -> None: try: xbus_msg = XBusMessage.from_bytes(packet.data) if xbus_msg.x_header == XBUS_LOCO_INFO: state = LocoState.from_bytes(xbus_msg.dbs) if state.address == self._address: callback(state) except (ValueError, TypeError) as e: log.error("Got exception handling packet in loco state callback %s", e) if XBUS_LOCO_INFO not in self._station._subscribers: self._station._subscribers[XBUS_LOCO_INFO] = [] self._station._subscribers[XBUS_LOCO_INFO].append(handle_packet)
@property def railcom(self) -> RailComData | None: """Current RailCom data for this locomotive. Returns None if no RailCom subscription is active or no data received. Subscribe with subscribe_railcom() to receive updates. """ return self._railcom
[docs] async def get_railcom_data(self, timeout: float | None = None) -> RailComData: """Request RailCom data for this locomotive. Args: timeout: Response timeout in seconds (uses station default if None) Returns: RailComData for this locomotive Raises: asyncio.TimeoutError: If no response within timeout Note: Requires firmware 1.29+ and RailCom-capable decoder """ return await self._station.get_railcom_data(self._address, timeout)
[docs] def subscribe_railcom( self, callback: Callable[[RailComData], None] | None = None, ) -> None: """Subscribe to RailCom data updates for this locomotive. Updates the railcom property and optionally calls a callback. Args: callback: Optional function called with RailComData on each update. If None, only updates the railcom property. Note: Requires enabling RailCom broadcasts on the station first. """ def handle_railcom(railcom_data: RailComData) -> None: self._railcom = railcom_data if callback is not None: callback(railcom_data) self._station.subscribe_railcom(handle_railcom, self._address)
def __repr__(self) -> str: return f"Loco(address={self._address}, steps={self._steps.name})"