forked from genewildish/Mainline
feat: Add oscilloscope with image data source integration
- demo_image_oscilloscope.py: Uses ImageDataSource pattern to generate oscilloscope images - Pygame renders waveforms to RGB surfaces - PIL converts to 8-bit grayscale with RGBA transparency - ANSI rendering converts grayscale to character ramp - Features LFO modulation chain Usage: uv run python scripts/demo_image_oscilloscope.py --lfo --modulate Pattern: Pygame surface → PIL Image (L mode) → ANSI characters Related to #46
This commit is contained in:
378
scripts/demo_image_oscilloscope.py
Normal file
378
scripts/demo_image_oscilloscope.py
Normal file
@@ -0,0 +1,378 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Oscilloscope with Image Data Source Integration
|
||||
|
||||
This demo:
|
||||
1. Uses pygame to render oscillator waveforms
|
||||
2. Converts to PIL Image (8-bit grayscale with transparency)
|
||||
3. Renders to ANSI using image data source patterns
|
||||
4. Features LFO modulation chain
|
||||
|
||||
Usage:
|
||||
uv run python scripts/demo_image_oscilloscope.py --lfo --modulate
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# Add mainline to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from engine.data_sources.sources import DataSource, ImageItem
|
||||
from engine.sensors.oscillator import OscillatorSensor, register_oscillator_sensor
|
||||
|
||||
|
||||
class ModulatedOscillator:
|
||||
"""Oscillator with frequency modulation from another oscillator."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
name: str,
|
||||
waveform: str = "sine",
|
||||
base_frequency: float = 1.0,
|
||||
modulator: "OscillatorSensor | None" = None,
|
||||
modulation_depth: float = 0.5,
|
||||
):
|
||||
self.name = name
|
||||
self.waveform = waveform
|
||||
self.base_frequency = base_frequency
|
||||
self.modulator = modulator
|
||||
self.modulation_depth = modulation_depth
|
||||
|
||||
register_oscillator_sensor(
|
||||
name=name, waveform=waveform, frequency=base_frequency
|
||||
)
|
||||
self.osc = OscillatorSensor(
|
||||
name=name, waveform=waveform, frequency=base_frequency
|
||||
)
|
||||
self.osc.start()
|
||||
|
||||
def read(self):
|
||||
if self.modulator:
|
||||
mod_reading = self.modulator.read()
|
||||
if mod_reading:
|
||||
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
|
||||
effective_freq = self.base_frequency + mod_offset
|
||||
effective_freq = max(0.1, min(effective_freq, 20.0))
|
||||
self.osc._frequency = effective_freq
|
||||
return self.osc.read()
|
||||
|
||||
def get_phase(self):
|
||||
return self.osc._phase
|
||||
|
||||
def get_effective_frequency(self):
|
||||
if self.modulator and self.modulator.read():
|
||||
mod_reading = self.modulator.read()
|
||||
mod_offset = (mod_reading.value - 0.5) * 2 * self.modulation_depth
|
||||
return max(0.1, min(self.base_frequency + mod_offset, 20.0))
|
||||
return self.base_frequency
|
||||
|
||||
def stop(self):
|
||||
self.osc.stop()
|
||||
|
||||
|
||||
class OscilloscopeDataSource(DataSource):
|
||||
"""Dynamic data source that generates oscilloscope images from oscillators."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
modulator: OscillatorSensor,
|
||||
modulated: ModulatedOscillator,
|
||||
width: int = 200,
|
||||
height: int = 100,
|
||||
):
|
||||
self.modulator = modulator
|
||||
self.modulated = modulated
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.frame = 0
|
||||
|
||||
# Check if pygame and PIL are available
|
||||
import importlib.util
|
||||
|
||||
self.pygame_available = importlib.util.find_spec("pygame") is not None
|
||||
self.pil_available = importlib.util.find_spec("PIL") is not None
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "oscilloscope_image"
|
||||
|
||||
@property
|
||||
def is_dynamic(self) -> bool:
|
||||
return True
|
||||
|
||||
def fetch(self) -> list[ImageItem]:
|
||||
"""Generate oscilloscope image from oscillators."""
|
||||
if not self.pygame_available or not self.pil_available:
|
||||
# Fallback to text-based source
|
||||
return []
|
||||
|
||||
import pygame
|
||||
from PIL import Image
|
||||
|
||||
# Create Pygame surface
|
||||
surface = pygame.Surface((self.width, self.height))
|
||||
surface.fill((10, 10, 20)) # Dark background
|
||||
|
||||
# Get readings
|
||||
mod_reading = self.modulator.read()
|
||||
mod_val = mod_reading.value if mod_reading else 0.5
|
||||
modulated_reading = self.modulated.read()
|
||||
modulated_val = modulated_reading.value if modulated_reading else 0.5
|
||||
|
||||
# Draw modulator waveform (top half)
|
||||
top_height = self.height // 2
|
||||
waveform_fn = self.modulator.WAVEFORMS[self.modulator.waveform]
|
||||
mod_time_offset = self.modulator._phase * self.modulator.frequency * 0.3
|
||||
|
||||
prev_x, prev_y = 0, 0
|
||||
for x in range(self.width):
|
||||
col_fraction = x / self.width
|
||||
time_pos = mod_time_offset + col_fraction
|
||||
sample = waveform_fn(time_pos * self.modulator.frequency * 2)
|
||||
y = int(top_height - (sample * (top_height - 10)) - 5)
|
||||
if x > 0:
|
||||
pygame.draw.line(surface, (100, 200, 255), (prev_x, prev_y), (x, y), 1)
|
||||
prev_x, prev_y = x, y
|
||||
|
||||
# Draw separator
|
||||
pygame.draw.line(
|
||||
surface, (80, 80, 100), (0, top_height), (self.width, top_height), 1
|
||||
)
|
||||
|
||||
# Draw modulated waveform (bottom half)
|
||||
bottom_start = top_height + 1
|
||||
bottom_height = self.height - bottom_start - 1
|
||||
waveform_fn = self.modulated.osc.WAVEFORMS[self.modulated.waveform]
|
||||
modulated_time_offset = (
|
||||
self.modulated.get_phase() * self.modulated.get_effective_frequency() * 0.3
|
||||
)
|
||||
|
||||
prev_x, prev_y = 0, 0
|
||||
for x in range(self.width):
|
||||
col_fraction = x / self.width
|
||||
time_pos = modulated_time_offset + col_fraction
|
||||
sample = waveform_fn(
|
||||
time_pos * self.modulated.get_effective_frequency() * 2
|
||||
)
|
||||
y = int(
|
||||
bottom_start + (bottom_height - (sample * (bottom_height - 10))) - 5
|
||||
)
|
||||
if x > 0:
|
||||
pygame.draw.line(surface, (255, 150, 100), (prev_x, prev_y), (x, y), 1)
|
||||
prev_x, prev_y = x, y
|
||||
|
||||
# Convert Pygame surface to PIL Image (8-bit grayscale with alpha)
|
||||
img_str = pygame.image.tostring(surface, "RGB")
|
||||
pil_rgb = Image.frombytes("RGB", (self.width, self.height), img_str)
|
||||
|
||||
# Convert to 8-bit grayscale
|
||||
pil_gray = pil_rgb.convert("L")
|
||||
|
||||
# Create alpha channel (full opacity for now)
|
||||
alpha = Image.new("L", (self.width, self.height), 255)
|
||||
|
||||
# Combine into RGBA
|
||||
pil_rgba = Image.merge("RGBA", (pil_gray, pil_gray, pil_gray, alpha))
|
||||
|
||||
# Create ImageItem
|
||||
item = ImageItem(
|
||||
image=pil_rgba,
|
||||
source="oscilloscope_image",
|
||||
timestamp=str(time.time()),
|
||||
path=None,
|
||||
metadata={
|
||||
"frame": self.frame,
|
||||
"mod_value": mod_val,
|
||||
"modulated_value": modulated_val,
|
||||
},
|
||||
)
|
||||
|
||||
self.frame += 1
|
||||
return [item]
|
||||
|
||||
|
||||
def render_pil_to_ansi(
|
||||
pil_image, terminal_width: int = 80, terminal_height: int = 30
|
||||
) -> str:
|
||||
"""Convert PIL image (8-bit grayscale with transparency) to ANSI."""
|
||||
# Resize for terminal display
|
||||
resized = pil_image.resize((terminal_width * 2, terminal_height * 2))
|
||||
|
||||
# Extract grayscale and alpha channels
|
||||
gray = resized.convert("L")
|
||||
alpha = resized.split()[3] if len(resized.split()) > 3 else None
|
||||
|
||||
# ANSI character ramp (dark to light)
|
||||
chars = " .:-=+*#%@"
|
||||
|
||||
lines = []
|
||||
for y in range(0, resized.height, 2): # Sample every 2nd row for aspect ratio
|
||||
line = ""
|
||||
for x in range(0, resized.width, 2):
|
||||
pixel = gray.getpixel((x, y))
|
||||
|
||||
# Check alpha if available
|
||||
if alpha:
|
||||
a = alpha.getpixel((x, y))
|
||||
if a < 128: # Transparent
|
||||
line += " "
|
||||
continue
|
||||
|
||||
char_index = int((pixel / 255) * (len(chars) - 1))
|
||||
line += chars[char_index]
|
||||
lines.append(line)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def demo_image_oscilloscope(
|
||||
waveform: str = "sine",
|
||||
base_freq: float = 0.5,
|
||||
modulate: bool = False,
|
||||
mod_waveform: str = "sine",
|
||||
mod_freq: float = 0.5,
|
||||
mod_depth: float = 0.5,
|
||||
frames: int = 0,
|
||||
):
|
||||
"""Run oscilloscope with image data source integration."""
|
||||
frame_interval = 1.0 / 15.0 # 15 FPS
|
||||
|
||||
print("Oscilloscope with Image Data Source Integration")
|
||||
print("Frame rate: 15 FPS")
|
||||
print()
|
||||
|
||||
# Create oscillators
|
||||
modulator = OscillatorSensor(
|
||||
name="modulator", waveform=mod_waveform, frequency=mod_freq
|
||||
)
|
||||
modulator.start()
|
||||
|
||||
modulated = ModulatedOscillator(
|
||||
name="modulated",
|
||||
waveform=waveform,
|
||||
base_frequency=base_freq,
|
||||
modulator=modulator if modulate else None,
|
||||
modulation_depth=mod_depth,
|
||||
)
|
||||
|
||||
# Create image data source
|
||||
image_source = OscilloscopeDataSource(
|
||||
modulator=modulator,
|
||||
modulated=modulated,
|
||||
width=200,
|
||||
height=100,
|
||||
)
|
||||
|
||||
# Run demo loop
|
||||
try:
|
||||
frame = 0
|
||||
last_time = time.time()
|
||||
|
||||
while frames == 0 or frame < frames:
|
||||
# Fetch image from data source
|
||||
images = image_source.fetch()
|
||||
|
||||
if images:
|
||||
# Convert to ANSI
|
||||
visualization = render_pil_to_ansi(
|
||||
images[0].image, terminal_width=80, terminal_height=30
|
||||
)
|
||||
else:
|
||||
# Fallback to text message
|
||||
visualization = (
|
||||
"Pygame or PIL not available\n\n[Image rendering disabled]"
|
||||
)
|
||||
|
||||
# Add header
|
||||
header = f"IMAGE SOURCE MODE | Frame: {frame}"
|
||||
header_line = "─" * 80
|
||||
visualization = f"{header}\n{header_line}\n" + visualization
|
||||
|
||||
# Display
|
||||
print("\033[H" + visualization)
|
||||
|
||||
# Frame timing
|
||||
elapsed = time.time() - last_time
|
||||
sleep_time = max(0, frame_interval - elapsed)
|
||||
time.sleep(sleep_time)
|
||||
last_time = time.time()
|
||||
|
||||
frame += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\nDemo stopped by user")
|
||||
|
||||
finally:
|
||||
modulator.stop()
|
||||
modulated.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Oscilloscope with image data source integration"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--waveform",
|
||||
choices=["sine", "square", "sawtooth", "triangle", "noise"],
|
||||
default="sine",
|
||||
help="Main waveform type",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frequency",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Main oscillator frequency",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--lfo",
|
||||
action="store_true",
|
||||
help="Use slow LFO frequency (0.5Hz)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--modulate",
|
||||
action="store_true",
|
||||
help="Enable LFO modulation chain",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-waveform",
|
||||
choices=["sine", "square", "sawtooth", "triangle", "noise"],
|
||||
default="sine",
|
||||
help="Modulator waveform type",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-freq",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Modulator frequency in Hz",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mod-depth",
|
||||
type=float,
|
||||
default=0.5,
|
||||
help="Modulation depth",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--frames",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Number of frames to render",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
base_freq = args.frequency
|
||||
if args.lfo:
|
||||
base_freq = 0.5
|
||||
|
||||
demo_image_oscilloscope(
|
||||
waveform=args.waveform,
|
||||
base_freq=base_freq,
|
||||
modulate=args.modulate,
|
||||
mod_waveform=args.mod_waveform,
|
||||
mod_freq=args.mod_freq,
|
||||
mod_depth=args.mod_depth,
|
||||
frames=args.frames,
|
||||
)
|
||||
Reference in New Issue
Block a user