Source code for jdxi_editor.midi.io.output_handler

"""
MIDI Output Handler
===================

This module provides the `MIDIOutHandler` class for managing MIDI output, allowing users to send
note-on, note-off, and control change messages through address specified MIDI output port.

Dependencies:
    - rtmidi: A library for working with MIDI messages and ports.

Example usage:
    handler = MIDIOutHandler("MIDI Output Port")
    handler.send_note_on(60, velocity=100)
    handler.send_note_off(60)
    handler.send_control_change(7, 127)
    handler.close()

"""

import logging
from typing import Iterable, Optional

from decologr import Decologr as log
from picomidi.constant import Midi
from picomidi.core.bitmask import BitMask
from PySide6.QtCore import Signal

from jdxi_editor.midi.data.parsers.util import OUTBOUND_MESSAGE_IGNORED_KEYS
from jdxi_editor.midi.io.controller import MidiIOController
from jdxi_editor.midi.io.utils import format_midi_message_to_hex_string
from jdxi_editor.midi.message import (
    ChannelMessage,
    ControlChangeMessage,
    IdentityRequestMessage,
    MidiMessage,
    ProgramChangeMessage,
)
from jdxi_editor.midi.sysex.parser.sysex import JDXiSysExParser
from jdxi_editor.midi.sysex.validation import validate_midi_message


[docs] class MidiOutHandler(MidiIOController): """Helper class for MIDI communication with the JD-Xi."""
[docs] midi_message_outgoing = Signal(object)
def __init__(self, parent=None): super().__init__(parent)
[docs] self.parent = parent
[docs] self.channel = 1
[docs] self.sysex_parser = JDXiSysExParser()
import threading # Global lock for all MIDI output operations
[docs] _midi_send_lock = threading.RLock()
[docs] def send_raw_message(self, message: Iterable[int]) -> bool: """ Thread-safe version of sending a raw MIDI message. Handles logging, validation, and exceptions safely. """ with self._midi_send_lock: try: if not validate_midi_message(message): log.message("MIDI message validation failed.") return False formatted_message = format_midi_message_to_hex_string(message) if not self.midi_out.is_port_open(): log.message("MIDI output port is not open.") return False # Parse SysEx safely - only attempt if message is actually SysEx (starts with 0xF0) filtered_data = {} message_list = list(message) if message_list and message_list[0] == Midi.SYSEX.START: # This is a SysEx message, try to parse it try: parsed_data = self.sysex_parser.parse_bytes(bytes(message)) filtered_data = { k: v for k, v in parsed_data.items() if k not in OUTBOUND_MESSAGE_IGNORED_KEYS } except ValueError as parse_ex: # Skip logging for non-JD-Xi messages (e.g., universal identity requests) error_msg = str(parse_ex) if "Not a JD-Xi SysEx message" in error_msg: # This is a universal MIDI message, not a JD-Xi message - skip silently filtered_data = {} else: # Log warning for actual JD-Xi parsing errors log.message( f"SysEx parsing failed: {parse_ex}", level=logging.WARNING, ) filtered_data = {} except Exception as parse_ex: # Log warning for other parsing errors log.message( f"SysEx parsing failed: {parse_ex}", level=logging.WARNING ) filtered_data = {} # For non-SysEx messages, filtered_data remains empty (no warning needed) # Log safely log.message( f"[MIDI QC passed] — [ Sending message: {formatted_message} ] {filtered_data}", level=logging.INFO, silent=False, ) # Send the message self.midi_out.send_message(message) self.midi_message_outgoing.emit(message) return True except Exception as ex: # Catch everything to prevent C-level crash from propagating log.error(f"Unexpected error sending MIDI message: {ex}") return False
[docs] def send_note_on( self, note: int = 60, velocity: int = 127, channel: int = 1 ) -> None: """ Send 'Note On' message to the specified MIDI channel. :param note: int MIDI note number (0–127), default is 60 (Middle C). :param velocity: int Note velocity (0–127), default is 127. :param channel: int MIDI channel (1–16), default is 1. """ self.send_channel_message(Midi.NOTE.ON, note, velocity, channel)
[docs] def send_note_off( self, note: int = 60, velocity: int = 0, channel: int = 1 ) -> None: """ Send address 'Note Off' message :param note: int MIDI note number (0–127), default is 60 (Middle C). :param velocity: int Note velocity (0–127), default is 127. :param channel: int MIDI channel (1–16), default is 1. """ self.send_channel_message(Midi.NOTE.OFF, note, velocity, channel)
[docs] def send_channel_message( self, status: int, data1: Optional[int] = None, data2: Optional[int] = None, channel: int = 1, ) -> None: """ Send a MIDI Channel Message. :param status: int Status byte (e.g., NOTE_ON, NOTE_OFF, CONTROL_CHANGE). :param data1: Optional[int]): First data byte, typically a note or controller number. :param data2: Optional[int]): Second data byte, typically velocity or value. :param channel: int MIDI channel (1-based, range 1-16). :raises: ValueError If the channel is out of range (1-16). """ if not 1 <= channel <= 16: raise ValueError(f"Invalid MIDI channel: {channel}. Must be 1-16.") channel_message = ChannelMessage( status, data1, data2, channel - 1 ) # convert to 0-based message_bytes_list = channel_message.to_message_list() self.send_raw_message(message_bytes_list)
[docs] def send_bank_select(self, msb: int, lsb: int, channel: int = 0) -> bool: """ Send address bank select message. :param msb: int Upper byte of the bank. :param lsb: int Lower byte of the bank. :param channel: int midi channel (0-15). :return: bool True if successful, False otherwise. """ log.message("========Sending bank select==========") log.parameter("MSB", msb) log.parameter("LSB", lsb) log.parameter("channel", channel) try: # Bank Select MSB (CC#0) status = Midi.CC.STATUS | (channel & BitMask.LOW_4_BITS) self.send_raw_message([status, Midi.CC.BANK.MSB, msb]) # Bank Select LSB (CC#32) self.send_raw_message([status, Midi.CC.BANK.LSB, lsb]) return True except (ValueError, TypeError, OSError, IOError) as ex: log.error(f"Error sending bank select: {ex}") return False
[docs] def send_identity_request(self) -> bool: """ Send identity request message (Universal System Exclusive). :return: bool True if the message was sent successfully, False otherwise. """ log.message("=========Sending identity request========") try: identity_request_message = IdentityRequestMessage() identity_request_bytes_list = identity_request_message.to_message_list() log.message( f"sending identity request message: " f"{type(identity_request_bytes_list)} {identity_request_bytes_list}" ) self.send_raw_message(identity_request_bytes_list) return True except (ValueError, TypeError, OSError, IOError) as ex: log.error(f"Error sending identity request: {ex}") return False
[docs] def send_midi_message(self, sysex_message: MidiMessage) -> bool: """ Send SysEx parameter change message using a MidiMessage. :param sysex_message: MidiMessage instance to be converted and sent. :return: True if the message was successfully sent, False otherwise. """ try: message = sysex_message.to_message_list() return self.send_raw_message(message) except (ValueError, TypeError, OSError, IOError) as ex: log.error(f"Error sending message: {ex}") return False
[docs] def send_program_change(self, program: int, channel: int = 0) -> bool: """ Send address program change message. :param program: int Program number (0-127). :param channel: int MIDI channel (0-15). :return: True if successful, False otherwise. """ log.message("=====Sending program change====") log.parameter("program", program) log.parameter("channel", channel) try: program_change_message = ProgramChangeMessage( channel=channel, program=program ) message = program_change_message.to_message_list() return self.send_raw_message(message) except (ValueError, TypeError, OSError, IOError) as ex: log.error(f"Error sending program change: {ex}") return False
[docs] def send_control_change( self, controller: int, value: int, channel: int = 0 ) -> bool: """ Send control change message. :param controller: int Controller number (0–127). :param value: int Controller value (0–127). :param channel: int MIDI channel (0–15). :return: bool True if successful, False otherwise. """ log.message("=====Sending control change====") log.parameter("controller", controller) log.parameter("value", value) log.parameter("channel", channel) if not 0 <= channel <= 15: log.message(f"Invalid MIDI channel: {channel}. Must be 0-15.") return False if not 0 <= controller <= 127: log.message(f"Invalid controller number: {controller}. Must be 0-127.") return False if not 0 <= value <= 127: log.message(f"Invalid controller value: {value}. Must be 0-127.") return False try: control_change_message = ControlChangeMessage(channel, controller, value) message = control_change_message.to_message_list() return self.send_raw_message(message) except (ValueError, TypeError, OSError, IOError) as ex: log.message(f"send_control_change: Error sending control change: {ex}") return False
[docs] def send_rpn(self, parameter: int, value: int, channel: int = 0) -> bool: """ Send a Registered Parameter Number (RPN) message via MIDI Control Change. :param parameter: int RPN parameter number (0–16383). :param value: int Parameter value (0–16383). :param channel: int MIDI channel (0–15). :return: True if messages sent successfully, False otherwise. """ log.message("========sending rpn=========") if not 0 <= parameter <= 16383: log.message(f"Invalid RPN parameter: {parameter}. Must be 0–16383.") return False if not 0 <= value <= 16383: log.message(f"Invalid RPN value: {value}. Must be 0–16383.") return False # Split into MSB/LSB rpn_msb = (parameter >> 7) & BitMask.LOW_7_BITS rpn_lsb = parameter & BitMask.LOW_7_BITS value_msb = (value >> 7) & BitMask.LOW_7_BITS value_lsb = value & BitMask.LOW_7_BITS success = ( self.send_control_change(101, rpn_msb, channel) and self.send_control_change(100, rpn_lsb, channel) # RPN MSB and self.send_control_change(6, value_msb, channel) # RPN LSB and self.send_control_change( # Data Entry MSB 38, value_lsb, channel ) # Data Entry LSB ) if success: log.message("Success: Sent RPN") log.parameter("Param", parameter) log.parameter("Value", value) log.parameter("Channel", channel) else: log.message("Failed to send RPN messages.") return success
[docs] def send_nrpn( self, parameter: int, value: int, channel: int = 0, use_14bit: bool = False ) -> bool: """ Send a Non-Registered Parameter Number (NRPN) message via MIDI Control Change. :param parameter: int NRPN parameter number (0–16383). :param value: int Parameter value (0–16383 for 14-bit, 0–127 for 7-bit). :param channel: int MIDI channel (0–15). :param use_14bit: bool If True, send both MSB and LSB for value (14-bit). If False, send only MSB (7-bit). :return: True if all messages were sent successfully, False otherwise. """ log.message("========sending nrpn=========") log.parameter("parameter", parameter) log.parameter("value", value) log.parameter("channel", channel) log.parameter("use_14bit", use_14bit) if not 0 <= parameter <= 16383: log.message(f"Invalid NRPN parameter: {parameter}. Must be 0–16383.") return False if not 0 <= value <= (16383 if use_14bit else 127): log.message( f"Invalid NRPN value: {value}. Must be 0–{16383 if use_14bit else 127}." ) return False nrpn_msb = (parameter >> 7) & BitMask.LOW_7_BITS nrpn_lsb = parameter & BitMask.LOW_7_BITS if use_14bit: value_msb = (value >> 7) & BitMask.LOW_7_BITS value_lsb = value & BitMask.LOW_7_BITS else: value_msb = value & BitMask.LOW_7_BITS value_lsb = 0 # Optional; not sent anyway ok = True ok &= self.send_control_change(99, nrpn_msb, channel) # NRPN MSB ok &= self.send_control_change(98, nrpn_lsb, channel) # NRPN LSB ok &= self.send_control_change(6, value_msb, channel) # Data Entry MSB if use_14bit: ok &= self.send_control_change(38, value_lsb, channel) # Data Entry LSB # Optional cleanup (nulling NRPN, not required but can prevent stuck parameters) ok &= self.send_control_change(99, 127, channel) # NRPN MSB null ok &= self.send_control_change(98, 127, channel) # NRPN LSB null if ok: log.message("Sent NRPN:") log.parameter("parameter", parameter) log.parameter("value", value) log.parameter("channel", channel) else: log.message("Failed to send NRPN messages.") return ok
[docs] def send_bank_select_and_program_change( self, channel: int, bank_msb: int, bank_lsb: int, program: int ) -> bool: """ Sends Bank Select and Program Change messages with delays between messages to ensure the synthesizer can process them correctly. :param channel: int MIDI channel (1-16). :param bank_msb: int Bank MSB value. :param bank_lsb: int Bank LSB value. :param program: int Program number. :return: bool True if all messages are sent successfully, False otherwise. """ try: import time from jdxi_editor.midi.sleep import MIDI_SLEEP_TIME log.message("========send_bank_select_and_program_change=========") log.parameter("channel", channel) log.parameter("bank_msb", bank_msb) log.parameter("bank_lsb", bank_lsb) log.parameter("program", program) log.message( f"-------#1 send_control_change controller=0, bank_msb={bank_msb}, channel: {channel} --------" ) self.send_control_change(0, bank_msb, channel) time.sleep(MIDI_SLEEP_TIME) # Small delay between bank select messages log.message( f"-------#2 send_control_change controller=32, bank_lsb={bank_lsb}, channel: {channel} --------" ) self.send_control_change(32, bank_lsb, channel) time.sleep(MIDI_SLEEP_TIME) # Small delay before program change log.message( f"-------#3 send_program_change program: {program} channel: {channel} --------" ) self.send_program_change(program, channel) return True except Exception as ex: log.error(f"Error {ex} occurred sending bank and program change message") return False
[docs] def identify_device(self) -> None: """ Send Identity Request and verify response :return: None """ request = IdentityRequestMessage() self.send_message(request) log.parameter("sending identity request message:", request)
[docs] def send_message(self, message: MidiMessage) -> None: """ unpack the message list and send it :param message: MidiMessage :return: None """ try: raw_message = message.to_message_list() self.send_raw_message(raw_message) log.parameter("Sent MIDI message:", raw_message) except Exception as ex: log.error(f"Error sending identity request: {str(ex)}")