API Reference

Core Classes

Z21Station

class z21aio.Z21Station[source]

Bases: object

Z21 DCC command station controller.

Provides async methods for communicating with a Z21 station over UDP. Supports multiple simultaneous connections to different stations.

Example

async with await Z21Station.connect(“192.168.0.111”) as station:

await station.voltage_on() serial = await station.get_serial_number() print(f”Serial: {serial}”)

async classmethod connect(host, port=21105, timeout=2.0, *, keep_alive=True)[source]

Connect to a Z21 station.

Parameters:
  • host (str) – IP address of the Z21 station

  • port (int) – UDP port (default 21105)

  • timeout (float) – Command timeout in seconds (default 2.0)

  • keep_alive (bool) – Start the 20-second keep-alive loop automatically (default True).

Returns:

Connected Z21Station instance

Return type:

Z21Station

start_keep_alive()[source]

Start the keep-alive background task if not already running.

Return type:

None

async stop_keep_alive()[source]

Cancel the keep-alive background task if running.

Return type:

None

async send_packet(packet)[source]

Send a packet to the Z21 station.

Parameters:

packet (Packet) – Packet to send

Return type:

None

async receive_packet(header, timeout=None)[source]

Wait for a packet with the specified header.

Parameters:
  • header (int) – Expected packet header

  • timeout (float | None) – Timeout in seconds (uses default if None)

Returns:

Received packet

Raises:
  • asynci

  • o.TimeoutError – If no packet received within timeout

Return type:

Packet

async send_xbus_command(msg, expected_response_header=None)[source]

Send an XBus command and optionally wait for response.

Parameters:
  • msg (XBusMessage) – XBus message to send

  • expected_response_header (int | None) – XBus header to wait for (None = no wait)

Returns:

Response XBusMessage if expected_response_header specified, else None

Return type:

XBusMessage | None

async get_serial_number()[source]

Get the Z21 station serial number.

Returns:

Station serial number as integer

Return type:

int

async discover_devices()[source]

Get the Z21 station devices.

Returns:

None

Return type:

None

async get_firmware_version()[source]

Get the Z21 station firmware version.

Returns:

Tuple of (major, minor) version in BCD format. For example, (1, 30) represents firmware version 1.30

Return type:

tuple[int, int]

async get_version()[source]

Get the X-BUS protocol version and command station ID.

Returns:

Tuple of (xbus_version, command_station_id). xbus_version is in BCD format (e.g., 0x36 = version 3.6). command_station_id identifies the type of command station.

Return type:

tuple[int, int]

async voltage_on()[source]

Turn on track power.

Return type:

None

async voltage_off()[source]

Turn off track power (emergency stop all locomotives).

Return type:

None

async logout()[source]

Send logout/disconnect command to Z21.

Return type:

None

subscribe_system_state(callback, freq_hz=1.0)[source]

Subscribe to system state updates.

Parameters:
  • callback (Callable[[SystemState], None]) – Function called with SystemState on each update

  • freq_hz (float) – Polling frequency in Hz (default 1.0)

Returns:

Background task handle (can be cancelled)

Return type:

Task[None]

subscribe_track_power(callback)[source]

Subscribe to track power state change broadcasts.

The callback is called whenever the track power state changes, whether triggered by this client or an external device (e.g., multiMaus).

Requires broadcast flag 0x00000001, which is set by default.

Parameters:

callback (Callable[[bool], None]) – Called with True when track power turns on, False when it turns off.

Return type:

None

async enable_railcom_broadcasts(all_locos=False)[source]

Enable RailCom data broadcasts.

Parameters:

all_locos (bool) – If True, receive RailCom data for all locos. If False (default), only receive data for subscribed locos.

Return type:

None

Note

all_locos=True requires firmware 1.29+

async disable_railcom_broadcasts()[source]

Disable RailCom data broadcasts.

Return type:

None

async get_railcom_data(address=None, timeout=None)[source]

Request RailCom data for a specific locomotive or next in queue.

Parameters:
  • address (int | None) – DCC address to query, or None for circular polling

  • timeout (float | None) – Response timeout in seconds (uses default if None)

Returns:

RailComData for the queried locomotive

Raises:

asyncio.TimeoutError – If no response within timeout

Return type:

RailComData

Note

Requires firmware 1.29+

subscribe_railcom(callback, address=None)[source]

Subscribe to RailCom data broadcasts.

Parameters:
  • callback (Callable[[RailComData], None]) – Function called with RailComData on each update

  • address (int | None) – If specified, filter for this address only. If None, receive all RailCom broadcasts.

Return type:

None

Note

Call enable_railcom_broadcasts() first to receive broadcasts.

subscribe_railcom_polled(callback, address=None, freq_hz=1.0)[source]

Subscribe to RailCom data via polling.

Polls the Z21 at the specified frequency for RailCom data. Useful when broadcast flags cannot be changed or for specific addresses.

Parameters:
  • callback (Callable[[RailComData], None]) – Function called with RailComData on each poll

  • address (int | None) – Specific address to poll, or None for circular polling

  • freq_hz (float) – Polling frequency in Hz (default 1.0)

Returns:

Background task handle (can be cancelled)

Return type:

Task[None]

subscribe_loco_state(callback)[source]

Subscribe to locomotive state updates from all locomotives.

The callback will be called whenever the station broadcasts a state update for any locomotive. The LocoState object includes the locomotive address, speed, functions, and other state.

Parameters:

callback (Callable[[LocoState], None]) – Function called with LocoState for each update. Receives updates for ALL locomotives.

Return type:

None

Example

def on_any_loco_state(state: LocoState):

print(f”Loco {state.address}: speed={state.speed_percentage}%”)

station.subscribe_loco_state(on_any_loco_state)

subscribe_turnout_state(callback)[source]

Subscribe to turnout state updates from all turnouts.

The callback will be called whenever the station broadcasts a state update for any turnout.

Parameters:

callback (Callable[[TurnoutState], None]) – Function called with TurnoutState for each update. Receives updates for ALL turnouts.

Return type:

None

Example

def on_any_turnout_state(state: TurnoutState):

print(f”Turnout {state.address}: position={state.position.name}”)

station.subscribe_turnout_state(on_any_turnout_state)

async close()[source]

Close the connection and clean up resources.

Stops keep-alive task, sends logout, and closes transport.

Return type:

None

async __aenter__()[source]

Return the station instance for async context managers.

Return type:

Self

async __aexit__(exc_type, exc_val, exc_tb)[source]

Close the station when leaving an async context.

Parameters:
Return type:

None

__repr__()[source]

Return a string representation of the station.

Return type:

str

Loco

class z21aio.Loco(station, address, steps=DccThrottleSteps.STEPS_128)[source]

Bases: object

Locomotive controller.

Provides methods for controlling a single DCC locomotive including speed, direction, and function control.

Example

loco = await Loco.control(station, address=3) await loco.set_headlights(True) await loco.drive(50.0) # 50% forward await loco.drive(50.0, reverse=True) # 50% reverse await loco.stop() # Normal stop (decelerate) await loco.estop() # Emergency stop (immediate)

Parameters:
__init__(station, address, steps=DccThrottleSteps.STEPS_128)[source]

Initialize locomotive controller.

Use Loco.control() class method for proper initialization.

Parameters:
  • station (Z21Station) – Z21Station instance

  • address (int) – DCC locomotive address (1-9999)

  • steps (DccThrottleSteps) – Throttle step mode (default 128 steps)

Return type:

None

async classmethod control(station, address, steps=DccThrottleSteps.STEPS_128)[source]

Get control of a locomotive.

Parameters:
  • station (Z21Station) – Z21Station instance

  • address (int) – DCC locomotive address (1-9999)

  • steps (DccThrottleSteps) – Throttle step mode (default 128 steps)

Returns:

Loco instance ready for control

Return type:

Loco

property address: int

DCC address of this locomotive.

property steps: DccThrottleSteps

Throttle step mode for this locomotive.

async drive(speed_percent, reverse=False)[source]

Set locomotive speed and direction.

Parameters:
  • speed_percent (float) – Speed as percentage (0 to 100)

  • reverse (bool) – Direction, False = forward, True = reverse

Return type:

None

async stop(reverse=False)[source]

Normal stop with braking curve.

The locomotive will decelerate according to its decoder settings.

Parameters:

reverse (bool)

Return type:

None

async estop(reverse=False)[source]

Emergency stop (immediate halt).

The locomotive will stop immediately without deceleration.

Parameters:

reverse (bool)

Return type:

None

async set_function(index, action)[source]

Set a locomotive function.

Parameters:
  • index (int) – Function number (0-31)

  • action (FunctionAction) – Action to perform (OFF, ON, TOGGLE)

Raises:

ValueError – If index is not 0-31

Return type:

None

async function_on(index)[source]

Turn on a locomotive function.

Parameters:

index (int) – Function number (0-31)

Return type:

None

async function_off(index)[source]

Turn off a locomotive function.

Parameters:

index (int) – Function number (0-31)

Return type:

None

async function_toggle(index)[source]

Toggle a locomotive function.

Parameters:

index (int) – Function number (0-31)

Return type:

None

async set_headlights(on)[source]

Turn headlights on or off (F0).

Parameters:

on (bool) – True to turn on, False to turn off

Return type:

None

async get_state()[source]

Get current locomotive state.

Returns:

LocoState with current speed, direction, and function states

Raises:

asyncio.TimeoutError – If no response received

Return type:

LocoState

subscribe_state(callback)[source]

Subscribe to locomotive state updates.

The callback will be called whenever the station broadcasts an update for this locomotive’s address.

Parameters:

callback (Callable[[LocoState], None]) – Function called with LocoState on each update

Return type:

None

property railcom: RailComData | None

Current RailCom data for this locomotive.

Returns None if no RailCom subscription is active or no data received. Subscribe with subscribe_railcom() to receive updates.

async get_railcom_data(timeout=None)[source]

Request RailCom data for this locomotive.

Parameters:

timeout (float | None) – Response timeout in seconds (uses station default if None)

Returns:

RailComData for this locomotive

Raises:

asyncio.TimeoutError – If no response within timeout

Return type:

RailComData

Note

Requires firmware 1.29+ and RailCom-capable decoder

subscribe_railcom(callback=None)[source]

Subscribe to RailCom data updates for this locomotive.

Updates the railcom property and optionally calls a callback.

Parameters:

callback (Callable[[RailComData], None] | None) – Optional function called with RailComData on each update. If None, only updates the railcom property.

Return type:

None

Note

Requires enabling RailCom broadcasts on the station first.

Message Handling

Packet

class z21aio.Packet(header, data=b'')[source]

Bases: object

Z21 LAN packet.

The packet format is: - 2 bytes: Total length (little-endian), includes header - 2 bytes: Message header/type (little-endian) - N bytes: Payload data

Parameters:
header

16-bit message type identifier

Type:

int

data

Variable-length payload bytes

Type:

bytes

header: int
data: bytes = b''
property data_len: int

Total packet length including 4-byte header.

to_bytes()[source]

Serialize packet to bytes.

Returns:

Bytes in Z21 LAN packet format

Return type:

bytes

classmethod from_bytes(data)[source]

Deserialize packet from bytes.

Parameters:

data (bytes) – Raw packet bytes

Returns:

Packet instance

Raises:

ValueError – If data is too short (< 4 bytes)

Return type:

Packet

classmethod with_header(header)[source]

Create a packet with just a header (no data).

Parameters:

header (int) – Message header/type

Returns:

Packet with empty data

Return type:

Packet

classmethod with_header_and_data(header, data)[source]

Create a packet with header and data.

Parameters:
  • header (int) – Message header/type

  • data (bytes) – Payload bytes

Returns:

Packet with specified header and data

Return type:

Packet

__init__(header, data=b'')
Parameters:
Return type:

None

XBusMessage

class z21aio.XBusMessage(x_header, dbs=b'')[source]

Bases: object

XBus protocol message.

XBus messages are encapsulated within LAN packets with header 0x40. Format: [x_header][data_bytes…][xor_checksum]

The XOR checksum is calculated as XOR of x_header and all data bytes.

Parameters:
x_header

Command type byte

Type:

int

dbs

Data bytes (variable length)

Type:

bytes

x_header: int
dbs: bytes = b''
property xor: int

Calculate XOR checksum of x_header and all data bytes.

to_bytes()[source]

Serialize to bytes with XOR checksum.

Returns:

[x_header][data_bytes][xor]

Return type:

Bytes

classmethod from_bytes(data)[source]

Parse XBus message from bytes with XOR validation.

Parameters:

data (bytes) – Raw XBus message bytes

Returns:

XBusMessage instance

Raises:

ValueError – If XOR checksum is invalid

Return type:

XBusMessage

classmethod get_firmware_version()[source]

Create command to request firmware version.

Return type:

XBusMessage

classmethod get_version()[source]

Create command to request X-BUS version and command station ID.

Return type:

XBusMessage

classmethod track_power_on()[source]

Create command to turn on track power.

Return type:

XBusMessage

classmethod track_power_off()[source]

Create command to turn off track power (emergency stop).

Return type:

XBusMessage

classmethod loco_get_info(address)[source]

Create command to request locomotive state.

Parameters:

address (int) – DCC locomotive address (1-9999)

Returns:

XBusMessage for getting locomotive info

Return type:

XBusMessage

classmethod loco_drive(address, steps, speed_byte)[source]

Create command to drive a locomotive.

Parameters:
  • address (int) – DCC locomotive address (1-9999)

  • steps (DccThrottleSteps) – Throttle step mode (14/28/128)

  • speed_byte (int) – Speed value with direction bit 7 (0x00 = stop, 0x01 = emergency stop)

Returns:

XBusMessage for driving locomotive

Return type:

XBusMessage

classmethod loco_function(address, function, action)[source]

Create command to control a locomotive function.

Parameters:
  • address (int) – DCC locomotive address (1-9999)

  • function (int) – Function number (0-31)

  • action (FunctionAction) – Action to perform (OFF, ON, TOGGLE)

Returns:

XBusMessage for controlling locomotive function

Raises:

ValueError – If function is not 0-31

Return type:

XBusMessage

classmethod get_turnout_info(address)[source]

Create command to request turnout state.

Parameters:

address (int) – Turnout function address (0-2047)

Returns:

XBusMessage for getting turnout info

Return type:

XBusMessage

classmethod set_turnout(address, output, activate, queue_mode=True)[source]

Create command to switch a turnout output.

Parameters:
  • address (int) – Turnout function address (0-2047)

  • output (int) – Output number (0 or 1)

  • activate (bool) – True to activate, False to deactivate

  • queue_mode (bool) – True for queue mode (Z21 handles timing), False for immediate (client handles timing)

Returns:

XBusMessage for switching turnout

Raises:

ValueError – If output is not 0 or 1

Return type:

XBusMessage

__init__(x_header, dbs=b'')
Parameters:
Return type:

None

Data Types

class z21aio.SystemState(main_current, prog_current, filtered_main_current, temperature, supply_voltage, vcc_voltage, central_state, central_state_ex, reserved, capabilities)[source]

Bases: object

Z21 system state (16 bytes).

Contains information about the command station’s current state including current, voltage, and temperature readings.

Parameters:
  • main_current (int)

  • prog_current (int)

  • filtered_main_current (int)

  • temperature (int)

  • supply_voltage (int)

  • vcc_voltage (int)

  • central_state (int)

  • central_state_ex (int)

  • reserved (int)

  • capabilities (int)

main_current: int
prog_current: int
filtered_main_current: int
temperature: int
supply_voltage: int
vcc_voltage: int
central_state: int
central_state_ex: int
reserved: int
capabilities: int
classmethod from_bytes(data)[source]

Parse SystemState from 16 bytes of data.

Parameters:

data (bytes) – 16 bytes of system state data

Returns:

SystemState instance

Raises:

ValueError – If data length is not 16 bytes

Return type:

SystemState

property is_track_voltage_off: bool

Check if track voltage is off.

property is_short_circuit: bool

Check if short circuit detected.

property is_programming_mode: bool

Check if in programming mode.

__init__(main_current, prog_current, filtered_main_current, temperature, supply_voltage, vcc_voltage, central_state, central_state_ex, reserved, capabilities)
Parameters:
  • main_current (int)

  • prog_current (int)

  • filtered_main_current (int)

  • temperature (int)

  • supply_voltage (int)

  • vcc_voltage (int)

  • central_state (int)

  • central_state_ex (int)

  • reserved (int)

  • capabilities (int)

Return type:

None

class z21aio.LocoState(address, is_busy=None, stepping=None, speed_percentage=None, speed_value=None, reverse=None, double_traction=None, smart_search=None, functions=None)[source]

Bases: object

Locomotive state (variable length, 2-9 bytes).

Contains information about a locomotive’s current state including speed, direction, and function states.

Parameters:
  • address (int)

  • is_busy (bool | None)

  • stepping (DccThrottleSteps | None)

  • speed_percentage (float | None)

  • speed_value (int | None)

  • reverse (bool | None)

  • double_traction (bool | None)

  • smart_search (bool | None)

  • functions (list[bool] | None)

address: int
is_busy: bool | None = None
stepping: DccThrottleSteps | None = None
speed_percentage: float | None = None
speed_value: int | None = None
reverse: bool | None = None
double_traction: bool | None = None
functions: list[bool] | None = None
property is_estop: bool

Return True if this state represents an emergency stop.

In the Z21 protocol, emergency stop is encoded as speed value 1, regardless of throttle step mode.

classmethod from_bytes(data)[source]

Parse LocoState from variable-length data.

Parameters:

data (bytes) – 2-9 bytes of locomotive state data

Returns:

LocoState instance

Raises:

ValueError – If data length is less than 2 bytes

Return type:

LocoState

__init__(address, is_busy=None, stepping=None, speed_percentage=None, speed_value=None, reverse=None, double_traction=None, smart_search=None, functions=None)
Parameters:
  • address (int)

  • is_busy (bool | None)

  • stepping (DccThrottleSteps | None)

  • speed_percentage (float | None)

  • speed_value (int | None)

  • reverse (bool | None)

  • double_traction (bool | None)

  • smart_search (bool | None)

  • functions (list[bool] | None)

Return type:

None

class z21aio.RailComData(loco_address, receive_counter, error_counter, options, speed, qos)[source]

Bases: object

RailCom feedback data (13 bytes).

Contains RailCom data broadcast from the Z21 command station, providing real-time feedback from RailCom-equipped decoders.

Parameters:
loco_address

Detected decoder address

Type:

int

receive_counter

Number of valid RailCom messages received

Type:

int

error_counter

Number of RailCom reception errors

Type:

int

options

Option flags (speed type, QoS validity)

Type:

z21aio.types.RailComOptions

speed

Current speed value (interpretation depends on options)

Type:

int

qos

Quality of Service value (0-255, higher is better)

Type:

int

loco_address: int
receive_counter: int
error_counter: int
options: RailComOptions
speed: int
qos: int
classmethod from_bytes(data)[source]

Parse RailComData from 13 bytes of data.

Parameters:

data (bytes) – 13 bytes of RailCom data

Returns:

RailComData instance

Raises:

ValueError – If data length is not 13 bytes

Return type:

RailComData

property has_speed1: bool

Check if speed field contains Speed 1 value.

property has_speed2: bool

Check if speed field contains Speed 2 value.

property has_qos: bool

Check if QoS value is valid.

property error_rate: float

Calculate error rate as percentage.

Returns:

Error rate (0.0 to 100.0), or 0.0 if no messages received

__init__(loco_address, receive_counter, error_counter, options, speed, qos)
Parameters:
Return type:

None

class z21aio.RailComOptions(*values)[source]

Bases: IntFlag

RailCom option flags from the Options byte.

NONE = 0
SPEED1 = 1
SPEED2 = 2
QOS = 4
class z21aio.DccThrottleSteps(*values)[source]

Bases: IntEnum

DCC throttle step modes.

STEPS_14 = 0
STEPS_28 = 2
STEPS_128 = 4
classmethod from_byte(value)[source]

Convert byte value to DccThrottleSteps.

Parameters:

value (int)

Return type:

DccThrottleSteps

to_speed_byte()[source]

Convert to speed command byte prefix.

Return type:

int

property max_speed: int

Maximum speed value for this throttle mode.

class z21aio.FunctionAction(*values)[source]

Bases: IntEnum

Action to perform on a locomotive function.

OFF = 0
ON = 1
TOGGLE = 2

Low-Level Details

Constants and Protocol Headers

Protocol constants, message headers, and header names are defined in the low-level modules.

The z21aio.headers module contains: - LAN packet header constants - X-Bus message header constants - Helper functions for header name lookup

The z21aio.messages module contains: - Broadcast flag constants - LAN command constants - XBusMessage class for low-level protocol messages