Source code for jdxi_editor.ui.editors.midi_player.editor

"""
MIDI Player for JDXI Editor
"""

import time
from pathlib import Path
from typing import Any, Optional

import mido
from decologr import Decologr as log
from mido import Message, MidiFile, bpm2tempo
from PySide6.QtCore import QMargins, Qt, QThread, QTimer
from PySide6.QtWidgets import (
    QButtonGroup,
    QGridLayout,
    QGroupBox,
    QHBoxLayout,
    QLabel,
    QPushButton,
)

from jdxi_editor.midi.channel.channel import MidiChannel
from jdxi_editor.midi.data.address.address import (
    JDXiSysExAddress,
    JDXiSysExAddressStartMSB,
    JDXiSysExOffsetProgramLMB,
    JDXiSysExOffsetSystemUMB,
)
from jdxi_editor.midi.data.parameter.effects.effects import (
    DelayParam,
    Effect1Param,
    Effect2Param,
    ReverbParam,
)
from jdxi_editor.midi.io.helper import MidiIOHelper
from jdxi_editor.midi.playback.state import MidiPlaybackState
from jdxi_editor.midi.sysex.composer import JDXiSysExComposer
from jdxi_editor.ui.common import JDXi, QVBoxLayout, QWidget
from jdxi_editor.ui.editors.helpers.widgets import (
    create_jdxi_button_from_spec,
    create_jdxi_row,
)
from jdxi_editor.ui.editors.midi_player.automation import AutomationWidget
from jdxi_editor.ui.editors.midi_player.helper import build_panel
from jdxi_editor.ui.editors.midi_player.midi_analyzer import MidiAnalyzer
from jdxi_editor.ui.editors.midi_player.track.category import (
    CATEGORY_META,
    STR_TO_TRACK_CATEGORY,
    TrackCategory,
)
from jdxi_editor.ui.editors.midi_player.utils import format_time, tempo2bpm
from jdxi_editor.ui.editors.midi_player.widgets import MidiPlayerWidgets
from jdxi_editor.ui.editors.synth.editor import SynthEditor
from jdxi_editor.ui.preset.helper import JDXiPresetHelper
from jdxi_editor.ui.preset.source import PresetSource
from jdxi_editor.ui.preset.tone.digital.list import JDXiPresetToneListDigital
from jdxi_editor.ui.style.factory import generate_sequencer_button_style
from jdxi_editor.ui.widgets.digital.title import DigitalTitle
from jdxi_editor.ui.widgets.editor.base import EditorBaseWidget
from jdxi_editor.ui.widgets.editor.helper import (
    create_group_with_layout,
    create_layout_with_items,
    create_vertical_layout,
)
from jdxi_editor.ui.widgets.midi.file.viewer import MidiFileViewer
from jdxi_editor.ui.widgets.midi.utils import get_total_duration_in_seconds
from jdxi_editor.ui.widgets.classification_group import ClassificationGroup
from jdxi_editor.ui.widgets.event_suppression_group import EventSuppressionGroup
from jdxi_editor.ui.widgets.midi_file_group import MidiFileGroup
from jdxi_editor.ui.widgets.mute_channels_group import MuteChannelsGroup
from jdxi_editor.ui.widgets.transport.transport import TransportWidget
from jdxi_editor.ui.widgets.usb.recording import USBFileRecordingWidget
from jdxi_editor.ui.windows.jdxi.utils import show_message_box_from_spec
from picomidi.constant import Midi
from picomidi.message.type import MidoMessageType
from picomidi.playback.engine import PlaybackEngine
from picomidi.playback.worker import MidiPlaybackWorker
from picomidi.ui.widget.transport.spec import TransportSpec
from picoui.helpers import create_layout_with_inner_layouts, create_widget_with_layout
from picoui.specs.widgets import (
    ButtonSpec,
    CheckBoxSpec,
    FileSelectionMode,
    FileSelectionSpec,
    MessageBoxSpec,
    get_file_save_from_spec,
)

# Expose Qt symbols for tests that patch via jdxi_editor.ui.editors.io.player
# Tests expect these names to exist at module level
[docs] QApplication = None # alias placeholder for patching
[docs] QObject = None
[docs] Signal = None
[docs] Slot = None
[docs] class MidiFileAttrs: """Midi File Attributes"""
[docs] TICKS_PER_BEAT = "ticks_per_beat"
[docs] class MidiFilePlayer(SynthEditor): """ Midi File Editor """
[docs] BUFFER_WINDOW_SECONDS = 30.0
def __init__( self, midi_helper: Optional[MidiIOHelper] = None, parent: QWidget = None, preset_helper: JDXiPresetHelper = None, ): """ Initialize the MidiPlayer :param midi_helper: Optional[MidiIOHelper] :param parent: Optional[QWidget] :param preset_helper: Optional[JDXIPresetHelper] """ super().__init__()
[docs] self.midi_helper = midi_helper
[docs] self.tick_duration = None
[docs] self.specs: dict | None = None
[docs] self._last_position_label: QLabel | None = None
[docs] self.parent: QWidget = parent
[docs] self.preset_helper: JDXiPresetHelper = preset_helper
# Midi-related
[docs] self.midi_state: MidiPlaybackState = MidiPlaybackState()
[docs] self.playback_engine: PlaybackEngine = PlaybackEngine()
[docs] self.midi_analyzer: MidiAnalyzer = MidiAnalyzer()
[docs] self.midi_playback_worker: MidiPlaybackWorker = MidiPlaybackWorker(parent=self)
self.midi_playback_worker.set_tempo.connect(self.update_tempo_us_from_worker)
[docs] self.midi_total_ticks: int | None = None
[docs] self.midi_port = self.midi_helper.midi_out
self.midi_timer_init()
[docs] self.current_tempo_bpm = None # Store current tempo BPM for digital
[docs] self.midi_preferred_channels = { MidiChannel.DIGITAL_SYNTH_1, MidiChannel.DIGITAL_SYNTH_2, MidiChannel.ANALOG_SYNTH, MidiChannel.DRUM_KIT, } # MIDI channels 1, 2, 3, 10 (zero-based)
# Initialize UI attributes
[docs] self.usb_recorder = USBFileRecordingWidget(self.midi_state)
[docs] self.automation = AutomationWidget(self.midi_state, parent=self)
[docs] self.ui = MidiPlayerWidgets()
[docs] self.transport: TransportWidget = TransportWidget(parent=self)
[docs] self.midi_file_group: MidiFileGroup = MidiFileGroup( parent=self, midi_state=self.midi_state )
[docs] self.event_suppression_group: EventSuppressionGroup = EventSuppressionGroup( parent=self, midi_state=self.midi_state )
[docs] self.classification_group: ClassificationGroup = ClassificationGroup( parent=self, midi_state=self.midi_state )
[docs] self.mute_channels_group: MuteChannelsGroup = MuteChannelsGroup( parent=self, midi_state=self.midi_state )
[docs] self.midi_file = MidiFileViewer(midi_state=self.midi_state, parent=self)
self.specs = self._build_specs() self.ui_init()
[docs] def midi_timer_init(self): """ Initialize or reinitialize the MIDI playback timer. Ensures previous connections are safely removed. """ try: if self.midi_state.timer: self.midi_state.timer.stop() self.midi_state.timer.timeout.disconnect() except Exception as ex: log.warning(f"⚠️ Failed to disconnect old midi.timer: {ex}") timer = QTimer(self) timer.timeout.connect(self.midi_play_next_event) self.midi_state.timer = timer
[docs] def ui_ensure_timer_connected(self): """ ui_ensure_timer_connected :return: Ensure the midi_play_next_event is connected to midi.timer.timeout """ try: self.midi_state.timer.timeout.disconnect(self.midi_play_next_event) except TypeError: pass # Already disconnected except Exception as ex: log.warning(f"⚠️ Could not disconnect midi_play_next_event: {ex}") try: self.midi_state.timer.timeout.connect(self.midi_play_next_event) except Exception as ex: log.error(f"❌ Failed to connect midi_play_next_event: {ex}")
[docs] def ui_init(self): """ Initialize the UI for the MidiPlayer. """ panel_widgets = [ build_panel(self._build_left_panel), build_panel(self._build_right_panel), ] # --- Top horizontal layout: file title and right-hand controls header_layout = create_layout_with_items( items=panel_widgets, start_stretch=False, end_stretch=False, margins=QMargins(0, 0, 0, 0), spacing=0, ) header_widget = create_widget_with_layout(header_layout) # --- Use EditorBaseWidget for consistent scrollable layout structure self.base_widget = EditorBaseWidget(parent=self, analog=False) self.base_widget.setup_scrollable_content() # --- Create content widget centered_layout = create_layout_with_items( items=[ self.transport, self.midi_file_group, self.event_suppression_group, ], vertical=False, start_stretch=False, end_stretch=False, margins=QMargins(0, 0, 0, 0), spacing=0, ) centered_widget = create_widget_with_layout(centered_layout) # Vertical stack: header, ruler, track viewer (stretch), controls. # So MidiTrackViewer and its MidiTrackWidgets get space and are visible. main_layout = create_vertical_layout(margins=QMargins(0, 0, 0, 0), spacing=0) main_layout.addWidget(header_widget) main_layout.addWidget(self.midi_file) main_layout.addWidget( self.midi_file.midi_track_viewer, 1 ) # stretch so track viewer gets remaining space main_layout.addWidget(centered_widget) # Add content to base widget container_layout = self.base_widget.get_container_layout() content_widget = create_widget_with_layout(main_layout) container_layout.addWidget(content_widget) self._add_base_widget_to_editor()
[docs] def _add_base_widget_to_editor(self): """Add base widget to editor's layout""" if not hasattr(self, "main_layout") or self.main_layout is None: self.main_layout = QVBoxLayout(self) self.setLayout(self.main_layout) self.main_layout.addWidget(self.base_widget)
[docs] def _build_left_panel(self) -> QVBoxLayout: """Build left panel""" # --- Create vertical layout for title and drum detection button self.ui.digital_title_file_name = DigitalTitle( tone_name="No file loaded", show_upper_text=True ) layout = create_layout_with_items( items=[self.ui.digital_title_file_name, self.classification_group], vertical=True, start_stretch=False, end_stretch=False, margins=QMargins(0, 0, 0, 0), spacing=0, ) return layout
[docs] def _build_right_panel(self): """Build right panel""" right_panel_layout = create_layout_with_inner_layouts( inner_layouts=[self.init_automation_usb_grid()], vertical=True, stretch=False, ) right_panel_layout.addWidget(self.mute_channels_group) right_panel_layout.setContentsMargins(QMargins(0, 0, 0, 0)) return right_panel_layout
[docs] def detect_and_assign_drum_tracks(self) -> None: """ Detect drum tracks in the loaded MIDI file and assign Channel 10 (1-based) to them. This analyzes all tracks using heuristics and automatically sets the channel spinboxes for tracks identified as drum tracks. """ if not self._validate_midi_loaded(): return try: drum_tracks = self.midi_analyzer.get_drum_tracks( self.midi_state.file, min_score=70.0 ) if not drum_tracks: show_message_box_from_spec( self.specs["message_box"]["no_drum_tracks_found"] ) return # --- Update the channel spinboxes for detected drum tracks # --- Channel 10 (1-based) = Channel 9 (0-based) drum_channel_display = 10 # 1-based digital channel drum_channel_binary = 9 # 0-based binary channel updated_tracks = [] for track_index, analysis in drum_tracks: if track_index in self.midi_file.midi_track_viewer.track_channel_spins: spin = self.midi_file.midi_track_viewer.track_channel_spins[ track_index ] spin.setValue(drum_channel_display) track_name = ( analysis.get("track_name") or f"Track {track_index + 1}" ) updated_tracks.append( f"Track {track_index + 1} ({track_name}) - Score: {analysis['score']:.1f}" ) # --- Show success message tracks_list = "\n".join(f" • {t}" for t in updated_tracks) message = ( f"🎵 Detected {len(updated_tracks)} drum track(s) and set to Channel 10" ) show_message_box_from_spec( self.specs["message_box"]["drum_tracks_detected"], message=message ) log.message(message=message) except Exception as ex: log.error(f"❌ Error detecting drum tracks: {ex}") import traceback log.error(traceback.format_exc()) show_message_box_from_spec( self.specs["message_box"]["error_detect_drums"], message=f"An error occurred while detecting drum tracks:\n\n{ex}", )
[docs] def classify_and_assign_tracks(self) -> None: """classify and assign tracks""" if not self._validate_midi_loaded(): return try: drum_indices = self._detect_drum_tracks() classifications = self._classify_tracks(drum_indices) updated_tracks = self._apply_channel_assignments(classifications) if not self._has_classified_tracks(updated_tracks): self._show_no_tracks_message() return message = self._build_classification_message(updated_tracks) self._show_info_message("Tracks Classified", message) log.message("🎵 Track classification completed") except Exception as ex: self._handle_classification_error(ex)
[docs] def _validate_midi_loaded(self) -> bool: """validate that a MIDI file is loaded""" if not self.midi_state.file: self._show_warning( title="No MIDI File Loaded", message="Please load a MIDI file first before classifying tracks.", ) return False return True
[docs] def _detect_drum_tracks(self) -> list[int]: """Return track indices identified as drum tracks.""" drum_tracks = self.midi_analyzer.get_drum_tracks( self.midi_state.file, min_score=70.0 ) return [idx for idx, _ in drum_tracks]
[docs] def _classify_tracks(self, drum_indices: list[int]): """Classify non-drum tracks into Bass, Keys/Guitars, Strings.""" return self.midi_analyzer.get_classifications( self.midi_state.file, exclude_drum_indices=drum_indices, min_score=30.0 )
[docs] def _apply_channel_assignments(self, classifications) -> dict[str, list[str]]: """apply channel assignments""" updated: dict[str, list[str]] = { TrackCategory.BASS: [], TrackCategory.KEYS_GUITARS: [], TrackCategory.STRINGS: [], TrackCategory.UNCLASSIFIED: [], } for category_str, tracks in classifications.items(): category = STR_TO_TRACK_CATEGORY.get( category_str, TrackCategory.UNCLASSIFIED ) if category == TrackCategory.UNCLASSIFIED: continue meta = CATEGORY_META[category] for track_index, analysis in tracks: spin = self.midi_file.midi_track_viewer.track_channel_spins.get( track_index ) if not spin: continue spin.setValue(meta.channel) track_name = ( getattr(analysis, "track_name", None) or f"Track {track_index + 1}" ) score = analysis.scores[category] updated[category].append( f"Track {track_index + 1} ({track_name}) - Score: {score:.1f}" ) self._handle_unclassified_tracks(classifications, updated) return updated
[docs] def _handle_unclassified_tracks( self, classifications, updated: dict[str, list[str]] ): """Handle unclassified""" for track_index, analysis in classifications.get("unclassified", []): track_name = ( getattr(analysis, "track_name", None) or f"Track {track_index + 1}" ) scores = analysis.scores max_category, max_score = max(scores.items(), key=lambda x: x[1]) updated[TrackCategory.UNCLASSIFIED].append( f"Track {track_index + 1} ({track_name}) - Best: {max_category} ({max_score:.1f})" )
[docs] def _has_classified_tracks(self, updated_tracks: dict) -> bool: """Return True if at least one track was classified (excluding unclassified).""" return ( sum( len(v) for k, v in updated_tracks.items() if k != TrackCategory.UNCLASSIFIED ) > 0 )
[docs] def _show_no_tracks_message(self) -> None: """Show the no-tracks-classified message box.""" show_message_box_from_spec(self.specs["message_box"]["no_tracks_classified"])
[docs] def _build_classification_message(self, updated: dict[str, list[str]]) -> str: """build classification message""" total_classified = sum( len(v) for k, v in updated.items() if k != TrackCategory.UNCLASSIFIED ) parts = [f"Classified {total_classified} track(s):\n"] for category in CATEGORY_META: tracks = updated[category] if not tracks: continue meta = CATEGORY_META[category] parts.append( f"\n{meta.emoji} {meta.label} " f"(Channel {meta.channel} - {meta.engine}):" ) for track in tracks: parts.append(f" • {track}") unclassified = updated[TrackCategory.UNCLASSIFIED] if unclassified: parts.append(f"\n❓ Unclassified ({len(unclassified)} track(s)):") for track in unclassified[:5]: parts.append(f" • {track}") if len(unclassified) > 5: parts.append(f" ... and {len(unclassified) - 5} more") parts.append("\n\nClick 'Apply All Track Changes' to save the changes.") return "\n".join(parts)
[docs] def _show_info_message(self, title: str, message: str) -> None: """show info dialog""" show_message_box_from_spec( self.specs["message_box"]["info"], title=title, message=message )
[docs] def _show_warning(self, title: str, message: str) -> None: """show warning""" show_message_box_from_spec( self.specs["message_box"]["warning"], title=title, message=message )
[docs] def _handle_classification_error(self, ex: Exception) -> None: """handle classification error""" log.error(f"❌ Error classifying tracks: {ex}") import traceback log.error(traceback.format_exc()) show_message_box_from_spec( self.specs["message_box"]["error_classify_tracks"], message=f"An error occurred while classifying tracks:\n\n{ex}", )
[docs] def init_automation_usb_grid(self) -> QGridLayout: """ Create a grid layout containing Automation, USB Port, and USB File controls. """ grid = QGridLayout() grid.setContentsMargins(QMargins(0, 0, 0, 0)) row = 0 grid.addWidget(self.automation, row, 0, 1, 5) row += 1 grid.addWidget(self.usb_recorder, row, 0, 1, 5) self.automation.populate_automation_programs(PresetSource.DIGITAL) return grid
[docs] def insert_program_change_current_position(self) -> None: """ Insert Bank Select (CC#0, CC#32) and Program Change at the current slider time. """ if not self.midi_state.file: return # Time in seconds from slider current_seconds = float(self.midi_file.position_slider.value()) # Channel (digital is 1-16, convert to 0-based) display_channel = int(self.automation.automation_channel_combo.currentData()) channel = display_channel - 1 # Selected program triple (msb, lsb, pc) data = self.automation.automation_program_combo.currentData() if not data: return msb, lsb, pc = data # Convert seconds to absolute ticks (approx using current tempo at position) try: tempo_usecs = self.midi_state.tempo_at_position or Midi.tempo.BPM_120_USEC abs_ticks = int( mido.second2tick( current_seconds, self.midi_state.file.ticks_per_beat, tempo_usecs ) ) except Exception: abs_ticks = int(current_seconds / max(self.tick_duration, 1e-9)) # Find a target track that uses this channel, else use track 0 track_index = self._find_track_for_channel(channel) target_track = self.midi_state.file.tracks[track_index] # Build messages: CC#0, CC#32, Program Change (PC is 0-based in MIDI spec) msgs = [ self._build_message( message_type=MidoMessageType.CONTROL_CHANGE.value, channel=channel, value=int(msb), ), self._build_message( message_type=MidoMessageType.CONTROL_CHANGE.value, channel=channel, value=int(lsb), ), self._build_message( message_type=MidoMessageType.PROGRAM_CHANGE.value, channel=channel, program=max(0, int(pc) - 1), ), ] self._insert_messages_at_abs_tick(target_track, abs_ticks, msgs) # Refresh viewer and internal state self.midi_file.midi_track_viewer.set_midi_file(self.midi_state.file) # Sync mute buttons after setting MIDI file self._sync_mute_buttons_from_track_viewer() self.midi_extract_events() self.calculate_duration() self.midi_file_position_label_update_time() # Add a visual marker to the time ruler try: preset_label = self.automation.automation_program_combo.currentText() short_label = ( preset_label.split(" ")[1] if " " in preset_label else preset_label ) self.midi_file.midi_track_viewer.ruler.add_marker( current_seconds, label=short_label ) except Exception: # Fail-safe: add without label self.midi_file.midi_track_viewer.ruler.add_marker(current_seconds)
[docs] def _build_message( self, message_type: str, channel: int, value: int = None, program: int = None ) -> Message: """Build message""" if program is None: message = mido.Message( type=message_type, control=0, value=int(value), channel=channel, time=0 ) else: message = mido.Message( type=message_type, program=program, channel=channel, time=0 ) return message
[docs] def _find_track_for_channel(self, channel: int) -> int: for i, track in enumerate(self.midi_state.file.tracks): for msg in track: if hasattr(msg, "channel") and msg.channel == channel: return i return 0
[docs] def _insert_messages_at_abs_tick( self, track: mido.MidiTrack, abs_tick_target: int, new_msgs: list[mido.Message] ) -> None: """ Insert a list of messages at a given absolute tick, preserving delta times. """ # Compute absolute ticks for each existing message, then rebuild with adjusted deltas abs_tick = 0 rebuilt: list[mido.Message] = [] inserted = False for msg in track: next_abs = abs_tick + msg.time if not inserted and next_abs >= abs_tick_target: # Insert right before this message # Delta from current abs_tick to target insert_delta = max(0, abs_tick_target - abs_tick) # First, push any time before insertion if insert_delta > 0: rebuilt.append( mido.Message( MidoMessageType.NOTE_ON.value, note=0, velocity=0, time=insert_delta, ) ) # Replace the placeholder with zero-time; we will discard it below rebuilt.pop() # Append new messages with proper delta times first = True for nm in new_msgs: nm_copy = nm.copy(time=insert_delta if first else 0) rebuilt.append(nm_copy) first = False insert_delta = 0 # Now adjust the current message's delta to account for insertion at target adjusted_time = max(0, next_abs - abs_tick_target) msg = msg.copy(time=adjusted_time) inserted = True rebuilt.append(msg) abs_tick = next_abs if not inserted: # Append at end; ensure the first inserted msg gets appropriate delta first = True for nm in new_msgs: nm_copy = nm.copy( time=0 if not first else max(0, abs_tick_target - abs_tick) ) rebuilt.append(nm_copy) first = False # --- Write back track.clear() for m in rebuilt: track.append(m)
[docs] def _toggle_channel_mute(self, channel: int, is_muted: bool, btn) -> None: """ Toggle mute state for a specific MIDI channel. Updates both the track viewer and the player's muted channels state. :param channel: int MIDI channel (1-16) :param is_muted: bool is the channel muted? """ # Update track viewer's mute state (this will also update track viewer's buttons) if self.midi_file and self.midi_file.midi_track_viewer: self.midi_file.midi_track_viewer.toggle_channel_mute(channel, is_muted) # Sync the track viewer's mute buttons if channel in self.midi_file.midi_track_viewer.mute_buttons: self.midi_file.midi_track_viewer.mute_buttons[channel].setChecked( is_muted ) self.midi_file.midi_track_viewer.mute_buttons[channel].setStyleSheet( generate_sequencer_button_style( not is_muted, checked_means_inactive=True ) ) # Update player's muted channels state self.midi_state.muted_channels = self.get_muted_channels()
[docs] def _sync_mute_buttons_from_track_viewer(self) -> None: """ Sync mute channel buttons in the USB file controls with the track viewer's state. Called when MIDI file is loaded or track viewer's mute state changes. """ if not hasattr(self, "mute_channels_group") or not self.mute_channels_group: return mute_channel_buttons = self.mute_channels_group.mute_channel_buttons if self.midi_file and self.midi_file.midi_track_viewer: muted_channels = self.midi_file.midi_track_viewer.get_muted_channels() for channel, btn in mute_channel_buttons.items(): btn.blockSignals(True) btn.setChecked(channel in muted_channels) btn.blockSignals(False)
[docs] def apply_all_track_changes(self) -> None: """ Apply all Track Name and MIDI Channel changes. Calls the track viewer's apply_all_track_changes method. """ if self.midi_file and self.midi_file.midi_track_viewer: self.midi_file.midi_track_viewer.apply_all_track_changes()
[docs] def apply_channel_presets(self) -> None: """ Send live MIDI Program Changes to the synth and insert presets into the MIDI file. Ch 1→Picked Bass, Ch 2→Piano, Ch 3→JP8 Strings. """ log.message( scope=self.__class__.__name__, message="apply_channel_presets called (Apply Presets button)", ) if not self.midi_file or not self.midi_file.midi_track_viewer: log.message( scope=self.__class__.__name__, message="Early return: no midi_file or midi_track_viewer", ) return tv = self.midi_file.midi_track_viewer channel_preset_map = tv._CHANNEL_PRESET_MAP pc_map = JDXiPresetToneListDigital.PROGRAM_CHANGE log.parameter( scope=self.__class__.__name__, message="midi_helper", parameter=self.midi_helper, ) log.parameter( scope=self.__class__.__name__, message="track_channel_spins count", parameter=len(tv.track_channel_spins) if tv.track_channel_spins else 0, ) if tv.track_channel_spins: ch_values = { i: tv.track_channel_spins[i].value() for i in sorted(tv.track_channel_spins) } log.parameter( scope=self.__class__.__name__, message="track_channel_spins (track_idx -> display_ch)", parameter=ch_values, ) # Send live MIDI to synth for each channel with a preset mapping if self.midi_helper: seen_channels: set[int] = set() sent_count = 0 for i in tv.track_channel_spins: display_ch = tv.track_channel_spins[i].value() if display_ch in seen_channels: continue preset_num = channel_preset_map.get(display_ch) if preset_num is None or preset_num not in pc_map: log.parameter( scope=self.__class__.__name__, message=f"Skip Ch {display_ch}: preset_num={preset_num}", parameter=preset_num, ) continue seen_channels.add(display_ch) spec = pc_map[preset_num] channel = display_ch + Midi.channel.DISPLAY_TO_BINARY # 0-based log.message( scope=self.__class__.__name__, message=f"Sending live MIDI: Ch {display_ch} -> preset {preset_num} ({spec.get('Name', '?')})", ) self.midi_helper.send_bank_select_and_program_change( channel=channel, bank_msb=spec["MSB"], bank_lsb=spec["LSB"], program=max(0, spec["PC"] - 1), ) sent_count += 1 log.parameter( scope=self.__class__.__name__, message="Live MIDI presets sent", parameter=sent_count, ) else: log.message( scope=self.__class__.__name__, message="No midi_helper: skipping live MIDI send", ) log.message( scope=self.__class__.__name__, message="Calling track_viewer.apply_channel_presets (file update)", ) tv.apply_channel_presets()
[docs] def _create_transport_control( self, spec: TransportSpec, layout: QHBoxLayout, button_group: QButtonGroup | None, ) -> None: """Create a transport button + label row""" # ---- Button btn = create_jdxi_button_from_spec(spec, button_group) setattr(self.ui, f"{spec.name}_button", btn) layout.addWidget(btn) # ---- Label row pixmap = JDXi.UI.Icon.get_icon_pixmap( spec.icon, color=JDXi.UI.Style.FOREGROUND, size=20 ) label_row, text_label = create_jdxi_row(spec.text, icon_pixmap=pixmap) setattr(self.ui, f"{spec.name}_label", text_label) layout.addWidget(label_row)
[docs] def update_tempo_us_from_worker(self, tempo_us: int) -> None: """ update_tempo_us_from_worker :param tempo_us: int tempo in microseconds e.g 500_000 :return: None """ log.parameter("tempo_us", tempo_us) log.message(f"Updating tempo to {tempo_us} microseconds from worker") # self.refill_midi_message_buffer() self.ui_display_set_tempo_usecs(tempo_us)
[docs] def update_playback_worker_tempo_us(self, tempo_us: int) -> None: """ update_playback_worker_tempo_us :param tempo_us: tempo in microseconds e.g 500_000 :return: None """ log.parameter("tempo_us", tempo_us) log.message(f"Updating tempo to {tempo_us} microseconds from editor") if self.midi_playback_worker: self.midi_playback_worker.update_tempo(tempo_us)
[docs] def setup_worker(self): """ setup_worker :return: None Setup the worker and thread for threaded playback using QTimer """ # Clean up any previous worker/thread if self.midi_state.playback_thread: self.midi_state.playback_thread.quit() self.midi_state.playback_thread.wait() self.midi_state.playback_thread.deleteLater() self.midi_playback_worker = None # Create worker with correct initial tempo if available initial_tempo = getattr( self.midi_state, "tempo_at_position", Midi.tempo.BPM_120_USEC ) self.midi_playback_worker = MidiPlaybackWorker(parent=self) self.midi_playback_worker.set_tempo.connect(self.update_tempo_us_from_worker) self.midi_playback_worker.position_tempo = initial_tempo self.midi_playback_worker.initial_tempo = initial_tempo log.message(f"self.midi_playback_worker: {self.midi_playback_worker}") # self.midi_playback_worker.set_editor(self) self.midi_state.playback_thread = QThread() self.midi_playback_worker.moveToThread(self.midi_state.playback_thread) self.midi_playback_worker.result_ready.connect( self.midi_playback_worker_handle_result ) # optional for UI update # QTimer lives in the main thread, but calls worker.do_work() self.midi_state.timer = QTimer(self) self.midi_state.timer.setInterval(JDXi.UI.Constants.TIMER_INTERVAL) # Note: Worker connection is handled in midi_playback_start() to avoid conflicts self.midi_state.playback_thread.start()
[docs] def midi_playback_worker_stop(self): """ midi_playback_worker_stop :return: None """ self.playback_engine.stop() if self.midi_state.timer.isActive(): self.midi_state.timer.stop() if self.midi_playback_worker: self.midi_playback_worker.stop() # signal to stop processing if self.midi_state.playback_thread: self.midi_state.playback_thread.quit() self.midi_state.playback_thread.wait() self.midi_state.playback_thread.deleteLater() self.midi_state.playback_thread = None self.midi_playback_worker = None
[docs] def on_suppress_program_changes_toggled(self, state: Qt.CheckState) -> None: """ on_suppress_program_changes_toggled :param state: Qt.CheckState :return: None """ self.midi_state.suppress_program_changes = state == JDXi.UI.Constants.CHECKED log.message( f"Suppress MIDI Program Changes = {self.midi_state.suppress_program_changes}" )
[docs] def on_suppress_control_changes_toggled(self, state: Qt.CheckState): """ on_suppress_control_changes_toggled :param state: Qt.CheckState :return: """ self.midi_state.suppress_control_changes = state == JDXi.UI.Constants.CHECKED log.message( f"Suppress MIDI Control Changes = {self.midi_state.suppress_control_changes}" )
[docs] def midi_save_file(self) -> None: """ midi_save_file :return: None Save the current MIDI file to disk. """ save_file_spec = FileSelectionSpec( caption="Save MIDI File", dir="", filter="MIDI Files (*.mid)" ) file_path = get_file_save_from_spec(save_file_spec, parent=self) if file_path: self.midi_file.midi_track_viewer.midi_file.save(file_path) file_name = f"Saved: {Path(file_path).name}" self.ui.digital_title_file_name.setText(file_name) # Update digital to show tempo only (no bar when not playing) if self.current_tempo_bpm is not None: self.ui.digital_title_file_name.set_upper_display_text( f"Tempo: {round(self.current_tempo_bpm)} BPM" )
[docs] def midi_load_file(self) -> None: """ Load a MIDI file and initialize parameters """ load_file_spec = FileSelectionSpec( mode=FileSelectionMode.LOAD, caption="Open MIDI File", dir="", filter="MIDI Files (*.mid)", ) file_path = get_file_save_from_spec(load_file_spec, parent=self) if not file_path: return self.midi_load_file_from_path(file_path)
[docs] def midi_load_file_from_path(self, file_path: str) -> None: """ Load a MIDI file from a given path and initialize parameters. :param file_path: Path to the MIDI file """ if not file_path: return self.midi_state.file = MidiFile(file_path) # Store filename in the MidiFile object for later use self.midi_state.file.filename = file_path file_name = f"Loaded: {Path(file_path).name}" self.ui.digital_title_file_name.setText(file_name) # Update digital to show tempo only (no bar when not playing) if self.current_tempo_bpm is not None: self.ui.digital_title_file_name.set_upper_display_text( f"Tempo: {round(self.current_tempo_bpm)} BPM" ) self.midi_file.midi_track_viewer.clear() self.midi_file.midi_track_viewer.set_midi_file(self.midi_state.file) # Sync mute buttons after loading MIDI file self._sync_mute_buttons_from_track_viewer() # Auto-generate WAV filename if checkbox is enabled if self.usb_recorder.file_auto_generate_checkbox.isChecked(): self.usb_recorder.update_auto_wav_filename() # Ensure ticks_per_beat is available early self.ticks_per_beat = self.midi_state.file.ticks_per_beat initial_track_tempos = self.detect_initial_tempo() self.ui_display_set_tempo_usecs(self.midi_state.tempo_initial) self.midi_state.tempo_at_position = ( self.midi_state.tempo_initial ) # Set initial tempo for playback log.parameter("Initial track tempos", initial_track_tempos) self.midi_channel_select() self.midi_extract_events() self.setup_worker() self.calculate_duration() self.calculate_tick_duration() self.ui_position_slider_reset() # Notify Pattern Sequencer if it exists self._notify_pattern_sequencer_file_loaded() # Add to recent files if parent has recent_files_manager if ( hasattr(self.parent, "recent_files_manager") and self.parent.recent_files_manager ): try: self.parent.recent_files_manager.add_file(file_path) if hasattr(self.parent, "_update_recent_files_menu"): # Check if menu still exists before updating if ( hasattr(self.parent, "recent_files_menu") and self.parent.recent_files_menu is not None ): try: self.parent._update_recent_files_menu() except RuntimeError: # Menu was deleted, skip update log.debug("Recent files menu was deleted, skipping update") except Exception as ex: log.debug(f"Error updating recent files menu: {ex}")
[docs] def calculate_tick_duration(self): """ calculate_tick_duration :return: Calculate the duration of a single MIDI tick in seconds. """ # Guard: ensure ticks_per_beat is set if ( not hasattr(self, MidiFileAttrs.TICKS_PER_BEAT) or self.ticks_per_beat is None ): # Fallback to current file's ticks_per_beat if available if self.midi_state.file is not None: self.ticks_per_beat = getattr( self.midi_state.file, MidiFileAttrs.TICKS_PER_BEAT, 480 ) else: self.ticks_per_beat = 480 self.tick_duration = ( self.midi_state.tempo_at_position / Midi.tempo.MILLISECONDS_PER_SECOND / self.ticks_per_beat )
[docs] def calculate_duration(self) -> None: """ calculate_duration :return: None Accurate Total Duration Calculation """ # Handle empty events list gracefully if not getattr(self.midi_state, "events", None): self.midi_total_ticks = 0 else: self.midi_total_ticks = max(t for t, _, _ in self.midi_state.events) self.midi_state.file_duration_seconds = get_total_duration_in_seconds( self.midi_state.file )
[docs] def midi_channel_select(self) -> None: """ Select a suitable MIDI channel for playback from the file. """ selected_channel = self.midi_analyzer.get_preferred_channel( self.midi_state.file, self.midi_preferred_channels ) if selected_channel is None: selected_channel = MidiChannel.DIGITAL_SYNTH_1 log.warning("No suitable channel found; defaulting to channel 1") self.midi_state.channel_selected = selected_channel
[docs] def midi_extract_events(self) -> None: """ midi_extract_events :return: None Extract events from the MIDI file and store them in the midi_state. """ events = [] for track_index, track in enumerate(self.midi_state.file.tracks): abs_time = 0 for msg in track: abs_time += msg.time events.append((abs_time, msg, track_index)) # Ensure ticks_per_beat is set before calculations if ( not hasattr(self, MidiFileAttrs.TICKS_PER_BEAT) or self.ticks_per_beat is None ): self.ticks_per_beat = getattr( self.midi_state.file, MidiFileAttrs.TICKS_PER_BEAT, 480 ) self.calculate_tick_duration() self.midi_state.events = sorted(events, key=lambda x: x[0])
[docs] def detect_initial_tempo(self) -> dict[int, int]: """ Detect initial tempo from the file and set midi_state.tempo_initial. :return: dict[track_number, tempo_usec] for tracks that have set_tempo """ tempo_initial, initial_track_tempos = self.midi_analyzer.get_initial_tempo( self.midi_state.file ) self.midi_state.tempo_initial = tempo_initial log.parameter("self.tempo", self.midi_state.tempo_initial) return initial_track_tempos
[docs] def ui_display_set_tempo_usecs(self, tempo_usecs: int) -> None: """ ui_display_set_tempo_usecs :param tempo_usecs: int tempo in microseconds :return: None Set the tempo in the UI and log it. """ log.message(f"Setting tempo to {tempo_usecs} microseconds on UI") self.on_tempo_usecs_changed(tempo_usecs) bpm = tempo2bpm(tempo_usecs) log.parameter("tempo_bpm", bpm) log.message(f"Setting tempo to {bpm} BPM on UI") self.set_display_tempo_bpm(bpm)
[docs] def set_display_tempo_bpm(self, tempo_bpm: float) -> None: """ set_display_tempo_bpm :param tempo_bpm: float tempo in BPM :return: None Set the tempo in the UI and log it. Also pushes tempo to Pattern Sequencer so both editors stay in sync when using the same playback pipeline. """ self.current_tempo_bpm = tempo_bpm self.update_upper_display_with_tempo_and_bar() log.parameter("tempo_bpm", tempo_bpm) self._push_tempo_to_pattern_sequencer(tempo_bpm)
[docs] def update_upper_display_with_tempo_and_bar( self, elapsed_time: Optional[float] = None ) -> None: """ Update the upper digital with tempo and optionally bar number. :param elapsed_time: Optional elapsed time for bar calculation. If None, uses current playback state. """ if self.current_tempo_bpm is None: return tempo_text = f"Tempo: {round(self.current_tempo_bpm)} BPM" # Append bar number if playing or paused if elapsed_time is not None or ( self.midi_state.timer and (self.midi_state.timer.isActive() or self.midi_state.playback_paused) ): if elapsed_time is None and self.midi_state.playback_start_time: elapsed_time = time.time() - self.midi_state.playback_start_time if elapsed_time is not None and self.midi_state.file: current_bar = self.calculate_current_bar(elapsed_time) if current_bar is not None: tempo_text += f" | Bar {int(current_bar)}" self.ui.digital_title_file_name.set_upper_display_text(tempo_text)
[docs] def turn_off_effects(self) -> None: """ Turn off all effects (Effect 1, Effect 2, Delay, Reverb) when starting playback. This prevents distortion and other effects from being accidentally enabled. """ if not self.midi_helper: return try: # Create SysEx composer and address for effects sysex_composer = JDXiSysExComposer() address = JDXiSysExAddress( JDXiSysExAddressStartMSB.TEMPORARY_PROGRAM, JDXiSysExOffsetSystemUMB.COMMON, JDXiSysExOffsetProgramLMB.COMMON, Midi.value.ZERO, ) # Turn off Effect 1: Set level to 0 and type to Thru (0) efx1_address = address.add_offset((0, 2, 0)) efx1_level_msg = sysex_composer.compose_message( address=efx1_address, param=Effect1Param.EFX1_LEVEL, value=0 ) efx1_type_msg = sysex_composer.compose_message( address=efx1_address, param=Effect1Param.EFX1_TYPE, value=0, # Thru (no effect) ) # Turn off Effect 2: Set level to 0 and type to OFF (0) efx2_address = address.add_offset((0, 4, 0)) efx2_level_msg = sysex_composer.compose_message( address=efx2_address, param=Effect2Param.EFX2_LEVEL, value=0 ) efx2_type_msg = sysex_composer.compose_message( address=efx2_address, param=Effect2Param.EFX2_TYPE, value=0 # OFF ) # Turn off Delay: Set level to 0 delay_address = address.add_offset((0, 6, 0)) delay_level_msg = sysex_composer.compose_message( address=delay_address, param=DelayParam.DELAY_LEVEL, value=0 ) # Turn off Reverb: Set level to 0 reverb_address = address.add_offset((0, 8, 0)) reverb_level_msg = sysex_composer.compose_message( address=reverb_address, param=ReverbParam.REVERB_LEVEL, value=0 ) # Send all messages messages = [ efx1_level_msg, efx1_type_msg, efx2_level_msg, efx2_type_msg, delay_level_msg, reverb_level_msg, ] for msg in messages: self.midi_helper.send_midi_message(msg) time.sleep(0.01) # Small delay between messages log.message( "🎛️ Turned off all effects (Effect 1, Effect 2, Delay, Reverb) at playback start" ) except Exception as ex: log.error(f"❌ Error turning off effects: {ex}") import traceback log.error(traceback.format_exc())
[docs] def midi_playback_start(self): """ Start playback of the MIDI file from the beginning (or resume if paused). """ # Reset position slider to beginning self.transport.set_state("play") self.ui_position_slider_reset() # Turn off all effects when starting playback (prevents accidental distortion) # if not self.midi_state.playback_paused: # self.turn_off_effects() if not self.midi_state.file or not self.midi_state.events: return self._test_paused_state_then_rewind() # If paused, resume from current position (pause logic handles this) # Ensure worker is properly set up before connecting self.setup_playback_worker() if self.midi_state.file: start_tick = self.calculate_start_tick() if start_tick is None: start_tick = 0 self.playback_engine.start(start_tick) self._disconnect_ui() self._connect_worker() self._connect_ui() self.usb_recorder.start_recording() try: # Start the playback worker (already set up above) self.start_playback_worker() except Exception as ex: log.error(f"Error {ex} occurred starting playback")
[docs] def _test_paused_state_then_rewind(self): # If not paused, reset everything to start from beginning if not self.midi_state.playback_paused: # Clear buffer and reset playback position to beginning self.midi_state.event_buffer.clear() self.midi_state.buffer_end_time = 0 self.midi_state.event_index = 0 self.midi_state.playback_start_time = time.time()
[docs] def _connect_ui(self): """Connect UI update""" try: self.midi_state.timer.timeout.connect(self.midi_play_next_event) log.message("Success: Connected midi_play_next_event to timeout") except Exception as ex: log.error(f"Error {ex} connecting midi_play_next_event to timeout")
[docs] def _disconnect_ui(self): """Disconnect all existing connections first""" try: self.midi_state.timer.timeout.disconnect() # Disconnect all log.debug("Disconnected all timeout signals") except Exception as ex: log.debug(f"Disconnecting all timeout signals: {ex}")
[docs] def _connect_worker(self): # Connect worker if available try: if self.midi_playback_worker is not None: self.midi_state.timer.timeout.connect(self.midi_playback_worker.do_work) log.message( "Success: Connected midi_playback_worker.do_work to timeout" ) else: log.warning("⚠️ midi_playback_worker is None, skipping connection") except Exception as ex: log.error(f"Error {ex} connecting worker to timeout")
[docs] def setup_playback_worker(self): """ setup_playback_worker :return: None Setup the MIDI playback worker (prepare buffered messages, etc.) Loads engine and wires on_event; worker uses engine.process_until_now(). """ # Setup worker if not already initialized if not self.midi_state.playback_thread or not self.midi_playback_worker: self.setup_worker() self.ui_ensure_timer_connected() self.midi_state.muted_channels = self.get_muted_channels() self.midi_state.muted_tracks = self.get_muted_tracks() if not self.midi_state.file: return self.playback_engine.load_file(self.midi_state.file) # Sync mute state to engine for i in range(len(self.midi_state.file.tracks)): self.playback_engine.mute_track(i, i in self.midi_state.muted_tracks) for ch in range(16): display_ch = ch + Midi.channel.BINARY_TO_DISPLAY self.playback_engine.mute_channel( ch, display_ch in self.midi_state.muted_channels ) self.playback_engine.suppress_program_changes = ( self.midi_state.suppress_program_changes ) self.playback_engine.suppress_control_changes = ( self.midi_state.suppress_control_changes ) midi_out = self.midi_helper.midi_out self.playback_engine.on_event = lambda msg: midi_out.send_message(msg.bytes()) self.midi_playback_worker.setup( buffered_msgs=[], midi_out_port=self.midi_helper.midi_out, ticks_per_beat=self.midi_state.file.ticks_per_beat, play_program_changes=not self.midi_state.suppress_program_changes, start_time=self.midi_state.playback_start_time, initial_tempo=self.midi_state.tempo_at_position, playback_engine=self.playback_engine, )
[docs] def start_playback_worker(self): """ start_playback_worker :return: None Start the timer for actual playback. """ self.midi_state.timer.start()
[docs] def setup_and_start_playback_worker(self): """ setup_and_start_playback_worker :return: None Setup the MIDI playback worker and start the timer. """ self.setup_playback_worker() self.start_playback_worker()
[docs] def initialize_midi_state(self) -> None: """ Initialize muted tracks, muted channels, and buffered messages. """ if self.midi_state.muted_tracks is None: self.midi_state.muted_tracks = set() if self.midi_state.muted_channels is None: self.midi_state.muted_channels = set() if self.midi_state.playback_start_time is None: self.midi_state.playback_start_time = time.time()
[docs] def calculate_start_tick(self) -> Optional[int]: """ Calculate the start tick based on elapsed playback time. :return: Start tick or None if an error occurs. """ try: # Check if playback_start_time is initialized if self.midi_state.playback_start_time is None: log.debug("playback_start_time not initialized, using 0") return 0 elapsed_time_secs = time.time() - self.midi_state.playback_start_time return int( mido.second2tick( second=elapsed_time_secs, ticks_per_beat=self.midi_state.file.ticks_per_beat, tempo=self.midi_state.tempo_at_position, ) ) except Exception as ex: log.error(f"Error converting playback start time to ticks: {ex}") return None
[docs] def is_track_muted(self, track_index: int) -> bool: """ is_track_muted :param track_index: Index of the track to check. :return: True if the track is muted, otherwise False. Check if the track is muted. """ return track_index in self.midi_state.muted_tracks
[docs] def is_channel_muted(self, channel_index: int) -> bool: """ is_channel_muted :param channel_index: Index of the track to check. :return: True if the channel is muted, otherwise False. Check if the channel is muted. """ return ( channel_index + Midi.channel.BINARY_TO_DISPLAY in self.midi_state.muted_channels )
[docs] def get_muted_tracks(self): """ get_muted_tracks :return: None Get the muted tracks from the MIDI track viewer. """ muted_tracks_raw = self.midi_file.midi_track_viewer.get_muted_tracks() muted_tracks = {int(t) for t in muted_tracks_raw if not isinstance(t, set)} for track in muted_tracks: log.parameter("Muted track", track) return muted_tracks
[docs] def get_muted_channels(self): """ get_muted_channels :return: None Get the muted channels from the MIDI track viewer. """ muted_channels_raw = self.midi_file.midi_track_viewer.get_muted_channels() muted_channels = {int(c) for c in muted_channels_raw if not isinstance(c, set)} log.parameter("Muted channels", muted_channels) for channel in muted_channels: log.parameter("Muted channel", channel) return muted_channels
[docs] def on_tempo_usecs_changed(self, tempo: int): """ on_tempo_usecs_changed :param tempo: int :return: None """ self.midi_state.tempo_at_position = tempo
[docs] def on_tempo_bpm_changed(self, bpm: float): """ on_tempo_bpm_changed :param bpm: float :return: None """ tempo = bpm2tempo(bpm) self.on_tempo_usecs_changed(tempo)
[docs] def midi_play_next_event(self): """ UI update: Update slider and label to reflect playback progress. """ try: if self.midi_state.playback_start_time is None: return now = time.time() elapsed_time = now - self.midi_state.playback_start_time self.ui_midi_file_position_slider_set_position(elapsed_time) except Exception as ex: log.error(f"Error {ex} occurred updating playback UI")
[docs] def ui_midi_file_position_slider_set_position(self, elapsed_time: float) -> None: """ ui_midi_file.position_slider_set_position :param elapsed_time: float :return: None Update the slider position and label based on elapsed time. """ if self.midi_file.position_slider.isSliderDown(): return # Don't update while user is dragging new_value = int(elapsed_time) current_value = self.midi_file.position_slider.value() if abs(new_value - current_value) >= 1: # Only update if full second has passed self.midi_file.position_slider.setValue(new_value) self.ui_position_label_set_time(elapsed_time)
[docs] def midi_scrub_position(self): """ Updates the MIDI playback state based on the scrub position. """ if not self.is_midi_ready(): log.message( "Either self.midi.file or self.midi.events not present, returning" ) return self.stop_playback() target_time = self.get_target_time() self.update_event_index(target_time) self.update_playback_start_time(target_time) if self.midi_state.events and 0 <= self.midi_state.event_index < len( self.midi_state.events ): scrub_tick = self.midi_state.events[self.midi_state.event_index][0] self.playback_engine.scrub_to_tick(scrub_tick) self.stop_all_notes() self.prepare_for_playback() # Update digital with tempo and bar for scrubbed position self.update_upper_display_with_tempo_and_bar(target_time)
[docs] def is_midi_ready(self) -> bool: """ Checks if the MIDI file and events are available. """ return bool(self.midi_state.file and self.midi_state.events)
[docs] def stop_playback(self) -> None: """ Stops playback and resets the paused state. """ self.midi_playback_worker_stop() self.midi_playback_worker_disconnect() self.midi_state.playback_paused = False # Optional: reset paused state
[docs] def get_target_time(self) -> float: """ Retrieves the target time from the slider and logs it. """ target_time = self.midi_file.position_slider.value() log.parameter("target_time", target_time) return target_time
[docs] def update_event_index(self, target_time: float) -> None: """ Finds and updates the event index based on the target time. """ for i, (tick, _, _) in enumerate(self.midi_state.events): if tick * self.tick_duration >= target_time: self.midi_state.event_index = i log.parameter( "self.midi_state.event_index now", self.midi_state.event_index ) return self.midi_state.event_index = 0 # Default to the start if no match
[docs] def update_playback_start_time(self, target_time: float) -> None: """ Adjusts the playback start time based on the scrub position. Uses target_time (slider value in seconds) directly so elapsed_time matches the scrubbed position and advances correctly during playback. """ self.midi_state.playback_start_time = time.time() - target_time
[docs] def stop_all_notes(self) -> None: """ Sends Control Change 123 and note_off messages to silence all notes. """ if not self.midi_helper: return for ch in range(16): # CC 123 = All Notes Off self.midi_helper.midi_out.send_message( mido.Message( MidoMessageType.CONTROL_CHANGE.value, control=123, value=0, channel=ch, ).bytes() ) # Extra safety in case the synth ignores CC123 for note in range(128): self.midi_helper.midi_out.send_message( mido.Message( MidoMessageType.NOTE_OFF.value, note=note, velocity=0, channel=ch, ).bytes() )
[docs] def prepare_for_playback(self) -> None: """ Clears the event buffer and starts the playback worker. """ self.midi_state.event_buffer.clear() self.setup_playback_worker() if self.midi_state.events and 0 <= self.midi_state.event_index < len( self.midi_state.events ): scrub_tick = self.midi_state.events[self.midi_state.event_index][0] self.playback_engine.start(scrub_tick) # Reconnect worker and UI update signals to timer (needed after scrubbing) try: # Disconnect all existing connections first self.midi_state.timer.timeout.disconnect() except Exception: pass # Already disconnected or not connected self._connect_to_worker() self._connect_to_ui() self.start_playback_worker()
[docs] def _connect_to_ui(self): """Connect UI update""" try: self.midi_state.timer.timeout.connect(self.midi_play_next_event) log.message( "Success: Reconnected midi_play_next_event to timeout after scrub" ) except Exception as ex: log.error(f"Error {ex} reconnecting midi_play_next_event to timeout")
[docs] def _connect_to_worker(self): """Connect worker if available""" if self.midi_playback_worker is not None: try: self.midi_state.timer.timeout.connect(self.midi_playback_worker.do_work) log.message( "Success: Reconnected midi_playback_worker.do_work to timeout after scrub" ) except Exception as ex: log.error(f"Error {ex} reconnecting worker to timeout")
[docs] def midi_playback_stop(self): """ Stops playback and resets everything to the beginning. """ self.transport.set_state("stop") self.playback_engine.stop() self.playback_engine.scrub_to_tick(0) # Reset the worker's index before stopping (if it exists) if self.midi_playback_worker: self.midi_playback_worker.index = 0 self.midi_playback_worker.should_stop = True # Stop the playback worker self.stop_playback_worker() # Reset all MIDI state (including event_index and playback_start_time) self.reset_midi_state() # Reset UI position slider to beginning self.ui_position_slider_reset() # Stop all notes and reset tempo self.stop_all_notes() self.reset_tempo() self.clear_active_notes() # Clear event buffer to ensure clean state self.midi_state.event_buffer.clear() self.midi_state.buffer_end_time = 0 # Stop USB recording if active self.usb_recorder.recorder.stop_recording() # Update digital to show tempo only (no bar when stopped) if self.current_tempo_bpm is not None: self.ui.digital_title_file_name.set_upper_display_text( f"Tempo: {round(self.current_tempo_bpm)} BPM" ) # Logging and profiling self.log_event_buffer() self.perform_profiling() log.message("MIDI playback stopped and reset to beginning")
[docs] def stop_playback_worker(self): """ Stops and disconnects the playback worker. """ self.midi_playback_worker_stop() self.midi_playback_worker_disconnect() self.midi_play_next_event_disconnect()
[docs] def reset_midi_state(self): """ Resets MIDI state variables. """ self.midi_state.playback_paused = False self.midi_state.event_index = 0 self.midi_state.playback_start_time = ( None # Reset start time so playback starts from beginning ) self.midi_state.playback_paused_time = None # Clear paused time
[docs] def reset_tempo(self): """ Resets the tempo to the initial value. """ self.ui_display_set_tempo_usecs(self.midi_state.tempo_initial)
[docs] def clear_active_notes(self) -> None: """ Clears the active notes. """ self.midi_state.active_notes.clear()
[docs] def log_event_buffer(self) -> None: """ log_event_buffer :return: None Logs the event buffer for debugging. """ log.parameter("self.midi.event_buffer", self.midi_state.event_buffer) for t, msg in self.midi_state.event_buffer[:20]: log.message(f"Queued @ {t:.3f}s: {msg}")
[docs] def perform_profiling(self) -> None: """No-op: per-playback profiling was removed to avoid skewing app-wide profiles.""" pass
[docs] def midi_play_next_event_disconnect(self) -> None: """ midi_play_next_event_disconnect :return: None Disconnect the midi_play_next_event from the timer's timeout signal. """ # --- Check if timer is initialized if not hasattr(self.midi_state, "timer") or self.midi_state.timer is None: log.debug("Timer not initialized, skipping disconnect") return # --- Disconnect midi_play_next_event safely try: self.midi_state.timer.timeout.disconnect(self.midi_play_next_event) log.debug( "Successfully disconnected midi_play_next_event from timeout signal" ) except TypeError: # Signal was not connected log.debug("midi_play_next_event was not connected to timeout signal") except Exception as ex: log.debug(f"Could not disconnect midi_play_next_event: {ex}")
[docs] def midi_playback_worker_disconnect(self) -> None: """ midi_playback_worker_disconnect :return: None Disconnect the midi_playback_worker.do_work from the timer's timeout signal. """ try: if ( hasattr(self, "midi_playback_worker") and self.midi_playback_worker is not None ): self.midi_state.timer.timeout.disconnect( self.midi_playback_worker.do_work ) except TypeError: log.warning("⚠️ do_work was not connected to timeout signal.") except Exception as ex: log.warning(f"⚠️ Could not disconnect do_work: {ex}")
[docs] def ui_position_slider_reset(self) -> None: """ position_slider_reset :return: None Reset the position slider and label to initial state. """ self.midi_file.position_slider.setEnabled(False) self.midi_file.position_slider.setValue(0) self.midi_file.position_slider.setEnabled(True) self.midi_file.position_slider.setRange( 0, int(self.midi_state.file_duration_seconds) ) self.ui_position_label_set_time() # Update digital to show tempo only (no bar when resetting) if self.current_tempo_bpm is not None: self.ui.digital_title_file_name.set_upper_display_text( f"Tempo: {round(self.current_tempo_bpm)} BPM" )
[docs] def midi_file_position_label_update_time( self, time_seconds: Optional[float] = None ) -> None: """ midi_file_position_label_update_time :param time_seconds: float, optional :return: None """ self.midi_file.position_label_update_time(time_seconds)
[docs] def calculate_current_bar( self, elapsed_time: Optional[float] = None ) -> Optional[float]: """ Calculate the current bar number based on elapsed playback time. :param elapsed_time: Optional elapsed time in seconds. If None, calculates from current playback state. :return: Current bar number (1-based) or None if calculation fails. """ try: if ( not self.midi_state.file or not hasattr(self, "ticks_per_beat") or self.ticks_per_beat is None ): return None if elapsed_time is None: if self.midi_state.playback_start_time is None: return None elapsed_time = time.time() - self.midi_state.playback_start_time # Cap elapsed time to file duration elapsed_time = min(elapsed_time, self.midi_state.file_duration_seconds) # Convert elapsed time to ticks current_tick = mido.second2tick( second=elapsed_time, ticks_per_beat=self.midi_state.file.ticks_per_beat, tempo=self.midi_state.tempo_at_position, ) # Calculate bar number (assuming 4/4 time signature: 4 beats per measure) # Bar number is 1-based current_bar = (current_tick / (4 * self.ticks_per_beat)) + 1 return max(1.0, current_bar) # Ensure at least bar 1 except Exception as ex: log.debug(f"Error calculating current bar: {ex}") return None
[docs] def ui_position_label_set_time(self, elapsed_time: Optional[float] = None) -> None: """ Update the position label with formatted elapsed time and total duration. Caps elapsed_time to total duration to prevent overflow digital. Also updates the bar number in the DigitalTitle upper digital. """ total = self.midi_state.file_duration_seconds if elapsed_time is None: elapsed_str = "0:00" else: elapsed_capped = min(elapsed_time, total) elapsed_str = format_time(elapsed_capped) total_str = format_time(total) self.update_position_text(elapsed_str, total_str) # Update upper digital with tempo and bar number during active playback or when paused if elapsed_time is not None and self.midi_state.file and self.midi_state.timer: # Update digital with tempo and bar if timer was active (playing or paused) if self.midi_state.timer.isActive() or self.midi_state.playback_paused: self.update_upper_display_with_tempo_and_bar(elapsed_time)
[docs] def update_position_text(self, elapsed_str: str, total_str: str): """update position text""" label_text = f"Playback Position: {elapsed_str} / {total_str}" if getattr(self.midi_file, "position_label", "") != label_text: self.midi_file.position_label.setText(label_text)
[docs] def midi_playback_pause_toggle(self): """ midi_playback_pause_toggle :return: None Toggle pause and resume playback. """ if not self.midi_state.file or not self.midi_state.timer: return if not self.midi_state.playback_paused: self._pause_playback() else: self._resume_playback()
[docs] def _resume_playback(self): """Resuming playback""" if self.midi_state.playback_paused_time and self.midi_state.playback_start_time: pause_duration = time.time() - self.midi_state.playback_paused_time self.midi_state.playback_start_time += pause_duration # Adjust start time self.midi_state.timer.start() self.midi_state.playback_paused = False self.transport.pause_label.setText("Pause")
[docs] def _pause_playback(self): """Pausing playback""" self.midi_state.playback_paused_time = time.time() self.midi_state.timer.stop() self.midi_state.playback_paused = True self.transport.pause_label.setText("Resume")
[docs] def midi_playback_worker_handle_result(self, result=None): """ Handle the result from the worker. This can be used to update the UI or perform further actions. :param result: The result from the worker """ pass
[docs] def _get_pattern_sequencer_editor(self): """Return the Pattern Sequencer editor instance if available, else None.""" if not self.parent: return None if hasattr(self.parent, "get_existing_editor"): try: from jdxi_editor.ui.editors.pattern.pattern import PatternSequenceEditor return self.parent.get_existing_editor(PatternSequenceEditor) except Exception: return None return None
[docs] def _push_tempo_to_pattern_sequencer(self, tempo_bpm: float) -> None: """Push current display tempo to Pattern Sequencer so it stays in sync with playback pipeline.""" try: pattern_editor = self._get_pattern_sequencer_editor() if not pattern_editor: return bpm_int = int(round(tempo_bpm)) if ( hasattr(pattern_editor, "tempo_spinbox") and pattern_editor.tempo_spinbox ): pattern_editor.tempo_spinbox.blockSignals(True) pattern_editor.tempo_spinbox.setValue(bpm_int) pattern_editor.tempo_spinbox.blockSignals(False) if hasattr(pattern_editor, MidoMessageType.SET_TEMPO.value): pattern_editor.set_tempo(bpm_int) except Exception as ex: log.debug(f"Could not push tempo to Pattern Sequencer: {ex}")
[docs] def _notify_pattern_sequencer_file_loaded(self) -> None: """Notify Pattern Sequencer that a MIDI file has been loaded.""" try: pattern_editor = self._get_pattern_sequencer_editor() log.debug( f"Found Pattern Sequencer for file load: {pattern_editor is not None}" ) if pattern_editor: if hasattr(pattern_editor, "_load_from_midi_file_editor"): log.message("Notifying Pattern Sequencer of MIDI file load") # Pass self (MidiFileEditor) directly to ensure the reference is available pattern_editor._load_from_midi_file_editor(midi_file_editor=self) elif hasattr(pattern_editor, "set_midi_file_editor"): # If Pattern Sequencer doesn't have the reference, set it first log.message("Setting MidiFileEditor reference in Pattern Sequencer") pattern_editor.set_midi_file_editor(self) if hasattr(pattern_editor, "_load_from_midi_file_editor"): pattern_editor._load_from_midi_file_editor( midi_file_editor=self ) else: log.debug("Pattern Sequencer found but missing required methods") else: log.debug("Pattern Sequencer not found - it may not be initialized yet") except Exception as ex: log.error(f"Error notifying Pattern Sequencer: {ex}") import traceback log.debug(traceback.format_exc())
[docs] def _build_specs(self) -> dict[str, Any]: """build specs for the Midi file player""" return { "buttons": self._build_button_specs(), "message_box": self._build_message_box_specs(), "check_box": self._build_checkbox_specs(), }
[docs] def _build_button_specs(self) -> dict[str, ButtonSpec]: return { "automation_insert": ButtonSpec( label="Insert Program Change Here", tooltip="Insert Program Change at current position", icon=JDXi.UI.Icon.ADD, slot=self.insert_program_change_current_position, ), }
[docs] def _build_message_box_specs(self) -> dict[str, MessageBoxSpec]: """Assemble all static message box specs for the MIDI player.""" return { "no_midi_file": MessageBoxSpec( title="No MIDI File Loaded", message="Please load a MIDI file first before detecting drum tracks.", type_attr="Warning", ), "no_midi_file_classify": MessageBoxSpec( title="No MIDI File Loaded", message="Please load a MIDI file first before classifying tracks.", type_attr="Warning", ), "no_drum_tracks_found": MessageBoxSpec( title="No Drum Tracks Found", message=( "No tracks were identified as drum tracks with high confidence (score >= 70).\n\n" "Try lowering the threshold or manually assign channels." ), type_attr="Information", ), "no_tracks_classified": MessageBoxSpec( title="No Tracks Classified", message=( "No tracks were classified with sufficient confidence (score >= 30).\n\n" "Tracks may need manual assignment." ), type_attr="Information", ), "no_output_file": MessageBoxSpec( title="No Output File", message="Please select a WAV output file or enable auto-generate filename.", type_attr="Warning", ), # Dynamic message (pass message= at call site): "drum_tracks_detected": MessageBoxSpec( title="Drum Tracks Detected", message="", type_attr="Information" ), "error_detect_drums": MessageBoxSpec( title="Error", message="", type_attr="Critical" ), "error_classify_tracks": MessageBoxSpec( title="Error", message="", type_attr="Critical" ), "tracks_classified": MessageBoxSpec( title="Tracks Classified", message="", type_attr="Information" ), "error_saving_file": MessageBoxSpec( title="Error Saving File", message="", type_attr="Critical" ), # Generic templates (pass title= and message= at call site): "info": MessageBoxSpec(title="", message="", type_attr="Information"), "warning": MessageBoxSpec(title="", message="", type_attr="Warning"), }
[docs] def _build_checkbox_specs(self) -> dict[str, CheckBoxSpec]: """build check box specs (event suppression moved to EventSuppressionGroup)""" return {}