"""
Pattern Playback Controller Module
Manages MIDI pattern playback using the PlaybackEngine. Handles:
- Starting/stopping/pausing playback
- Building MIDI files from patterns
- UI synchronization during playback
- Muting/unmuting channels
- Shuffle play functionality
"""
import random
from typing import Any, Callable, Dict, List, Optional
from decologr import Decologr as log
from mido import Message, MetaMessage, MidiFile, MidiTrack, bpm2tempo
from PySide6.QtCore import QObject, Qt, QTimer
from picomidi.core.tempo import bpm_to_ticks
from picomidi.message.type import MidoMetaMessageType
from picomidi.messages.note import build_midi_note, note_off, note_on
from picomidi.playback.engine import (
PlaybackEngine,
TransportState,
)
from picomidi.playback.worker import MidiPlaybackWorker
from picomidi.sequencer.event import SequencerEvent
from picomidi.ui.widget.button.note import NoteButtonEvent
[docs]
class PlaybackConfig:
"""Configuration for playback controller."""
def __init__(
self,
ticks_per_beat: int = 480,
beats_per_measure: int = 4,
measure_beats: int = 16,
default_bpm: int = 120,
playback_interval_ms: int = 20,
):
"""
Initialize playback configuration.
:param ticks_per_beat: MIDI ticks per beat
:param beats_per_measure: Beats per measure (4/4 time)
:param measure_beats: Beats per measure for sequencer display
:param default_bpm: Default tempo
:param playback_interval_ms: Timer interval for playback updates
"""
[docs]
self.ticks_per_beat = ticks_per_beat
[docs]
self.beats_per_measure = beats_per_measure
[docs]
self.measure_beats = measure_beats
[docs]
self.default_bpm = default_bpm
[docs]
self.playback_interval_ms = playback_interval_ms
@property
[docs]
def ticks_per_measure(self) -> int:
"""Calculate ticks per bar."""
return self.ticks_per_beat * self.beats_per_measure
[docs]
class PlaybackPosition:
"""Represents the current playback position."""
def __init__(self, global_step: int = 0, bar_index: int = 0, step_in_bar: int = 0):
"""
Initialize playback position.
:param global_step: Current step across all bars
:param bar_index: Current bar index
:param step_in_bar: Current step within bar
"""
[docs]
self.global_step = global_step
[docs]
self.bar_index = bar_index
[docs]
self.step_in_bar = step_in_bar
[docs]
class PatternPlaybackController(QObject):
"""
Controls pattern playback and synchronization.
Manages:
- PlaybackEngine operation
- UI updates during playback (bar/step highlighting)
- Muting/unmuting channels
- Pause/resume functionality
- Shuffle play
"""
def __init__(
self,
config: Optional[PlaybackConfig] = None,
playback_engine: Optional[PlaybackEngine] = None,
scope: str = "PatternPlaybackController",
):
"""
Initialize playback controller.
:param config: Playback configuration
:param playback_engine: PlaybackEngine instance. If not provided, creates one
:param scope: Logging scope name
"""
super().__init__()
[docs]
self.config = config or PlaybackConfig()
[docs]
self.playback_engine = playback_engine or PlaybackEngine()
# Playback state
[docs]
self.is_playing = False
[docs]
self.current_bpm = self.config.default_bpm
# Position tracking
[docs]
self.current_position = PlaybackPosition()
[docs]
self.last_bar_index = -1
[docs]
self.last_step_in_bar = -1
# Mute state
[docs]
self.muted_channels: List[int] = []
# Timer for driving playback
[docs]
self.timer: Optional[QTimer] = None
[docs]
self.worker = MidiPlaybackWorker(parent=self)
# Callbacks for UI updates
[docs]
self.on_playback_started: Optional[Callable[[], None]] = None
[docs]
self.on_playback_stopped: Optional[Callable[[], None]] = None
[docs]
self.on_playback_paused: Optional[Callable[[], None]] = None
[docs]
self.on_playback_resumed: Optional[Callable[[], None]] = None
[docs]
self.on_position_changed: Optional[Callable[[PlaybackPosition], None]] = None
[docs]
self.on_bar_changed: Optional[Callable[[int], None]] = None
[docs]
self.on_step_changed: Optional[Callable[[int], None]] = None
# Callback for MIDI event sending
[docs]
self.on_midi_event: Optional[Callable[[Message], Any]] = None
[docs]
def start_playback(
self,
measures: List,
bpm: Optional[int] = None,
) -> bool:
"""
Start pattern playback.
:param measures: List of PatternMeasure objects to play
:param bpm: Optional tempo override
:return: True if playback started successfully
"""
if self.is_playing:
log.debug(
message="Playback already in progress",
scope=self.scope,
)
return False
if not measures:
log.warning(
message="No measures to play",
scope=self.scope,
)
return False
try:
# Set tempo if provided
if bpm is not None:
self.current_bpm = bpm
# Build MIDI file from measures
midi_file = self._build_midi_file_for_playback(measures)
# Check if there are any notes to play
if len(midi_file.tracks[0]) <= 1:
log.message(
message="Pattern has no notes to play",
scope=self.scope,
)
return False
# Configure and start engine
self.playback_engine.load_file(midi_file)
# Apply mute state
for channel in range(16):
self.playback_engine.mute_channel(
channel, channel in self.muted_channels
)
# Set MIDI event callback
if self.on_midi_event:
self.playback_engine.on_event = lambda msg: self.on_midi_event(msg)
# Start engine
self.playback_engine.start(0)
# Setup worker for MIDI playback (drives engine.process_until_now)
self.worker.setup(
buffered_msgs=[],
midi_out_port=None,
ticks_per_beat=midi_file.ticks_per_beat,
play_program_changes=True,
start_time=None,
initial_tempo=bpm2tempo(self.current_bpm),
playback_engine=self.playback_engine,
)
self.worker.finished.connect(self._on_worker_finished)
# Initialize position tracking
self.current_position = PlaybackPosition()
self.last_bar_index = -1
self.last_step_in_bar = -1
# Start timer
self._start_timer()
# Mark as playing
self.is_playing = True
self.is_paused = False
tempo_us = bpm2tempo(self.current_bpm)
log.message(
message=(
f"Pattern playback started: {self.current_bpm} BPM "
f"(tempo={tempo_us} µs/beat)"
),
scope=self.scope,
)
if self.on_playback_started:
self.on_playback_started()
return True
except Exception as ex:
log.error(
message=f"Error starting playback: {ex}",
scope=self.scope,
)
return False
[docs]
def _on_worker_finished(self) -> None:
"""Handle worker.finished when engine stops playing."""
if self.is_playing:
self.stop_playback()
[docs]
def stop_playback(self) -> None:
"""Stop pattern playback."""
if not self.is_playing:
return
try:
self.worker.stop()
self.playback_engine.stop()
try:
self.worker.finished.disconnect(self._on_worker_finished)
except (TypeError, RuntimeError):
pass
# Stop timer
self._stop_timer()
# Reset state
self.is_playing = False
self.is_paused = False
self.current_position = PlaybackPosition()
log.message(
message="Pattern playback stopped",
scope=self.scope,
)
if self.on_playback_stopped:
self.on_playback_stopped()
except Exception as ex:
log.error(
message=f"Error stopping playback: {ex}",
scope=self.scope,
)
[docs]
def pause_playback(self) -> None:
"""Pause pattern playback (can be resumed)."""
if not self.is_playing or self.is_paused:
return
try:
if self.timer:
self.timer.stop()
self.is_paused = True
log.message(
message="Pattern playback paused",
scope=self.scope,
)
if self.on_playback_paused:
self.on_playback_paused()
except Exception as ex:
log.error(
message=f"Error pausing playback: {ex}",
scope=self.scope,
)
[docs]
def resume_playback(self) -> None:
"""Resume paused playback."""
if not self.is_playing or not self.is_paused:
return
try:
if self.timer:
self._update_timer_interval()
self.timer.start()
self.is_paused = False
log.message(
message="Pattern playback resumed",
scope=self.scope,
)
if self.on_playback_resumed:
self.on_playback_resumed()
except Exception as ex:
log.error(
message=f"Error resuming playback: {ex}",
scope=self.scope,
)
[docs]
def toggle_pause(self) -> None:
"""Toggle pause/resume state."""
if self.is_paused:
self.resume_playback()
else:
self.pause_playback()
[docs]
def shuffle_play(
self,
measures: List,
bpm: Optional[int] = None,
) -> bool:
"""
Select a random bar and start playback.
:param measures: List of PatternMeasure objects
:param bpm: Optional tempo override
:return: True if playback started
"""
if not measures:
log.warning(
message="No measures for shuffle play",
scope=self.scope,
)
return False
try:
# Select random bar
random_bar_index = random.randint(0, len(measures) - 1)
log.message(
message=f"Shuffle play: starting from bar {random_bar_index + 1}",
scope=self.scope,
)
# Trigger callback for UI to select the bar
if self.on_bar_changed:
self.on_bar_changed(random_bar_index)
# Start playback
return self.start_playback(measures, bpm)
except Exception as ex:
log.error(
message=f"Error starting shuffle play: {ex}",
scope=self.scope,
)
return False
[docs]
def mute_channel(self, channel: int, mute: bool = True) -> None:
"""
Mute or unmute a specific MIDI channel.
:param channel: MIDI channel (0-15)
:param mute: True to mute, False to unmute
"""
if mute:
if channel not in self.muted_channels:
self.muted_channels.append(channel)
log.message(
message=f"Channel {channel} muted",
scope=self.scope,
)
else:
if channel in self.muted_channels:
self.muted_channels.remove(channel)
log.message(
message=f"Channel {channel} unmuted",
scope=self.scope,
)
# Update engine if playing
if self.is_playing:
self.playback_engine.mute_channel(channel, mute)
[docs]
def mute_row(self, row: int, mute: bool = True) -> None:
"""
Mute or unmute a sequencer row.
Row to channel mapping:
- Row 0: Channel 0 (Digital Synth 1)
- Row 1: Channel 1 (Digital Synth 2)
- Row 2: Channel 2 (Analog Synth)
- Row 3: Channel 9 (Drums)
:param row: Row index (0-3)
:param mute: True to mute, False to unmute
"""
channel = row if row < 3 else 9
self.mute_channel(channel, mute)
[docs]
def toggle_mute_row(self, row: int) -> bool:
"""
Toggle mute for a row.
:param row: Row index (0-3)
:return: New mute state (True if now muted)
"""
channel = row if row < 3 else 9
is_muted = channel in self.muted_channels
self.mute_channel(channel, not is_muted)
return not is_muted
[docs]
def is_row_muted(self, row: int) -> bool:
"""
Check if a row is muted.
:param row: Row index (0-3)
:return: True if row is muted
"""
channel = row if row < 3 else 9
return channel in self.muted_channels
[docs]
def process_playback_tick(self, total_steps: int) -> Optional[PlaybackPosition]:
"""
Process a playback timer tick.
Called by timer. Updates engine and returns current position.
:param total_steps: Total steps in pattern (bars * steps_per_bar)
:return: Updated PlaybackPosition or None if playback has stopped
"""
if not self.is_playing or self.is_paused:
return None
try:
# Drive engine via worker (same pattern as MIDI file editor)
self.worker.do_work()
# Calculate current position from engine state
tick = self._get_engine_tick()
ticks_per_step = self.config.ticks_per_beat // 4 # 16th notes
global_step = (
(tick // ticks_per_step) % total_steps if total_steps > 0 else 0
)
self.current_position.global_step = global_step
self.current_position.bar_index = global_step // self.config.measure_beats
self.current_position.step_in_bar = global_step % self.config.measure_beats
# Trigger callbacks for changes
if self.current_position.bar_index != self.last_bar_index:
self.last_bar_index = self.current_position.bar_index
if self.on_bar_changed:
self.on_bar_changed(self.current_position.bar_index)
if self.current_position.step_in_bar != self.last_step_in_bar:
self.last_step_in_bar = self.current_position.step_in_bar
if self.on_step_changed:
self.on_step_changed(self.current_position.step_in_bar)
if self.on_position_changed:
self.on_position_changed(self.current_position)
# Check if engine has finished
if self.playback_engine.state == TransportState.STOPPED:
self.stop_playback()
return None
return self.current_position
except Exception as ex:
log.error(
message=f"Error during playback tick: {ex}",
scope=self.scope,
)
return None
[docs]
def get_current_position(self) -> PlaybackPosition:
"""
Get the current playback position.
:return: PlaybackPosition object
"""
return self.current_position
[docs]
def get_playback_state(self) -> Dict[str, bool]:
"""
Get the current playback state.
:return: Dictionary with is_playing, is_paused, muted_channels
"""
return {
"is_playing": self.is_playing,
"is_paused": self.is_paused,
"muted_channels": self.muted_channels.copy(),
}
[docs]
def set_tempo(self, bpm: int) -> None:
"""
Set playback tempo.
:param bpm: Tempo in BPM
"""
bpm = max(20, min(300, bpm))
self.current_bpm = bpm
# Update timer interval if playing
if self.is_playing and self.timer:
self._update_timer_interval()
log.message(
message=f"Playback tempo set to {bpm} BPM",
scope=self.scope,
)
[docs]
def reload_playback_with_tempo(self, measures: List, bpm: int) -> bool:
"""
Rebuild MIDI with new tempo and resume from current position.
Call when tempo changes during playback.
:param measures: Current pattern measures
:param bpm: New tempo in BPM
:return: True if reload succeeded
"""
if not self.is_playing or self.is_paused:
return False
try:
tick = self._get_engine_tick()
self.current_bpm = max(20, min(300, bpm))
midi_file = self._build_midi_file_for_playback(measures)
self.playback_engine.load_file(midi_file)
for ch in range(16):
self.playback_engine.mute_channel(ch, ch in self.muted_channels)
if self.on_midi_event:
self.playback_engine.on_event = lambda m: self.on_midi_event(m)
self.playback_engine.start(tick)
return True
except Exception as ex:
log.error(
message=f"Error reloading playback tempo: {ex}",
scope=self.scope,
)
return False
[docs]
def get_tempo(self) -> int:
"""Get current playback tempo."""
return self.current_bpm
[docs]
def _build_midi_file_for_playback(self, measures: List) -> MidiFile:
"""
Build a MIDI file from the pattern for playback.
:param measures: List of PatternMeasure objects
:return: MidiFile ready for playback
"""
midi_file = MidiFile(
type=1,
ticks_per_beat=self.config.ticks_per_beat,
)
track = MidiTrack()
midi_file.tracks.append(track)
# Add tempo (use mido.bpm2tempo to match Playback Worker / MIDI editor)
tempo_us = bpm2tempo(self.current_bpm)
track.append(
MetaMessage(
MidoMetaMessageType.SET_TEMPO.value,
tempo=tempo_us,
time=0,
)
)
# Collect all events
events: List[SequencerEvent] = self._collect_sequencer_events(measures)
if not events:
return midi_file
# Convert events to MIDI messages with absolute timing
midi_events = []
for e in events:
note = build_midi_note(event=e, channel=e.channel, bpm=self.current_bpm)
midi_events.extend(
[
(e.tick, note_on(note)),
(e.tick + e.duration_ticks, note_off(note)),
]
)
# Sort by tick and convert to relative time
midi_events.sort(key=lambda x: x[0])
prev_tick = 0
for tick, msg in midi_events:
msg.time = tick - prev_tick
track.append(msg)
prev_tick = tick
return midi_file
[docs]
def _collect_sequencer_events(self, measures: List) -> List[SequencerEvent]:
"""
Collect all note events from measures.
:param measures: List of PatternMeasure objects
:return: List of SequencerEvent objects
"""
ticks_per_step = self.config.ticks_per_beat // 4
events: List[SequencerEvent] = []
for bar_index, measure in enumerate(measures):
for step in range(min(self.config.measure_beats, 16)):
# Skip if step is beyond measure buttons
tick = (bar_index * self.config.measure_beats + step) * ticks_per_step
for row in range(4):
# Skip if row is muted
channel = row if row < 3 else 9
if channel in self.muted_channels:
continue
# Skip if step is beyond buttons in row
if step >= len(measure.buttons[row]):
continue
button = measure.buttons[row][step]
# Skip unchecked buttons
if not button.isChecked():
continue
# Get note spec
spec = self._get_button_note_spec(button)
if not spec or not spec.is_active:
continue
# Validate velocity
velocity = max(0, min(127, spec.velocity))
# Calculate duration
duration_ticks = (
self._ms_to_ticks(spec.duration_ms) or ticks_per_step
)
events.append(
self._sequencer_event(
channel, duration_ticks, spec, tick, velocity
)
)
return events
[docs]
def _sequencer_event(
self,
channel: int,
duration_ticks: int,
spec: NoteButtonEvent,
tick: int,
velocity: int,
) -> SequencerEvent:
"""add sequencer event"""
return SequencerEvent(
tick=tick,
note=spec.note,
velocity=velocity,
channel=channel,
duration_ticks=duration_ticks,
)
[docs]
def _ms_to_ticks(self, duration_ms: float) -> int:
"""
Convert milliseconds to MIDI ticks.
Formula: ticks = (duration_ms / 1000) * (bpm / 60) * ticks_per_beat
:param duration_ms: Duration in milliseconds
:return: Duration in MIDI ticks
"""
if duration_ms <= 0:
return 0
bpm = self.current_bpm
ticks_per_beat = self.config.ticks_per_beat
ticks = bpm_to_ticks(bpm, duration_ms, ticks_per_beat)
return max(1, int(ticks))
[docs]
def _get_engine_tick(self) -> int:
"""
Get the current tick position from the playback engine.
:return: Absolute tick position
"""
try:
if self.playback_engine.events:
idx = self.playback_engine.event_index
if idx > 0 and idx <= len(self.playback_engine.events):
return self.playback_engine.events[idx - 1].absolute_tick
else:
return self.playback_engine.start_tick
return 0
except Exception:
return 0
[docs]
def _start_timer(self) -> None:
"""Start the playback timer."""
if self.timer and self.timer.isActive():
return
self.timer = QTimer()
self.timer.setTimerType(Qt.TimerType.PreciseTimer)
self._update_timer_interval()
# Note: timeout connection must be handled by the UI
[docs]
def _stop_timer(self) -> None:
"""Stop the playback timer."""
if self.timer:
self.timer.stop()
self.timer = None
[docs]
def _update_timer_interval(self) -> None:
"""Update timer interval based on current tempo."""
if self.timer:
# Use a short interval for smooth playback
self.timer.setInterval(self.config.playback_interval_ms)
[docs]
def set_config(self, config: PlaybackConfig) -> None:
"""
Update controller configuration.
:param config: New PlaybackConfig
"""
self.config = config
[docs]
def get_engine(self) -> PlaybackEngine:
"""
Get the underlying PlaybackEngine.
Useful for advanced control if needed.
:return: PlaybackEngine instance
"""
return self.playback_engine