"""
# midi_worker.py
Playback Worker to play Midi files in a new thread
"""
import threading
import time
from picomidi.constant import Midi
from picomidi.core.bitmask import BitMask
from PySide6.QtCore import QObject, Signal, Slot
[docs]
class MidiPlaybackWorker(QObject):
"""MidiPlaybackWorker"""
[docs]
set_tempo = Signal(int) # Tempo in microseconds
[docs]
result_ready = Signal(str) # optional
def __init__(
self, parent: QObject | None = None
) -> None: # pylint: disable=unsupported-binary-operation
super().__init__()
[docs]
self.position_tempo = Midi.TEMPO.BPM_120_USEC
[docs]
self.initial_tempo = Midi.TEMPO.BPM_120_USEC
[docs]
self.should_stop = False
[docs]
self.buffered_msgs = []
[docs]
self.midi_out_port = None
[docs]
self.play_program_changes = True
[docs]
self.ticks_per_beat = 480
[docs]
self.lock = threading.Lock()
[docs]
self.start_time = time.time()
[docs]
def __str__(self) -> str:
return (
f"MidiPlaybackWorker(position_tempo={self.position_tempo}, "
f"should_stop={self.should_stop}, buffered_msgs={len(self.buffered_msgs)}, "
f"midi_out_port={self.midi_out_port}, play_program_changes={self.play_program_changes}, "
f"ticks_per_beat={self.ticks_per_beat}, index={self.index}, "
f"start_time={self.start_time})"
)
[docs]
def setup(
self,
buffered_msgs: list,
midi_out_port: object,
ticks_per_beat: int = 480,
play_program_changes: bool = True,
start_time: float | None = None, # pylint: disable=unsupported-binary-operation
initial_tempo: int = Midi.TEMPO.BPM_120_USEC,
) -> None:
"""Setup the playback worker with buffered messages and configuration."""
self.buffered_msgs = buffered_msgs
self.midi_out_port = midi_out_port
self.ticks_per_beat = ticks_per_beat
self.play_program_changes = play_program_changes
self.initial_tempo = initial_tempo
self.index = 0
if start_time is None:
self.start_time = time.time()
else:
self.start_time = start_time
self.should_stop = False
# Set initial tempo (use provided value or default)
if initial_tempo is not None:
self.initial_tempo = initial_tempo
self.position_tempo = initial_tempo
else:
# Use default tempo if none provided
self.initial_tempo = Midi.TEMPO.BPM_120_USEC
self.position_tempo = Midi.TEMPO.BPM_120_USEC
# Debug logging
print(f"🎵 Worker setup: received {len(buffered_msgs)} buffered messages")
if len(buffered_msgs) > 0:
print(f"🎵 First few buffered messages: {buffered_msgs[:3]}")
# existing setup...
[docs]
def stop(self) -> None:
"""Stop the playback worker."""
with self.lock:
self.should_stop = True
[docs]
def update_tempo(self, new_tempo: int) -> None:
"""
update_tempo
:param new_tempo: int
:return: None
"""
if new_tempo is None:
return # No change in tempo
print(f"Emitting {new_tempo}")
self.set_tempo.emit(new_tempo)
with self.lock:
self.position_tempo = new_tempo
if self.parent is not None:
if hasattr(self.parent, "set_display_tempo_usecs"):
# Assuming parent has a method to update display tempo
print(f"Updating display tempo to {new_tempo}")
self.parent.set_display_tempo_usecs(new_tempo)
@Slot()
[docs]
def do_work(self) -> None:
"""Process MIDI messages for playback."""
if self.should_stop:
return
now = time.time()
elapsed = now - self.start_time
# Add small delay to prevent immediate processing of events at tick 0
if elapsed < 0.1: # Wait 100ms before processing any events
return
# Print format header on first run
if not hasattr(self, "_header_printed"):
print("🎵 Real-time Playback Tracking:")
print(
"Format: [Elapsed] Bar X.X | BPM XXX.X | Expected: X.XXs | Real: X.XXs | Diff: ±X.XXs | Index: XXXX"
)
print("=" * 100)
self._header_printed = True
# Debug logging
if len(self.buffered_msgs) == 0:
print(f"⚠️ No buffered messages available (elapsed: {elapsed:.3f}s)")
return
while self.index < len(self.buffered_msgs):
abs_ticks, raw_bytes, msg_tempo = self.buffered_msgs[self.index]
# Calculate the time this message should be sent using incremental tempo calculation
# This correctly handles tempo changes by calculating time segment by segment
msg_time_sec = self._calculate_message_time(abs_ticks)
if msg_time_sec > elapsed:
break
# Add detailed position tracking logging (temporary)
if self.index % 100 == 0: # Log every 100 messages to avoid spam
current_bar = abs_ticks / (4 * self.ticks_per_beat)
current_bpm = 60000000 / self.position_tempo
time_diff = elapsed - msg_time_sec
print(
f"[{elapsed:6.1f}s] Bar {current_bar:5.1f} | BPM {current_bpm:6.1f} | "
f"Expected: {msg_time_sec:5.2f}s | Real: {elapsed:5.2f}s | "
f"Diff: {time_diff:+5.2f}s | Index: {self.index:4d}"
)
# Process the message
if raw_bytes is None:
# This is a tempo change message - process it
# Skip an initial tempo-only message at tick 0 (already set during setup)
if abs_ticks == 0:
self.index += 1
continue
current_bar = abs_ticks / (4 * self.ticks_per_beat)
new_bpm = 60000000 / msg_tempo
print(
f"🎵 TEMPO CHANGE at Bar {current_bar:.1f} ({elapsed:.2f}s): {msg_tempo} ({new_bpm:.1f} BPM)"
)
self.update_tempo(msg_tempo)
else:
# Send the MIDI message
status_byte = raw_bytes[0]
message_type = status_byte & BitMask.HIGH_4_BITS
if message_type == Midi.PC.STATUS and not self.play_program_changes:
# 0xC0 = program_change
pass # Skip
else:
self.midi_out_port.send_message(raw_bytes)
self.index += 1
if self.index >= len(self.buffered_msgs):
self.finished.emit()
[docs]
def _calculate_message_time(self, target_ticks: int) -> float:
"""
Calculate the time for a message at target_ticks using incremental tempo calculation.
This correctly handles tempo changes by processing events in chronological order.
"""
if not hasattr(self, "_cached_times"):
self._cached_times = {}
# Return cached time if available
if target_ticks in self._cached_times:
return self._cached_times[target_ticks]
# Calculate time incrementally like get_total_duration_in_seconds does
current_tempo = self.initial_tempo
time_seconds = 0.0
last_tick = 0
# Process all messages up to target_ticks in chronological order
for abs_ticks, raw_bytes, msg_tempo in self.buffered_msgs:
if abs_ticks > target_ticks:
break
# Calculate time for this segment using the tempo that was active
delta_ticks = abs_ticks - last_tick
time_seconds += (current_tempo / 1_000_000.0) * (
delta_ticks / self.ticks_per_beat
)
last_tick = abs_ticks
# Update tempo if this is a tempo change message
if raw_bytes is None: # This is a tempo change message
current_tempo = msg_tempo
# Cache the result
self._cached_times[target_ticks] = time_seconds
return time_seconds