Skip to content

Hierarchical State Machine

This example shows how to create a hierarchical state machine. The machine is a traffic light with a pedestrian crossing. The pedestrian crossing has a countdown timer. When the timer reaches zero, the pedestrian crossing changes to red and the traffic light changes to green.

import asyncio
import random

from kiwi_cogs import Machine


def is_walking(context, _):
    return context["speed"] <= 11


def is_running(context, _):
    return context["speed"] > 11


walk_states = {
    "initial": "start",
    "states": {
        "start": {
            "transitions": [  # resolved in order
                {"target": "walking", "cond": is_walking},
                {"target": "running", "cond": is_running},
            ],
        },
        "walking": {"events": {"CROSSED": {"target": "crossed"}}},
        "running": {"events": {"CROSSED": {"target": "crossed"}}},
        "crossed": {},
    },
}


pedestrian_states = {
    "initial": "walk",
    "states": {
        "walk": {"events": {"PED_COUNTDOWN": {"target": "wait"}}, **walk_states},
        "wait": {"events": {"PED_COUNTDOWN": {"target": "stop"}}},
        "stop": {},
        "blinking": {},
    },
}


config = {
    "name": "light",
    "initial": "green",
    "context": {"speed": 10},
    "states": {
        "green": {"events": {"TIMER": {"target": "yellow"}}},
        "yellow": {"events": {"TIMER": {"target": "red"}}},
        "red": {"events": {"TIMER": {"target": "green"}}, **pedestrian_states},
    },
    "events": {
        "POWER_OUTAGE": {"target": ".red.blinking"},
        "POWER_RESTORED": {"target": ".red"},
    },
}


async def run():
    machine = await Machine.create(config)
    print(f"Light is: {machine.state.name}")
    while True:
        await machine.event("TIMER")
        print(f"Light is: {machine.state.name}")
        if machine.state.name == "red":
            value = machine.state.value["red"]
            print(f"Pedestrian: {value}")
            await machine.event("CROSSED")
            value = machine.state.value["red"]
            print(f"Pedestrian: {value}")
            await machine.event("PED_COUNTDOWN")
            value = machine.state.value["red"]
            print(f"Pedestrian: {value}")
            await machine.event("PED_COUNTDOWN")
            value = machine.state.value["red"]
            print(f"Pedestrian: {value}")

            # 10 percent chance of power outage
            if random.random() > 0.9:
                await machine.event("POWER_OUTAGE")
                print(f"Power outage! {machine.state.name}")
                print(f"Battery: {machine.state.value}")
                await asyncio.sleep(3)
                await machine.event("POWER_RESTORED")
            else:
                await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(run())

Example output:

Light is: green
Light is: yellow
Light is: red
Pedestrian: {'walk': 'walking'}
Pedestrian: {'walk': 'crossed'}
Pedestrian: wait
Pedestrian: stop
Light is: green
Light is: yellow
Light is: red
Pedestrian: {'walk': 'walking'}
Pedestrian: {'walk': 'crossed'}
Pedestrian: wait
Pedestrian: stop
Light is: green
Light is: yellow
Light is: red
Pedestrian: {'walk': 'walking'}
Pedestrian: {'walk': 'crossed'}
Pedestrian: wait
Pedestrian: stop
Light is: green
Light is: yellow
Light is: red
Pedestrian: {'walk': 'walking'}
Pedestrian: {'walk': 'crossed'}
Pedestrian: wait
Pedestrian: stop