feat(integration): Complete feature rewrite with pipeline architecture, effects system, and display improvements

Major changes:
- Pipeline architecture with capability-based dependency resolution
- Effects plugin system with performance monitoring
- Display abstraction with multiple backends (terminal, null, websocket)
- Camera system for viewport scrolling
- Sensor framework for real-time input
- Command-and-control system via ntfy
- WebSocket display backend for browser clients
- Comprehensive test suite and documentation

Issue #48: ADR for preset scripting language included

This commit consolidates 110 individual commits into a single
feature integration that can be reviewed and tested before
further refinement.
This commit is contained in:
2026-03-20 04:41:23 -07:00
parent 42aa6f16cc
commit ef98add0c5
179 changed files with 27649 additions and 6552 deletions

222
scripts/demo_hot_rebuild.py Normal file
View File

@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Demo script for testing pipeline hot-rebuild and state preservation.
Usage:
python scripts/demo_hot_rebuild.py
python scripts/demo_hot_rebuild.py --viewport 40x15
This script:
1. Creates a small viewport (40x15) for easier capture
2. Uses NullDisplay with recording enabled
3. Runs the pipeline for N frames (capturing initial state)
4. Triggers a "hot-rebuild" (e.g., toggling an effect stage)
5. Runs the pipeline for M more frames
6. Verifies state preservation by comparing frames before/after rebuild
7. Prints visual comparison to stdout
"""
import sys
import time
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.fetch import load_cache
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
EffectPluginStage,
FontStage,
SourceItemsToBufferStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.params import PipelineParams
def run_demo(viewport_width: int = 40, viewport_height: int = 15):
"""Run the hot-rebuild demo."""
print(f"\n{'=' * 60}")
print(f"Pipeline Hot-Rebuild Demo")
print(f"Viewport: {viewport_width}x{viewport_height}")
print(f"{'=' * 60}\n")
import engine.effects.plugins as effects_plugins
effects_plugins.discover_plugins()
print("[1/6] Loading source items...")
items = load_cache()
if not items:
print(" ERROR: No fixture cache available")
sys.exit(1)
print(f" Loaded {len(items)} items")
print("[2/6] Creating NullDisplay with recording...")
display = DisplayRegistry.create("null")
display.init(viewport_width, viewport_height)
display.start_recording()
print(" Recording started")
print("[3/6] Building pipeline...")
params = PipelineParams()
params.viewport_width = viewport_width
params.viewport_height = viewport_height
config = PipelineConfig(
source="fixture",
display="null",
camera="scroll",
effects=["noise", "fade"],
)
pipeline = Pipeline(config=config, context=PipelineContext())
from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import DataSourceStage
list_source = ListDataSource(items, name="fixture")
pipeline.add_stage("source", DataSourceStage(list_source, name="fixture"))
pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter"))
pipeline.add_stage("font", FontStage(name="font"))
effect_registry = get_registry()
for effect_name in config.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}",
create_stage_from_effect(effect, effect_name),
)
pipeline.add_stage("display", create_stage_from_display(display, "null"))
pipeline.build()
if not pipeline.initialize():
print(" ERROR: Failed to initialize pipeline")
sys.exit(1)
print(" Pipeline built and initialized")
ctx = pipeline.context
ctx.params = params
ctx.set("display", display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
ctx.set("pipeline_order", pipeline.execution_order)
ctx.set("camera_y", 0)
print("[4/6] Running pipeline for 10 frames (before rebuild)...")
frames_before = []
for frame in range(10):
params.frame_number = frame
ctx.params = params
result = pipeline.execute(items)
if result.success:
frames_before.append(display._last_buffer)
print(f" Captured {len(frames_before)} frames")
print("[5/6] Triggering hot-rebuild (toggling 'fade' effect)...")
fade_stage = pipeline.get_stage("effect_fade")
if fade_stage and isinstance(fade_stage, EffectPluginStage):
new_enabled = not fade_stage.is_enabled()
fade_stage.set_enabled(new_enabled)
fade_stage._effect.config.enabled = new_enabled
print(f" Fade effect enabled: {new_enabled}")
else:
print(" WARNING: Could not find fade effect stage")
print("[6/6] Running pipeline for 10 more frames (after rebuild)...")
frames_after = []
for frame in range(10, 20):
params.frame_number = frame
ctx.params = params
result = pipeline.execute(items)
if result.success:
frames_after.append(display._last_buffer)
print(f" Captured {len(frames_after)} frames")
display.stop_recording()
print("\n" + "=" * 60)
print("RESULTS")
print("=" * 60)
print("\n[State Preservation Check]")
if frames_before and frames_after:
last_before = frames_before[-1]
first_after = frames_after[0]
if last_before == first_after:
print(" PASS: Buffer state preserved across rebuild")
else:
print(" INFO: Buffer changed after rebuild (expected - effect toggled)")
print("\n[Frame Continuity Check]")
recorded_frames = display.get_frames()
print(f" Total recorded frames: {len(recorded_frames)}")
print(f" Frames before rebuild: {len(frames_before)}")
print(f" Frames after rebuild: {len(frames_after)}")
if len(recorded_frames) == 20:
print(" PASS: All frames recorded")
else:
print(" WARNING: Frame count mismatch")
print("\n[Visual Comparison - First frame before vs after rebuild]")
print("\n--- Before rebuild (frame 9) ---")
for i, line in enumerate(frames_before[0][:viewport_height]):
print(f"{i:2}: {line}")
print("\n--- After rebuild (frame 10) ---")
for i, line in enumerate(frames_after[0][:viewport_height]):
print(f"{i:2}: {line}")
print("\n[Recording Save/Load Test]")
test_file = Path("/tmp/test_recording.json")
display.save_recording(test_file)
print(f" Saved recording to: {test_file}")
display2 = DisplayRegistry.create("null")
display2.init(viewport_width, viewport_height)
display2.load_recording(test_file)
loaded_frames = display2.get_frames()
print(f" Loaded {len(loaded_frames)} frames from file")
if len(loaded_frames) == len(recorded_frames):
print(" PASS: Recording save/load works correctly")
else:
print(" WARNING: Frame count mismatch after load")
test_file.unlink(missing_ok=True)
pipeline.cleanup()
display.cleanup()
print("\n" + "=" * 60)
print("Demo complete!")
print("=" * 60 + "\n")
def main():
viewport_width = 40
viewport_height = 15
if "--viewport" in sys.argv:
idx = sys.argv.index("--viewport")
if idx + 1 < len(sys.argv):
vp = sys.argv[idx + 1]
try:
viewport_width, viewport_height = map(int, vp.split("x"))
except ValueError:
print("Error: Invalid viewport format. Use WxH (e.g., 40x15)")
sys.exit(1)
run_demo(viewport_width, viewport_height)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,378 @@
#!/usr/bin/env python3
"""
Oscilloscope with Image Data Source Integration
This demo:
1. Uses pygame to render oscillator waveforms
2. Converts to PIL Image (8-bit grayscale with transparency)
3. Renders to ANSI using image data source patterns
4. Features LFO modulation chain
Usage:
uv run python scripts/demo_image_oscilloscope.py --lfo --modulate
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.data_sources.sources import DataSource, ImageItem
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""Oscillator with frequency modulation from another oscillator."""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
return self.osc._phase
def get_effective_frequency(self):
if self.modulator and self.modulator.read():
mod_reading = self.modulator.read()
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
self.osc.stop()
class OscilloscopeDataSource(DataSource):
"""Dynamic data source that generates oscilloscope images from oscillators."""
def __init__(
self,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
width: int = 200,
height: int = 100,
):
self.modulator = modulator
self.modulated = modulated
self.width = width
self.height = height
self.frame = 0
# Check if pygame and PIL are available
import importlib.util
self.pygame_available = importlib.util.find_spec("pygame") is not None
self.pil_available = importlib.util.find_spec("PIL") is not None
@property
def name(self) -> str:
return "oscilloscope_image"
@property
def is_dynamic(self) -> bool:
return True
def fetch(self) -> list[ImageItem]:
"""Generate oscilloscope image from oscillators."""
if not self.pygame_available or not self.pil_available:
# Fallback to text-based source
return []
import pygame
from PIL import Image
# Create Pygame surface
surface = pygame.Surface((self.width, self.height))
surface.fill((10, 10, 20)) # Dark background
# Get readings
mod_reading = self.modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = self.modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Draw modulator waveform (top half)
top_height = self.height // 2
waveform_fn = self.modulator.WAVEFORMS[self.modulator.waveform]
mod_time_offset = self.modulator._phase * self.modulator.frequency * 0.3
prev_x, prev_y = 0, 0
for x in range(self.width):
col_fraction = x / self.width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * self.modulator.frequency * 2)
y = int(top_height - (sample * (top_height - 10)) - 5)
if x > 0:
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 1)
prev_x, prev_y = x, y
# Draw separator
pygame.draw.line(
surface, (80, 80, 100), (0, top_height), (self.width, top_height), 1
)
# Draw modulated waveform (bottom half)
bottom_start = top_height + 1
bottom_height = self.height - bottom_start - 1
waveform_fn = self.modulated.osc.WAVEFORMS[self.modulated.waveform]
modulated_time_offset = (
self.modulated.get_phase() * self.modulated.get_effective_frequency() * 0.3
)
prev_x, prev_y = 0, 0
for x in range(self.width):
col_fraction = x / self.width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(
time_pos * self.modulated.get_effective_frequency() * 2
)
y = int(
bottom_start + (bottom_height - (sample * (bottom_height - 10))) - 5
)
if x > 0:
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 1)
prev_x, prev_y = x, y
# Convert Pygame surface to PIL Image (8-bit grayscale with alpha)
img_str = pygame.image.tostring(surface, "RGB")
pil_rgb = Image.frombytes("RGB", (self.width, self.height), img_str)
# Convert to 8-bit grayscale
pil_gray = pil_rgb.convert("L")
# Create alpha channel (full opacity for now)
alpha = Image.new("L", (self.width, self.height), 255)
# Combine into RGBA
pil_rgba = Image.merge("RGBA", (pil_gray, pil_gray, pil_gray, alpha))
# Create ImageItem
item = ImageItem(
image=pil_rgba,
source="oscilloscope_image",
timestamp=str(time.time()),
path=None,
metadata={
"frame": self.frame,
"mod_value": mod_val,
"modulated_value": modulated_val,
},
)
self.frame += 1
return [item]
def render_pil_to_ansi(
pil_image, terminal_width: int = 80, terminal_height: int = 30
) -> str:
"""Convert PIL image (8-bit grayscale with transparency) to ANSI."""
# Resize for terminal display
resized = pil_image.resize((terminal_width * 2, terminal_height * 2))
# Extract grayscale and alpha channels
gray = resized.convert("L")
alpha = resized.split()[3] if len(resized.split()) > 3 else None
# ANSI character ramp (dark to light)
chars = " .:-=+*#%@"
lines = []
for y in range(0, resized.height, 2): # Sample every 2nd row for aspect ratio
line = ""
for x in range(0, resized.width, 2):
pixel = gray.getpixel((x, y))
# Check alpha if available
if alpha:
a = alpha.getpixel((x, y))
if a < 128: # Transparent
line += " "
continue
char_index = int((pixel / 255) * (len(chars) - 1))
line += chars[char_index]
lines.append(line)
return "\n".join(lines)
def demo_image_oscilloscope(
waveform: str = "sine",
base_freq: float = 0.5,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run oscilloscope with image data source integration."""
frame_interval = 1.0 / 15.0 # 15 FPS
print("Oscilloscope with Image Data Source Integration")
print("Frame rate: 15 FPS")
print()
# Create oscillators
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator if modulate else None,
modulation_depth=mod_depth,
)
# Create image data source
image_source = OscilloscopeDataSource(
modulator=modulator,
modulated=modulated,
width=200,
height=100,
)
# Run demo loop
try:
frame = 0
last_time = time.time()
while frames == 0 or frame < frames:
# Fetch image from data source
images = image_source.fetch()
if images:
# Convert to ANSI
visualization = render_pil_to_ansi(
images[0].image, terminal_width=80, terminal_height=30
)
else:
# Fallback to text message
visualization = (
"Pygame or PIL not available\n\n[Image rendering disabled]"
)
# Add header
header = f"IMAGE SOURCE MODE | Frame: {frame}"
header_line = "" * 80
visualization = f"{header}\n{header_line}\n" + visualization
# Display
print("\033[H" + visualization)
# Frame timing
elapsed = time.time() - last_time
sleep_time = max(0, frame_interval - elapsed)
time.sleep(sleep_time)
last_time = time.time()
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
modulator.stop()
modulated.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Oscilloscope with image data source integration"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=0.5,
help="Main oscillator frequency",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz)",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render",
)
args = parser.parse_args()
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_image_oscilloscope(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
frames=args.frames,
)

View File

@@ -0,0 +1,137 @@
#!/usr/bin/env python3
"""
Simple Oscillator Sensor Demo
This script demonstrates the oscillator sensor by:
1. Creating an oscillator sensor with various waveforms
2. Printing the waveform data in real-time
Usage:
uv run python scripts/demo_oscillator_simple.py --waveform sine --frequency 1.0
uv run python scripts/demo_oscillator_simple.py --waveform square --frequency 2.0
"""
import argparse
import math
import time
import sys
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def render_waveform(width: int, height: int, osc: OscillatorSensor, frame: int) -> str:
"""Render a waveform visualization."""
# Get current reading
current_reading = osc.read()
current_value = current_reading.value if current_reading else 0.0
# Generate waveform data - sample the waveform function directly
# This shows what the waveform looks like, not the live reading
samples = []
waveform_fn = osc.WAVEFORMS[osc._waveform]
for i in range(width):
# Sample across one complete cycle (0 to 1)
phase = i / width
value = waveform_fn(phase)
samples.append(value)
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscillator: {osc.name} | Waveform: {osc.waveform} | Freq: {osc.frequency}Hz"
)
lines.append(header)
lines.append("" * width)
# Waveform plot (scaled to fit height)
num_rows = height - 3 # Header, separator, footer
for row in range(num_rows):
# Calculate the sample value that corresponds to this row
# 0.0 is bottom, 1.0 is top
row_value = 1.0 - (row / (num_rows - 1)) if num_rows > 1 else 0.5
line_chars = []
for x, sample in enumerate(samples):
# Determine if this sample should be drawn in this row
# Map sample (0.0-1.0) to row (0 to num_rows-1)
# 0.0 -> row 0 (bottom), 1.0 -> row num_rows-1 (top)
sample_row = int(sample * (num_rows - 1))
if sample_row == row:
# Use different characters for waveform vs current position marker
# Check if this is the current reading position
if abs(x / width - (osc._phase % 1.0)) < 0.02:
line_chars.append("") # Current position marker
else:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer with current value and phase info
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {osc._phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscillator(waveform: str = "sine", frequency: float = 1.0, frames: int = 0):
"""Run oscillator demo."""
print(f"Starting oscillator demo: {waveform} wave at {frequency}Hz")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillator sensor
register_oscillator_sensor(name="demo_osc", waveform=waveform, frequency=frequency)
osc = OscillatorSensor(name="demo_osc", waveform=waveform, frequency=frequency)
osc.start()
# Run demo loop
try:
frame = 0
while frames == 0 or frame < frames:
# Render waveform
visualization = render_waveform(80, 20, osc, frame)
# Print with ANSI escape codes to clear screen and move cursor
print("\033[H\033[J" + visualization)
time.sleep(0.05) # 20 FPS
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Oscillator sensor demo")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency", type=float, default=1.0, help="Oscillator frequency in Hz"
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
demo_oscillator(args.waveform, args.frequency, args.frames)

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""
Oscilloscope Demo - Real-time waveform visualization
This demonstrates a real oscilloscope-style display where:
1. A complete waveform is drawn on the canvas
2. The camera scrolls horizontally (time axis)
3. The "pen" traces the waveform vertically at the center
Think of it as:
- Canvas: Contains the waveform pattern (like a stamp)
- Camera: Moves left-to-right, revealing different parts of the waveform
- Pen: Always at center X, moves vertically with the signal value
Usage:
uv run python scripts/demo_oscilloscope.py --frequency 1.0 --speed 10
"""
import argparse
import math
import time
import sys
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def render_oscilloscope(
width: int,
height: int,
osc: OscillatorSensor,
frame: int,
) -> str:
"""Render an oscilloscope-style display."""
# Get current reading (0.0 to 1.0)
reading = osc.read()
current_value = reading.value if reading else 0.5
phase = osc._phase
frequency = osc.frequency
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscilloscope: {osc.name} | Wave: {osc.waveform} | "
f"Freq: {osc.frequency}Hz | Phase: {phase:.2f}"
)
lines.append(header)
lines.append("" * width)
# Center line (zero reference)
center_row = height // 2
# Draw oscilloscope trace
waveform_fn = osc.WAVEFORMS[osc._waveform]
# Calculate time offset for scrolling
# The trace scrolls based on phase - this creates the time axis movement
# At frequency 1.0, the trace completes one full sweep per frequency cycle
time_offset = phase * frequency * 2.0
# Pre-calculate all sample values for this frame
# Each column represents a time point on the X axis
samples = []
for col in range(width):
# Time position for this column (0.0 to 1.0 across width)
col_fraction = col / width
# Combine with time offset for scrolling effect
time_pos = time_offset + col_fraction
# Sample the waveform at this time point
# Multiply by frequency to get correct number of cycles shown
sample_value = waveform_fn(time_pos * frequency * 2)
samples.append(sample_value)
# Draw the trace
# For each row, check which columns have their sample value in this row
for row in range(height - 3): # Reserve 3 lines for header/footer
# Calculate vertical position (0.0 at bottom, 1.0 at top)
row_pos = 1.0 - (row / (height - 4))
line_chars = []
for col in range(width):
sample = samples[col]
# Check if this sample falls in this row
tolerance = 1.0 / (height - 4)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Draw center indicator line
center_line = list(" " * width)
# Position the indicator based on current value
indicator_x = int((current_value) * (width - 1))
if 0 <= indicator_x < width:
center_line[indicator_x] = ""
lines.append("".join(center_line))
# Footer with current value
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscilloscope(
waveform: str = "sine",
frequency: float = 1.0,
frames: int = 0,
):
"""Run oscilloscope demo."""
# Determine if this is LFO range
is_lfo = frequency <= 20.0 and frequency >= 0.1
freq_type = "LFO" if is_lfo else "Audio"
print(f"Oscilloscope demo: {waveform} wave")
print(f"Frequency: {frequency}Hz ({freq_type} range)")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillator sensor
register_oscillator_sensor(
name="oscilloscope_osc", waveform=waveform, frequency=frequency
)
osc = OscillatorSensor(
name="oscilloscope_osc", waveform=waveform, frequency=frequency
)
osc.start()
# Run demo loop
try:
frame = 0
while frames == 0 or frame < frames:
# Render oscilloscope display
visualization = render_oscilloscope(80, 22, osc, frame)
# Print with ANSI escape codes to clear screen and move cursor
print("\033[H\033[J" + visualization)
time.sleep(1.0 / 60.0) # 60 FPS
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Oscilloscope demo")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=1.0,
help="Oscillator frequency in Hz (LFO: 0.1-20Hz, Audio: >20Hz)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use LFO frequency (0.5Hz - slow modulation)",
)
parser.add_argument(
"--fast-lfo",
action="store_true",
help="Use fast LFO frequency (5Hz - rhythmic modulation)",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
# Determine frequency based on mode
frequency = args.frequency
if args.lfo:
frequency = 0.5 # Slow LFO for modulation
elif args.fast_lfo:
frequency = 5.0 # Fast LFO for rhythmic modulation
demo_oscilloscope(
waveform=args.waveform,
frequency=frequency,
frames=args.frames,
)

View File

@@ -0,0 +1,380 @@
#!/usr/bin/env python3
"""
Enhanced Oscilloscope with LFO Modulation Chain
This demo features:
1. Slower frame rate (15 FPS) for human appreciation
2. Reduced flicker using cursor positioning
3. LFO modulation chain: LFO1 modulates LFO2 frequency
4. Multiple visualization modes
Usage:
# Simple LFO
uv run python scripts/demo_oscilloscope_mod.py --lfo
# LFO modulation chain: LFO1 modulates LFO2 frequency
uv run python scripts/demo_oscilloscope_mod.py --modulate --lfo
# Custom modulation depth and rate
uv run python scripts/demo_oscilloscope_mod.py --modulate --lfo --mod-depth 0.5 --mod-rate 0.25
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""
Oscillator with frequency modulation from another oscillator.
Frequency = base_frequency + (modulator_value * modulation_depth)
"""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
# Create the oscillator sensor
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
"""Read current value, applying modulation if present."""
# Update frequency based on modulator
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
# Modulator value (0-1) affects frequency
# Map 0-1 to -modulation_depth to +modulation_depth
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
# Clamp to reasonable range
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
"""Get current phase."""
return self.osc._phase
def get_effective_frequency(self):
"""Get current effective frequency (after modulation)."""
if self.modulator and self.modulator.read():
mod_reading = self.modulator.read()
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
"""Stop the oscillator."""
self.osc.stop()
def render_dual_waveform(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
) -> str:
"""Render both modulator and modulated waveforms."""
# Get readings
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Build visualization
lines = []
# Header with sensor info
header1 = f"MODULATOR: {modulator.name} | Wave: {modulator.waveform} | Freq: {modulator.frequency:.2f}Hz"
header2 = f"MODULATED: {modulated.name} | Wave: {modulated.waveform} | Base: {modulated.base_frequency:.2f}Hz | Eff: {modulated.get_effective_frequency():.2f}Hz"
lines.append(header1)
lines.append(header2)
lines.append("" * width)
# Render modulator waveform (top half)
top_height = (height - 5) // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
# Calculate time offset for scrolling
mod_time_offset = modulator._phase * modulator.frequency * 0.3
for row in range(top_height):
row_pos = 1.0 - (row / (top_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
tolerance = 1.0 / (top_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Separator line with modulation info
lines.append(
f"─ MODULATION: depth={modulated.modulation_depth:.2f} | mod_value={mod_val:.2f}"
)
# Render modulated waveform (bottom half)
bottom_height = height - top_height - 5
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
# Calculate time offset for scrolling
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
for row in range(bottom_height):
row_pos = 1.0 - (row / (bottom_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
tolerance = 1.0 / (bottom_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer with current values
footer = f"Mod Value: {mod_val:.3f} | Modulated Value: {modulated_val:.3f} | Frame: {frame}"
lines.append(footer)
return "\n".join(lines)
def render_single_waveform(
width: int,
height: int,
osc: OscillatorSensor,
frame: int,
) -> str:
"""Render a single waveform (for non-modulated mode)."""
reading = osc.read()
current_value = reading.value if reading else 0.5
phase = osc._phase
frequency = osc.frequency
# Build visualization
lines = []
# Header with sensor info
header = (
f"Oscilloscope: {osc.name} | Wave: {osc.waveform} | "
f"Freq: {frequency:.2f}Hz | Phase: {phase:.2f}"
)
lines.append(header)
lines.append("" * width)
# Draw oscilloscope trace
waveform_fn = osc.WAVEFORMS[osc.waveform]
time_offset = phase * frequency * 0.3
for row in range(height - 3):
row_pos = 1.0 - (row / (height - 4))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = time_offset + col_fraction
sample = waveform_fn(time_pos * frequency * 2)
tolerance = 1.0 / (height - 4)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
# Footer
footer = f"Value: {current_value:.3f} | Frame: {frame} | Phase: {phase:.2f}"
lines.append(footer)
return "\n".join(lines)
def demo_oscilloscope_mod(
waveform: str = "sine",
base_freq: float = 1.0,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run enhanced oscilloscope demo with modulation support."""
# Frame timing for smooth 15 FPS
frame_interval = 1.0 / 15.0 # 66.67ms per frame
print("Enhanced Oscilloscope Demo")
print("Frame rate: 15 FPS (66ms per frame)")
if modulate:
print(
f"Modulation: {mod_waveform} @ {mod_freq}Hz -> {waveform} @ {base_freq}Hz"
)
print(f"Modulation depth: {mod_depth}")
else:
print(f"Waveform: {waveform} @ {base_freq}Hz")
if frames > 0:
print(f"Running for {frames} frames")
else:
print("Press Ctrl+C to stop")
print()
# Create oscillators
if modulate:
# Create modulation chain: modulator -> modulated
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator,
modulation_depth=mod_depth,
)
else:
# Single oscillator
register_oscillator_sensor(
name="oscilloscope", waveform=waveform, frequency=base_freq
)
osc = OscillatorSensor(
name="oscilloscope", waveform=waveform, frequency=base_freq
)
osc.start()
# Run demo loop with consistent timing
try:
frame = 0
last_time = time.time()
while frames == 0 or frame < frames:
# Render based on mode
if modulate:
visualization = render_dual_waveform(
80, 30, modulator, modulated, frame
)
else:
visualization = render_single_waveform(80, 22, osc, frame)
# Use cursor positioning instead of full clear to reduce flicker
print("\033[H" + visualization)
# Calculate sleep time for consistent 15 FPS
elapsed = time.time() - last_time
sleep_time = max(0, frame_interval - elapsed)
time.sleep(sleep_time)
last_time = time.time()
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
if modulate:
modulator.stop()
modulated.stop()
else:
osc.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Enhanced oscilloscope with LFO modulation chain"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=1.0,
help="Main oscillator frequency (LFO range: 0.1-20Hz)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz) for main oscillator",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain (modulator modulates main oscillator)",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth (0.0-1.0, higher = more frequency variation)",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite until Ctrl+C)",
)
args = parser.parse_args()
# Set frequency based on LFO flag
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_oscilloscope_mod(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
frames=args.frames,
)

View File

@@ -0,0 +1,411 @@
#!/usr/bin/env python3
"""
Enhanced Oscilloscope with Pipeline Switching
This demo features:
1. Text-based oscilloscope (first 15 seconds)
2. Pygame renderer with PIL to ANSI conversion (next 15 seconds)
3. Continuous looping between the two modes
Usage:
uv run python scripts/demo_oscilloscope_pipeline.py --lfo --modulate
"""
import argparse
import sys
import time
from pathlib import Path
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
class ModulatedOscillator:
"""Oscillator with frequency modulation from another oscillator."""
def __init__(
self,
name: str,
waveform: str = "sine",
base_frequency: float = 1.0,
modulator: "OscillatorSensor | None" = None,
modulation_depth: float = 0.5,
):
self.name = name
self.waveform = waveform
self.base_frequency = base_frequency
self.modulator = modulator
self.modulation_depth = modulation_depth
register_oscillator_sensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc = OscillatorSensor(
name=name, waveform=waveform, frequency=base_frequency
)
self.osc.start()
def read(self):
"""Read current value, applying modulation if present."""
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
effective_freq = self.base_frequency + mod_offset
effective_freq = max(0.1, min(effective_freq, 20.0))
self.osc._frequency = effective_freq
return self.osc.read()
def get_phase(self):
return self.osc._phase
def get_effective_frequency(self):
if self.modulator:
mod_reading = self.modulator.read()
if mod_reading:
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
return self.base_frequency
def stop(self):
self.osc.stop()
def render_text_mode(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
) -> str:
"""Render dual waveforms in text mode."""
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
lines = []
header1 = (
f"TEXT MODE | MODULATOR: {modulator.waveform} @ {modulator.frequency:.2f}Hz"
)
header2 = (
f"MODULATED: {modulated.waveform} @ {modulated.get_effective_frequency():.2f}Hz"
)
lines.append(header1)
lines.append(header2)
lines.append("" * width)
# Modulator waveform (top half)
top_height = (height - 5) // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
mod_time_offset = modulator._phase * modulator.frequency * 0.3
for row in range(top_height):
row_pos = 1.0 - (row / (top_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
tolerance = 1.0 / (top_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
lines.append(
f"─ MODULATION: depth={modulated.modulation_depth:.2f} | mod_value={mod_val:.2f}"
)
# Modulated waveform (bottom half)
bottom_height = height - top_height - 5
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
for row in range(bottom_height):
row_pos = 1.0 - (row / (bottom_height - 1))
line_chars = []
for col in range(width):
col_fraction = col / width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
tolerance = 1.0 / (bottom_height - 1)
if abs(sample - row_pos) < tolerance:
line_chars.append("")
else:
line_chars.append(" ")
lines.append("".join(line_chars))
footer = (
f"Mod Value: {mod_val:.3f} | Modulated: {modulated_val:.3f} | Frame: {frame}"
)
lines.append(footer)
return "\n".join(lines)
def render_pygame_to_ansi(
width: int,
height: int,
modulator: OscillatorSensor,
modulated: ModulatedOscillator,
frame: int,
font_path: str | None,
) -> str:
"""Render waveforms using Pygame, convert to ANSI with PIL."""
try:
import pygame
from PIL import Image
except ImportError:
return "Pygame or PIL not available\n\n" + render_text_mode(
width, height, modulator, modulated, frame
)
# Initialize Pygame surface (smaller for ANSI conversion)
pygame_width = width * 2 # Double for better quality
pygame_height = height * 4
surface = pygame.Surface((pygame_width, pygame_height))
surface.fill((10, 10, 20)) # Dark background
# Get readings
mod_reading = modulator.read()
mod_val = mod_reading.value if mod_reading else 0.5
modulated_reading = modulated.read()
modulated_val = modulated_reading.value if modulated_reading else 0.5
# Draw modulator waveform (top half)
top_height = pygame_height // 2
waveform_fn = modulator.WAVEFORMS[modulator.waveform]
mod_time_offset = modulator._phase * modulator.frequency * 0.3
prev_x, prev_y = 0, 0
for x in range(pygame_width):
col_fraction = x / pygame_width
time_pos = mod_time_offset + col_fraction
sample = waveform_fn(time_pos * modulator.frequency * 2)
y = int(top_height - (sample * (top_height - 20)) - 10)
if x > 0:
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 2)
prev_x, prev_y = x, y
# Draw separator
pygame.draw.line(
surface, (80, 80, 100), (0, top_height), (pygame_width, top_height), 1
)
# Draw modulated waveform (bottom half)
bottom_start = top_height + 10
bottom_height = pygame_height - bottom_start - 20
waveform_fn = modulated.osc.WAVEFORMS[modulated.waveform]
modulated_time_offset = (
modulated.get_phase() * modulated.get_effective_frequency() * 0.3
)
prev_x, prev_y = 0, 0
for x in range(pygame_width):
col_fraction = x / pygame_width
time_pos = modulated_time_offset + col_fraction
sample = waveform_fn(time_pos * modulated.get_effective_frequency() * 2)
y = int(bottom_start + (bottom_height - (sample * (bottom_height - 20))) - 10)
if x > 0:
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 2)
prev_x, prev_y = x, y
# Draw info text on pygame surface
try:
if font_path:
font = pygame.font.Font(font_path, 16)
info_text = f"PYGAME MODE | Mod: {mod_val:.2f} | Out: {modulated_val:.2f} | Frame: {frame}"
text_surface = font.render(info_text, True, (200, 200, 200))
surface.blit(text_surface, (10, 10))
except Exception:
pass
# Convert Pygame surface to PIL Image
img_str = pygame.image.tostring(surface, "RGB")
pil_image = Image.frombytes("RGB", (pygame_width, pygame_height), img_str)
# Convert to ANSI
return pil_to_ansi(pil_image)
def pil_to_ansi(image) -> str:
"""Convert PIL image to ANSI escape codes."""
# Resize for terminal display
terminal_width = 80
terminal_height = 30
image = image.resize((terminal_width * 2, terminal_height * 2))
# Convert to grayscale
image = image.convert("L")
# ANSI character ramp (dark to light)
chars = " .:-=+*#%@"
lines = []
for y in range(0, image.height, 2): # Sample every 2nd row for aspect ratio
line = ""
for x in range(0, image.width, 2):
pixel = image.getpixel((x, y))
char_index = int((pixel / 255) * (len(chars) - 1))
line += chars[char_index]
lines.append(line)
# Add header info
header = "PYGAME → ANSI RENDER MODE"
header_line = "" * terminal_width
return f"{header}\n{header_line}\n" + "\n".join(lines)
def demo_with_pipeline_switching(
waveform: str = "sine",
base_freq: float = 0.5,
modulate: bool = False,
mod_waveform: str = "sine",
mod_freq: float = 0.5,
mod_depth: float = 0.5,
frames: int = 0,
):
"""Run demo with pipeline switching every 15 seconds."""
frame_interval = 1.0 / 15.0 # 15 FPS
mode_duration = 15.0 # 15 seconds per mode
print("Enhanced Oscilloscope with Pipeline Switching")
print(f"Mode duration: {mode_duration} seconds")
print("Frame rate: 15 FPS")
print()
# Create oscillators
modulator = OscillatorSensor(
name="modulator", waveform=mod_waveform, frequency=mod_freq
)
modulator.start()
modulated = ModulatedOscillator(
name="modulated",
waveform=waveform,
base_frequency=base_freq,
modulator=modulator if modulate else None,
modulation_depth=mod_depth,
)
# Find font path
font_path = Path("fonts/Pixel_Sparta.otf")
if not font_path.exists():
font_path = Path("fonts/Pixel Sparta.otf")
font_path = str(font_path) if font_path.exists() else None
# Run demo loop
try:
frame = 0
mode_start_time = time.time()
mode_index = 0 # 0 = text, 1 = pygame
while frames == 0 or frame < frames:
elapsed = time.time() - mode_start_time
# Switch mode every 15 seconds
if elapsed >= mode_duration:
mode_index = (mode_index + 1) % 2
mode_start_time = time.time()
print(f"\n{'=' * 60}")
print(
f"SWITCHING TO {'PYGAME+ANSI' if mode_index == 1 else 'TEXT'} MODE"
)
print(f"{'=' * 60}\n")
time.sleep(1.0) # Brief pause to show mode switch
# Render based on mode
if mode_index == 0:
# Text mode
visualization = render_text_mode(80, 30, modulator, modulated, frame)
else:
# Pygame + PIL to ANSI mode
visualization = render_pygame_to_ansi(
80, 30, modulator, modulated, frame, font_path
)
# Display with cursor positioning
print("\033[H" + visualization)
# Frame timing
time.sleep(frame_interval)
frame += 1
except KeyboardInterrupt:
print("\n\nDemo stopped by user")
finally:
modulator.stop()
modulated.stop()
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Enhanced oscilloscope with pipeline switching"
)
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Main waveform type",
)
parser.add_argument(
"--frequency",
type=float,
default=0.5,
help="Main oscillator frequency (LFO range)",
)
parser.add_argument(
"--lfo",
action="store_true",
help="Use slow LFO frequency (0.5Hz)",
)
parser.add_argument(
"--modulate",
action="store_true",
help="Enable LFO modulation chain",
)
parser.add_argument(
"--mod-waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Modulator waveform type",
)
parser.add_argument(
"--mod-freq",
type=float,
default=0.5,
help="Modulator frequency in Hz",
)
parser.add_argument(
"--mod-depth",
type=float,
default=0.5,
help="Modulation depth",
)
parser.add_argument(
"--frames",
type=int,
default=0,
help="Number of frames to render (0 = infinite)",
)
args = parser.parse_args()
base_freq = args.frequency
if args.lfo:
base_freq = 0.5
demo_with_pipeline_switching(
waveform=args.waveform,
base_freq=base_freq,
modulate=args.modulate,
mod_waveform=args.mod_waveform,
mod_freq=args.mod_freq,
mod_depth=args.mod_depth,
)

View File

@@ -0,0 +1,111 @@
#!/usr/bin/env python3
"""
Oscillator Data Export
Exports oscillator sensor data in JSON format for external use.
Usage:
uv run python scripts/oscillator_data_export.py --waveform sine --frequency 1.0 --duration 5.0
"""
import argparse
import json
import time
import sys
from pathlib import Path
from datetime import datetime
# Add mainline to path
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
def export_oscillator_data(
waveform: str = "sine",
frequency: float = 1.0,
duration: float = 5.0,
sample_rate: float = 60.0,
output_file: str | None = None,
):
"""Export oscillator data to JSON."""
print(f"Exporting oscillator data: {waveform} wave at {frequency}Hz")
print(f"Duration: {duration}s, Sample rate: {sample_rate}Hz")
# Create oscillator sensor
register_oscillator_sensor(
name="export_osc", waveform=waveform, frequency=frequency
)
osc = OscillatorSensor(name="export_osc", waveform=waveform, frequency=frequency)
osc.start()
# Collect data
data = {
"waveform": waveform,
"frequency": frequency,
"duration": duration,
"sample_rate": sample_rate,
"timestamp": datetime.now().isoformat(),
"samples": [],
}
sample_interval = 1.0 / sample_rate
num_samples = int(duration * sample_rate)
print(f"Collecting {num_samples} samples...")
for i in range(num_samples):
reading = osc.read()
if reading:
data["samples"].append(
{
"index": i,
"timestamp": reading.timestamp,
"value": reading.value,
"phase": osc._phase,
}
)
time.sleep(sample_interval)
osc.stop()
# Export to JSON
if output_file:
with open(output_file, "w") as f:
json.dump(data, f, indent=2)
print(f"Data exported to {output_file}")
else:
print(json.dumps(data, indent=2))
return data
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Export oscillator sensor data")
parser.add_argument(
"--waveform",
choices=["sine", "square", "sawtooth", "triangle", "noise"],
default="sine",
help="Waveform type",
)
parser.add_argument(
"--frequency", type=float, default=1.0, help="Oscillator frequency in Hz"
)
parser.add_argument(
"--duration", type=float, default=5.0, help="Duration to record in seconds"
)
parser.add_argument(
"--sample-rate", type=float, default=60.0, help="Sample rate in Hz"
)
parser.add_argument(
"--output", "-o", type=str, help="Output JSON file (default: print to stdout)"
)
args = parser.parse_args()
export_oscillator_data(
waveform=args.waveform,
frequency=args.frequency,
duration=args.duration,
sample_rate=args.sample_rate,
output_file=args.output,
)

509
scripts/pipeline_demo.py Normal file
View File

@@ -0,0 +1,509 @@
#!/usr/bin/env python3
"""
Pipeline Demo Orchestrator
Demonstrates all effects and camera modes with gentle oscillation.
Runs a comprehensive test of the Mainline pipeline system with proper
frame rate control and extended duration for visibility.
"""
import argparse
import math
import signal
import sys
import time
from typing import Any
from engine.camera import Camera
from engine.data_sources.checkerboard import CheckerboardDataSource
from engine.data_sources.sources import SourceItem
from engine.display import DisplayRegistry, NullDisplay
from engine.effects.plugins import discover_plugins
from engine.effects import get_registry
from engine.effects.types import EffectConfig
from engine.frame import FrameTimer
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
CameraClockStage,
CameraStage,
DataSourceStage,
DisplayStage,
EffectPluginStage,
SourceItemsToBufferStage,
)
from engine.pipeline.stages.framebuffer import FrameBufferStage
class GentleOscillator:
"""Produces smooth, gentle sinusoidal values."""
def __init__(
self, speed: float = 60.0, amplitude: float = 1.0, offset: float = 0.0
):
self.speed = speed # Period length in frames
self.amplitude = amplitude # Amplitude
self.offset = offset # Base offset
def value(self, frame: int) -> float:
"""Get oscillated value for given frame."""
return self.offset + self.amplitude * 0.5 * (1 + math.sin(frame / self.speed))
class PipelineDemoOrchestrator:
"""Orchestrates comprehensive pipeline demonstrations."""
def __init__(
self,
use_terminal: bool = True,
target_fps: float = 30.0,
effect_duration: float = 8.0,
mode_duration: float = 3.0,
enable_fps_switch: bool = False,
loop: bool = False,
verbose: bool = False,
):
self.use_terminal = use_terminal
self.target_fps = target_fps
self.effect_duration = effect_duration
self.mode_duration = mode_duration
self.enable_fps_switch = enable_fps_switch
self.loop = loop
self.verbose = verbose
self.frame_count = 0
self.pipeline = None
self.context = None
self.framebuffer = None
self.camera = None
self.timer = None
def log(self, message: str, verbose: bool = False):
"""Print with timestamp if verbose or always-important."""
if self.verbose or not verbose:
print(f"[{time.strftime('%H:%M:%S')}] {message}")
def build_base_pipeline(
self, camera_type: str = "scroll", camera_speed: float = 0.5
):
"""Build a base pipeline with all required components."""
self.log(f"Building base pipeline: camera={camera_type}, speed={camera_speed}")
# Camera
camera = Camera.scroll(speed=camera_speed)
camera.set_canvas_size(200, 200)
# Context
ctx = PipelineContext()
# Pipeline config
config = PipelineConfig(
source="empty",
display="terminal" if self.use_terminal else "null",
camera=camera_type,
effects=[],
enable_metrics=True,
)
pipeline = Pipeline(config=config, context=ctx)
# Use a large checkerboard pattern for visible motion effects
source = CheckerboardDataSource(width=200, height=200, square_size=10)
pipeline.add_stage("source", DataSourceStage(source, name="checkerboard"))
# Add camera clock (must run every frame)
pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock")
)
# Add render
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Add camera stage
pipeline.add_stage("camera", CameraStage(camera, name="camera"))
# Add framebuffer (optional for effects that use it)
self.framebuffer = FrameBufferStage(name="default", history_depth=5)
pipeline.add_stage("framebuffer", self.framebuffer)
# Add display
display_backend = "terminal" if self.use_terminal else "null"
display = DisplayRegistry.create(display_backend)
if display:
pipeline.add_stage("display", DisplayStage(display, name=display_backend))
# Build and initialize
pipeline.build(auto_inject=False)
pipeline.initialize()
self.pipeline = pipeline
self.context = ctx
self.camera = camera
self.log("Base pipeline built successfully")
return pipeline
def test_effects_oscillation(self):
"""Test each effect with gentle intensity oscillation."""
self.log("\n=== EFFECTS OSCILLATION TEST ===")
self.log(
f"Duration: {self.effect_duration}s per effect at {self.target_fps} FPS"
)
discover_plugins() # Ensure all plugins are registered
registry = get_registry()
all_effects = registry.list_all()
effect_names = [
name
for name in all_effects.keys()
if name not in ("motionblur", "afterimage")
]
# Calculate frames based on duration and FPS
frames_per_effect = int(self.effect_duration * self.target_fps)
oscillator = GentleOscillator(speed=90, amplitude=0.7, offset=0.3)
total_effects = len(effect_names) + 2 # +2 for motionblur and afterimage
estimated_total = total_effects * self.effect_duration
self.log(f"Testing {len(effect_names)} regular effects + 2 framebuffer effects")
self.log(f"Estimated time: {estimated_total:.0f}s")
for idx, effect_name in enumerate(sorted(effect_names), 1):
try:
self.log(f"[{idx}/{len(effect_names)}] Testing effect: {effect_name}")
effect = registry.get(effect_name)
if not effect:
self.log(f" Skipped: plugin not found")
continue
stage = EffectPluginStage(effect, name=effect_name)
self.pipeline.add_stage(f"effect_{effect_name}", stage)
self.pipeline.build(auto_inject=False)
self._run_frames(
frames_per_effect, oscillator=oscillator, effect=effect
)
self.pipeline.remove_stage(f"effect_{effect_name}")
self.pipeline.build(auto_inject=False)
self.log(f"{effect_name} completed successfully")
except Exception as e:
self.log(f"{effect_name} failed: {e}")
# Test motionblur and afterimage separately with framebuffer
for effect_name in ["motionblur", "afterimage"]:
try:
self.log(
f"[{len(effect_names) + 1}/{total_effects}] Testing effect: {effect_name} (with framebuffer)"
)
effect = registry.get(effect_name)
if not effect:
self.log(f" Skipped: plugin not found")
continue
stage = EffectPluginStage(
effect,
name=effect_name,
dependencies={"framebuffer.history.default"},
)
self.pipeline.add_stage(f"effect_{effect_name}", stage)
self.pipeline.build(auto_inject=False)
self._run_frames(
frames_per_effect, oscillator=oscillator, effect=effect
)
self.pipeline.remove_stage(f"effect_{effect_name}")
self.pipeline.build(auto_inject=False)
self.log(f"{effect_name} completed successfully")
except Exception as e:
self.log(f"{effect_name} failed: {e}")
def _run_frames(self, num_frames: int, oscillator=None, effect=None):
"""Run a specified number of frames with proper timing."""
for frame in range(num_frames):
self.frame_count += 1
self.context.set("frame_number", frame)
if oscillator and effect:
intensity = oscillator.value(frame)
effect.configure(EffectConfig(intensity=intensity))
dt = self.timer.sleep_until_next_frame()
self.camera.update(dt)
self.pipeline.execute([])
def test_framebuffer(self):
"""Test framebuffer functionality."""
self.log("\n=== FRAMEBUFFER TEST ===")
try:
# Run frames using FrameTimer for consistent pacing
self._run_frames(10)
# Check framebuffer history
history = self.context.get("framebuffer.default.history")
assert history is not None, "No framebuffer history found"
assert len(history) > 0, "Framebuffer history is empty"
self.log(f"History frames: {len(history)}")
self.log(f"Configured depth: {self.framebuffer.config.history_depth}")
# Check intensity computation
intensity = self.context.get("framebuffer.default.current_intensity")
assert intensity is not None, "No intensity map found"
self.log(f"Intensity map length: {len(intensity)}")
# Check that frames are being stored correctly
recent_frame = self.framebuffer.get_frame(0, self.context)
assert recent_frame is not None, "Cannot retrieve recent frame"
self.log(f"Recent frame rows: {len(recent_frame)}")
self.log("✓ Framebuffer test passed")
except Exception as e:
self.log(f"✗ Framebuffer test failed: {e}")
raise
def test_camera_modes(self):
"""Test each camera mode."""
self.log("\n=== CAMERA MODES TEST ===")
self.log(f"Duration: {self.mode_duration}s per mode at {self.target_fps} FPS")
camera_modes = [
("feed", 0.1),
("scroll", 0.5),
("horizontal", 0.3),
("omni", 0.3),
("floating", 0.5),
("bounce", 0.5),
("radial", 0.3),
]
frames_per_mode = int(self.mode_duration * self.target_fps)
self.log(f"Testing {len(camera_modes)} camera modes")
self.log(f"Estimated time: {len(camera_modes) * self.mode_duration:.0f}s")
for idx, (camera_type, speed) in enumerate(camera_modes, 1):
try:
self.log(f"[{idx}/{len(camera_modes)}] Testing camera: {camera_type}")
# Rebuild camera
self.camera.reset()
cam_class = getattr(Camera, camera_type, Camera.scroll)
new_camera = cam_class(speed=speed)
new_camera.set_canvas_size(200, 200)
# Update camera stages
clock_stage = CameraClockStage(new_camera, name="camera-clock")
self.pipeline.replace_stage("camera_update", clock_stage)
camera_stage = CameraStage(new_camera, name="camera")
self.pipeline.replace_stage("camera", camera_stage)
self.camera = new_camera
# Run frames with proper timing
self._run_frames(frames_per_mode)
# Verify camera moved (check final position)
x, y = self.camera.x, self.camera.y
self.log(f" Final position: ({x:.1f}, {y:.1f})")
if camera_type == "feed":
assert x == 0 and y == 0, "Feed camera should not move"
elif camera_type in ("scroll", "horizontal"):
assert abs(x) > 0 or abs(y) > 0, "Camera should have moved"
else:
self.log(f" Position check skipped (mode={camera_type})")
self.log(f"{camera_type} completed successfully")
except Exception as e:
self.log(f"{camera_type} failed: {e}")
def test_fps_switch_demo(self):
"""Demonstrate the effect of different frame rates on animation smoothness."""
if not self.enable_fps_switch:
return
self.log("\n=== FPS SWITCH DEMONSTRATION ===")
fps_sequence = [
(30.0, 5.0), # 30 FPS for 5 seconds
(60.0, 5.0), # 60 FPS for 5 seconds
(30.0, 5.0), # Back to 30 FPS for 5 seconds
(20.0, 3.0), # 20 FPS for 3 seconds
(60.0, 3.0), # 60 FPS for 3 seconds
]
original_fps = self.target_fps
for fps, duration in fps_sequence:
self.log(f"\n--- Switching to {fps} FPS for {duration}s ---")
self.target_fps = fps
self.timer.target_frame_dt = 1.0 / fps
# Update display FPS if supported
display = (
self.pipeline.get_stage("display").stage
if self.pipeline.get_stage("display")
else None
)
if display and hasattr(display, "target_fps"):
display.target_fps = fps
display._frame_period = 1.0 / fps if fps > 0 else 0
frames = int(duration * fps)
camera_type = "radial" # Use radial for smooth rotation that's visible at different FPS
speed = 0.3
# Rebuild camera if needed
self.camera.reset()
new_camera = Camera.radial(speed=speed)
new_camera.set_canvas_size(200, 200)
clock_stage = CameraClockStage(new_camera, name="camera-clock")
self.pipeline.replace_stage("camera_update", clock_stage)
camera_stage = CameraStage(new_camera, name="camera")
self.pipeline.replace_stage("camera", camera_stage)
self.camera = new_camera
for frame in range(frames):
self.context.set("frame_number", frame)
dt = self.timer.sleep_until_next_frame()
self.camera.update(dt)
result = self.pipeline.execute([])
self.log(f" Completed {frames} frames at {fps} FPS")
# Restore original FPS
self.target_fps = original_fps
self.timer.target_frame_dt = 1.0 / original_fps
self.log("✓ FPS switch demo completed")
def run(self):
"""Run the complete demo."""
start_time = time.time()
self.log("Starting Pipeline Demo Orchestrator")
self.log("=" * 50)
# Initialize frame timer
self.timer = FrameTimer(target_frame_dt=1.0 / self.target_fps)
# Build pipeline
self.build_base_pipeline()
try:
# Test framebuffer first (needed for motion blur effects)
self.test_framebuffer()
# Test effects
self.test_effects_oscillation()
# Test camera modes
self.test_camera_modes()
# Optional FPS switch demonstration
if self.enable_fps_switch:
self.test_fps_switch_demo()
else:
self.log("\n=== FPS SWITCH DEMO ===")
self.log("Skipped (enable with --switch-fps)")
elapsed = time.time() - start_time
self.log("\n" + "=" * 50)
self.log("Demo completed successfully!")
self.log(f"Total frames processed: {self.frame_count}")
self.log(f"Total elapsed time: {elapsed:.1f}s")
self.log(f"Average FPS: {self.frame_count / elapsed:.1f}")
finally:
# Always cleanup properly
self._cleanup()
def _cleanup(self):
"""Clean up pipeline resources."""
self.log("Cleaning up...", verbose=True)
if self.pipeline:
try:
self.pipeline.cleanup()
if self.verbose:
self.log("Pipeline cleaned up successfully", verbose=True)
except Exception as e:
self.log(f"Error during pipeline cleanup: {e}", verbose=True)
# If not looping, clear references
if not self.loop:
self.pipeline = None
self.context = None
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description="Pipeline Demo Orchestrator - comprehensive demo of Mainline pipeline"
)
parser.add_argument(
"--null",
action="store_true",
help="Use null display (no visual output)",
)
parser.add_argument(
"--fps",
type=float,
default=30.0,
help="Target frame rate (default: 30)",
)
parser.add_argument(
"--effect-duration",
type=float,
default=8.0,
help="Duration per effect in seconds (default: 8)",
)
parser.add_argument(
"--mode-duration",
type=float,
default=3.0,
help="Duration per camera mode in seconds (default: 3)",
)
parser.add_argument(
"--switch-fps",
action="store_true",
help="Include FPS switching demonstration",
)
parser.add_argument(
"--loop",
action="store_true",
help="Run demo in an infinite loop",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable verbose output",
)
args = parser.parse_args()
orchestrator = PipelineDemoOrchestrator(
use_terminal=not args.null,
target_fps=args.fps,
effect_duration=args.effect_duration,
mode_duration=args.mode_duration,
enable_fps_switch=args.switch_fps,
loop=args.loop,
verbose=args.verbose,
)
try:
orchestrator.run()
except KeyboardInterrupt:
print("\nInterrupted by user")
sys.exit(0)
except Exception as e:
print(f"\nDemo failed: {e}")
import traceback
traceback.print_exc()
sys.exit(1)

View File

@@ -0,0 +1,49 @@
#!/usr/bin/env python3
"""Render Mermaid diagrams in markdown files to ASCII art."""
import re
import subprocess
import sys
def extract_mermaid_blocks(content: str) -> list[str]:
"""Extract mermaid blocks from markdown."""
return re.findall(r"```mermaid\n(.*?)\n```", content, re.DOTALL)
def render_diagram(block: str) -> str:
"""Render a single mermaid block to ASCII."""
result = subprocess.run(
["mermaid-ascii", "-f", "-"],
input=block,
capture_output=True,
text=True,
)
if result.returncode != 0:
return f"ERROR: {result.stderr}"
return result.stdout
def main():
if len(sys.argv) < 2:
print("Usage: render-diagrams.py <markdown-file>")
sys.exit(1)
filename = sys.argv[1]
content = open(filename).read()
blocks = extract_mermaid_blocks(content)
print(f"Found {len(blocks)} mermaid diagram(s) in {filename}")
print()
for i, block in enumerate(blocks):
# Skip if empty
if not block.strip():
continue
print(f"=== Diagram {i + 1} ===")
print(render_diagram(block))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,64 @@
#!/usr/bin/env python3
"""Validate Mermaid diagrams in markdown files."""
import glob
import re
import sys
# Diagram types that are valid in Mermaid
VALID_TYPES = {
"flowchart",
"graph",
"classDiagram",
"sequenceDiagram",
"stateDiagram",
"stateDiagram-v2",
"erDiagram",
"gantt",
"pie",
"mindmap",
"journey",
"gitGraph",
"requirementDiagram",
}
def extract_mermaid_blocks(content: str) -> list[tuple[int, str]]:
"""Extract mermaid blocks with their positions."""
blocks = []
for match in re.finditer(r"```mermaid\n(.*?)\n```", content, re.DOTALL):
blocks.append((match.start(), match.group(1)))
return blocks
def validate_block(block: str) -> bool:
"""Check if a mermaid block has a valid diagram type."""
if not block.strip():
return True # Empty block is OK
first_line = block.strip().split("\n")[0]
return any(first_line.startswith(t) for t in VALID_TYPES)
def main():
md_files = glob.glob("docs/*.md")
errors = []
for filepath in md_files:
content = open(filepath).read()
blocks = extract_mermaid_blocks(content)
for i, (_, block) in enumerate(blocks):
if not validate_block(block):
errors.append(f"{filepath}: invalid diagram type in block {i + 1}")
if errors:
for e in errors:
print(f"ERROR: {e}")
sys.exit(1)
print(f"Validated {len(md_files)} markdown files - all OK")
if __name__ == "__main__":
main()