
DC Removal

import wave
import numpy as np
import io

# numtaps must be odd so the filter has an integer-centred tap for spectral inversion
def DC_Removal_filter(input_file, cutoff_frequency=100, numtaps=4401):
    # Check if the input file is a WAV file
    if not isinstance(input_file, (str, io.BytesIO)):
        return "Error: Input must be a WAV file path or file-like object."

    def sinc_high_pass_filter(cutoff, fs, numtaps):
        t = np.arange(numtaps) - (numtaps - 1) / 2
        sinc_func = np.sinc(2 * cutoff * t / fs)
        window = 0.54 - 0.46 * np.cos(2 * np.pi * np.arange(numtaps) / (numtaps - 1))  # Hamming window
        sinc_func *= window
        sinc_func /= np.sum(sinc_func)
        # Spectral inversion: subtract the low-pass kernel from a unit impulse
        # at the centre tap to obtain the complementary high-pass filter
        delta = np.zeros(numtaps)
        delta[(numtaps - 1) // 2] = 1
        return delta - sinc_func

    # Read the WAV file
    with wave.open(input_file, 'rb') as wav_file:
        fs = wav_file.getframerate()
        n_channels = wav_file.getnchannels()
        sampwidth = wav_file.getsampwidth()
        n_frames = wav_file.getnframes()
        
        # Read all frames
        frames = wav_file.readframes(n_frames)
        
    # Convert frames to numpy array
    signal = np.frombuffer(frames, dtype=np.int16)
    
    # Convert to float32 for processing
    signal = signal.astype(np.float32)
    
    # If stereo, convert to mono by averaging channels
    if n_channels == 2:
        signal = signal.reshape(-1, 2).mean(axis=1)
    
    # Normalize the signal (guard against an all-zero file)
    peak = np.max(np.abs(signal))
    if peak > 0:
        signal = signal / peak
    
    high_pass_filter = sinc_high_pass_filter(cutoff_frequency, fs, numtaps)
    
    # Initialize output signal
    output_signal = np.zeros_like(signal)
    
    # Buffer to store previous input values (length of filter coefficients)
    buffer = np.zeros(numtaps)
    
    # Apply the filter sample-by-sample
    for n in range(len(signal)):
        # Update the buffer with the current input sample
        buffer = np.roll(buffer, 1)
        buffer[0] = signal[n]
        # Compute the output sample by convolving the filter coefficients with the buffer
        output_signal[n] = np.dot(high_pass_filter, buffer)
    
    # Convert back to int16 (clip first to avoid integer wrap-around)
    output_signal = (np.clip(output_signal, -1.0, 1.0) * 32767).astype(np.int16)
    
    # Create a BytesIO object to store the output WAV
    output_wav = io.BytesIO()
    
    # Write the filtered signal to the BytesIO object
    with wave.open(output_wav, 'wb') as wav_file:
        wav_file.setnchannels(1)  # mono
        wav_file.setsampwidth(2)  # 2 bytes per sample
        wav_file.setframerate(fs)
        wav_file.writeframes(output_signal.tobytes())
    
    # Reset the BytesIO object's position to the beginning
    output_wav.seek(0)
    
    return output_wav
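
A minimal usage sketch (the file name speech.wav is only an example): open a recording as a BytesIO object, run the filter, and write the returned buffer back to disk.

# Hypothetical usage example for DC_Removal_filter; file names are placeholders
import io

with open('speech.wav', 'rb') as f:
    input_wav = io.BytesIO(f.read())

filtered = DC_Removal_filter(input_wav, cutoff_frequency=100, numtaps=4401)

with open('speech_no_dc.wav', 'wb') as f:
    f.write(filtered.read())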


In-depth explanations from the team


PDM2PCM

import numpy as np
from scipy.io.wavfile import write
import io


def read_pdm_file(file_path):
    """
    Reads a PDM signal from a text file. Assumes the signal is composed of -1 and 1 values.

    Parameters:
        file_path (str): Path to the PDM file.

    Returns:
        np.array: Array containing the PDM signal.
    """
    pdm_signal = []

    # Open the file and read each line
    with open(file_path, 'r') as file:
        for line in file:
            # Split the line into individual values and append them to the signal list
            values = line.split()
            for val in values:
                pdm_signal.append(int(val))

    # Convert the list to a numpy array of type int8
    pdm_array = np.array(pdm_signal, dtype=np.int8)

    return pdm_array

def moving_average(signal, window_size):
    """
    Computes the moving average of the input signal using a specified window size.

    Parameters:
        signal (np.array): The input signal.
        window_size (int): The window size for the moving average.

    Returns:
        np.array: The signal after applying the moving average.
    """
    # Pad the signal with zeros at the beginning so the average is causal
    pad_width = window_size - 1
    padding = np.zeros(pad_width, dtype=signal.dtype)
    padded_signal = np.concatenate((padding, signal), axis=0)

    # Compute the cumulative sum, with a leading zero so every window
    # difference below is well defined (including the very first window)
    cumsum = np.concatenate(([0.0], np.cumsum(padded_signal)))

    # Moving sum over each window of `window_size` samples, then average
    moving_sum = cumsum[window_size:] - cumsum[:-window_size]
    moving_average_result = moving_sum / float(window_size)

    return moving_average_result

def cic_filter(pdm_signal, decimation_factor=64, order=1):
    """
    Applies a CIC filter to the PDM signal.

    Parameters:
        pdm_signal (np.array): The PDM signal to filter.
        decimation_factor (int): The decimation factor for the CIC filter.
        order (int): The order of the CIC filter.

    Returns:
        np.array: The filtered PCM signal.
    """
    # Step 1: Cascaded moving averages. A moving average of length
    # `decimation_factor` is one integrator+comb pair of a CIC stage, so
    # `order` passes approximate an order-`order` CIC response.
    integrator = pdm_signal
    for _ in range(order):
        integrator = moving_average(integrator, decimation_factor)

    # Step 2: Decimation
    decimated_integrator = integrator[::decimation_factor]

    # Step 3: First difference at the low rate, which also removes the DC
    # offset inherent in the PDM bitstream
    comb = np.diff(decimated_integrator)

    return comb

def save_pcm_as_wav_file(pcm_signal, sample_rate):
    """
    Saves the PCM signal as a bytes object.

    Parameters:
        pcm_signal (np.array): The PCM signal to save. This should be a 1D numpy array of raw PCM data.
        sample_rate (int): The sample rate of the PCM signal, in Hz (samples per second).

    Returns:
        bytes: The WAV file data as a bytes object.
    """
    # Normalize the PCM signal to full int16 scale (guard against silence)
    max_val = np.max(np.abs(pcm_signal))
    if max_val == 0:
        max_val = 1
    normalized_signal = pcm_signal / max_val
    scaled_signal = normalized_signal * 32767
    int16_signal = np.int16(scaled_signal)

    # Save the PCM signal to a WAV file in memory
    wav_buffer = io.BytesIO()
    write(wav_buffer, sample_rate, int16_signal)
    wav_data = wav_buffer.getvalue()
    return wav_data

def Pdm2Pcm(pdm_file_path, decimation_factor=64, order=4, pcm_sample_rate=8000):
    """
    Converts a PDM signal from a file to a PCM signal and returns it as WAV data.

    Parameters:
        pdm_file_path (str): Path to the PDM file.
        decimation_factor (int): Decimation factor for the CIC filter.
        order (int): Order of the CIC filter.
        pcm_sample_rate (int): Sample rate of the output PCM signal, in Hz.

    Returns:
        bytes: The WAV file data as a bytes object.
    """

    # Read PDM signal from file
    pdm_signal = read_pdm_file(pdm_file_path)

    # Convert PDM signal to PCM signal
    pcm_signal = cic_filter(pdm_signal, decimation_factor, order)

    # Save the PCM signal as a variable
    wav_file = save_pcm_as_wav_file(pcm_signal, pcm_sample_rate)

    return wav_file
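
A minimal usage sketch (the file name and the 512 kHz capture rate are assumptions for illustration): a PDM stream captured at 512 kHz and decimated by 64 yields 8 kHz PCM.

# Hypothetical usage: convert a PDM text capture to an 8 kHz WAV file.
# 'mic_capture.txt' is a placeholder; a 512 kHz PDM stream decimated by 64
# corresponds to the 8000 Hz output rate used below.
wav_bytes = Pdm2Pcm('mic_capture.txt', decimation_factor=64, order=4, pcm_sample_rate=8000)

with open('mic_capture.wav', 'wb') as f:
    f.write(wav_bytes)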


In-depth explanations from the team

Noise Reduction

import io
import wave
import numpy as np
import scipy.io.wavfile as wav
import warnings

warnings.filterwarnings("ignore", category=wav.WavFileWarning)
#STFT part
def CreateHanningWindow(window_length):
    return 0.5 * (1 - np.cos(2 * np.pi * np.arange(window_length) / (window_length - 1)))


def FFT(signal):
    N = len(signal)
    if N <= 1:
        return signal
    even = FFT(signal[0::2])
    odd = FFT(signal[1::2])
    T = np.exp(-2j * np.pi * np.arange(N) / N)
    return np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd])


def IFFT(spectrum):
    N = len(spectrum)
    if N <= 1:
        return spectrum
    even = IFFT(spectrum[0::2])
    odd = IFFT(spectrum[1::2])
    T = np.exp(2j * np.pi * np.arange(N) / N)
    return (np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd]) / 2)


def STFT(audio_signal, sample_rate=None, window_size=2048, hop_size=512):
    if isinstance(audio_signal, str):
        sample_rate, audio_signal = wav.read(audio_signal)
    # Normalize based on the original dtype (the check must run before the
    # cast to float64, otherwise the integer branch can never be reached)
    if np.issubdtype(audio_signal.dtype, np.integer):
        audio_signal = audio_signal.astype(np.float64) / np.iinfo(audio_signal.dtype).max
    else:
        audio_signal = audio_signal.astype(np.float64)
        peak = np.max(np.abs(audio_signal))
        if peak > 0:
            audio_signal /= peak

    window = CreateHanningWindow(window_size)
    num_frames = 1 + (len(audio_signal) - window_size) // hop_size
    stft_matrix = np.zeros((window_size, num_frames), dtype=np.complex128)

    for frame in range(num_frames):
        start = frame * hop_size
        end = start + window_size
        frame_data = audio_signal[start:end] * window
        stft_matrix[:, frame] = FFT(frame_data)

    return stft_matrix, sample_rate


def ISTFT(stft_matrix, sample_rate, window_size, hop_size, output_wav):
    num_frames = stft_matrix.shape[1]
    expected_signal_length = window_size + hop_size * (num_frames - 1)
    reconstructed_signal = np.zeros(expected_signal_length, dtype=np.float64)
    window = CreateHanningWindow(window_size)

    for frame in range(num_frames):
        start = frame * hop_size
        end = start + window_size
        frame_data = IFFT(stft_matrix[:, frame])
        reconstructed_signal[start:end] += np.real(frame_data) * window

    # Normalize to full int16 scale (guard against an all-zero result)
    peak = np.max(np.abs(reconstructed_signal))
    if peak > 0:
        reconstructed_signal = reconstructed_signal / peak
    reconstructed_signal = np.int16(reconstructed_signal * 32767)
    wav.write(output_wav, sample_rate, reconstructed_signal)


def check_file(input_file):
    if isinstance(input_file, str):
        return input_file.lower().endswith('.wav')
    elif isinstance(input_file, io.BytesIO):
        return True
    else:
        return False

def NoiseReduction(input_file, output_file, speech_segments, frame_size, hop_size):
    if not check_file(input_file):
        print("This is not a WAV file")
        return None

    # Load input file
    print('Loading wav file:', input_file)
    with wave.open(input_file, 'rb') as wav_file:
        sample_rate = wav_file.getframerate()
        num_channels = wav_file.getnchannels()
        sample_width = wav_file.getsampwidth()
        num_frames = wav_file.getnframes()
        waveform = wav_file.readframes(num_frames)
        waveform = np.frombuffer(waveform, dtype=np.int16)
        if num_channels > 1:
            waveform = waveform.reshape(-1, num_channels).mean(axis=1)
        waveform = waveform.astype(np.float32) / 32768.0  # Normalize to -1 to 1 range


    # Ensure speech_segments length matches the number of STFT frames
    num_frames = 1 + (len(waveform) - frame_size) // hop_size
    speech_segments = np.array([int(s) for s in speech_segments.split(',')])
    if len(speech_segments) > num_frames:
        speech_segments = speech_segments[:num_frames]
    elif len(speech_segments) < num_frames:
        speech_segments = np.pad(speech_segments, (0, num_frames - len(speech_segments)), 'constant')

    # Perform STFT on the entire noisy signal
    stft_matrix, _ = STFT(waveform, sample_rate=sample_rate, window_size=frame_size, hop_size=hop_size)
    magnitude_spectrum = np.abs(stft_matrix)
    phase_spectrum = np.angle(stft_matrix)

    # Compute noise spectrum from non-speech segments
    noise_spectrum = np.zeros_like(magnitude_spectrum[:, 0])
    non_speech_frame_count = 0

    for i, is_speech in enumerate(speech_segments):
        if is_speech == 0:
            noise_spectrum += magnitude_spectrum[:, i]
            non_speech_frame_count += 1

    if non_speech_frame_count > 0:
        noise_spectrum /= non_speech_frame_count
    else:
        print("No non-speech frames detected, estimating noise from the first 0.25 seconds")
        noise_estimation_duration = 0.25
        noise_samples = int(noise_estimation_duration * sample_rate)
        noise_signal = waveform[:noise_samples]

        # Perform STFT on noise signal
        noise_stft_matrix, _ = STFT(noise_signal, sample_rate=sample_rate, window_size=frame_size, hop_size=hop_size)
        noise_magnitude_spectrum = np.abs(noise_stft_matrix)
        noise_spectrum = np.mean(noise_magnitude_spectrum, axis=1)

    mean_noise_spectrum = noise_spectrum

    # Noise reduction with oversubtraction and flooring
    alpha = 2  # Oversubtraction factor
    beta = 0.01  # Spectral floor
    cleaned_spectrum = np.maximum(magnitude_spectrum - alpha * mean_noise_spectrum.reshape((mean_noise_spectrum.shape[0], 1)), beta * magnitude_spectrum)

    # Reconstruct signal using inverse STFT
    cleaned_complex_spectrum = cleaned_spectrum * np.exp(1.0j * phase_spectrum)
    ISTFT(cleaned_complex_spectrum, sample_rate, window_size=frame_size, hop_size=hop_size, output_wav=output_file)
    print('Output wav file saved:', output_file)
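
A minimal usage sketch (file names are placeholders): the speech_segments argument is the comma-separated 0/1 string produced by the Voice Activity Detector further down this page, one flag per STFT frame.

# Hypothetical usage: denoise a recording using VAD flags (0 = noise-only
# frame, 1 = speech frame). File names and the flag string are placeholders;
# frame_size must be a power of two for the recursive FFT.
vad_flags = '0,0,0,1,1,1,1,0,0,1'  # normally produced by the VAD module below
NoiseReduction('noisy.wav', 'clean.wav', vad_flags, frame_size=2048, hop_size=512)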


In-depth explanations from the team

Transmission and Reception

import numpy as np
import scipy.signal
import soundfile as sf
import matplotlib.pyplot as plt

def SSB(mode='file', file=None, carrier_freq=10000, samplerate=44100):

    # Load the WAV file
    def load_wav(file):
        data, samplerate = sf.read(file)
        if len(data) == 0:
            raise ValueError("The WAV file is empty.")
        if data.ndim > 1:  # Handle stereo by taking only one channel
            data = data[:, 0]  # Use only the first channel for stereo files
        return data, samplerate

    # Save the WAV file
    def save_wav(filename, data, samplerate):
        sf.write(filename, data, samplerate)

    # Generate a SSB signal
    def ssb_modulate(signal, carrier_freq, samplerate):
        t = np.arange(len(signal)) / samplerate
        signal_mul_cos = signal * np.cos(2 * np.pi * carrier_freq * t)  # Multiply with cosine
        signal_hilbert = scipy.signal.hilbert(signal)  # Apply Hilbert transform
        signal_mul_sin = np.imag(signal_hilbert) * np.sin(2 * np.pi * carrier_freq * t)  # Multiply with sine
        ssb_signal = signal_mul_cos - signal_mul_sin  # SSB signal (Upper Sideband)
        return ssb_signal

    # Low-pass filter using FFT
    def low_pass_filter(signal, cutoff_freq, samplerate):
        fft_signal = np.fft.fft(signal)
        freqs = np.fft.fftfreq(len(signal), 1 / samplerate)
        filtered_fft_signal = fft_signal.copy()
        filtered_fft_signal[np.abs(freqs) > cutoff_freq] = 0  # Zero out frequencies above cutoff
        filtered_signal = 2 * np.fft.ifft(filtered_fft_signal)
        return np.real(filtered_signal)

    # Demodulate the SSB signal
    def ssb_demodulate(ssb_signal, carrier_freq, samplerate):
        t = np.arange(len(ssb_signal)) / samplerate
        demodulated_signal = ssb_signal * np.cos(2 * np.pi * carrier_freq * t)  # Multiply with cosine
        cutoff_freq = 4000  # Low-pass filter cutoff frequency
        recovered_signal = low_pass_filter(demodulated_signal, cutoff_freq, samplerate)  # Apply low-pass filter
        return recovered_signal

    # Real-time processing callback (sounddevice-style signature; it is not
    # wired to an audio stream in this file and is kept here as a sketch)
    def audio_callback(indata, outdata, frames, time, status):
        if status:
            print(status, flush=True)  # Print status if there's an error

        ssb_signal = ssb_modulate(indata[:, 0], carrier_freq, samplerate)  # Modulate input signal
        recovered_signal = ssb_demodulate(ssb_signal, carrier_freq, samplerate)  # Demodulate signal

        if outdata is not None:
            outdata[:, 0] = recovered_signal  # Output recovered signal to both channels
            outdata[:, 1] = recovered_signal

    if mode == 'file' and file:
        try:
            # Load the input WAV file
            data, samplerate = load_wav(file)
            # Keep the carrier below Nyquist: the upper sideband occupies
            # [carrier_freq, carrier_freq + 4 kHz] and must fit under fs/2
            carrier_freq = min(carrier_freq, samplerate / 2 - 4000)

            # Modulate the signal using SSB
            ssb_signal = ssb_modulate(data, carrier_freq, samplerate)

            # Demodulate the signal to recover the original data
            recovered_signal = 0.5 * ssb_demodulate(ssb_signal, carrier_freq, samplerate)

            # Save the recovered signal to an output WAV file
            output_filename = 'output_file.wav'
            save_wav(output_filename, recovered_signal, samplerate)

            # Plotting the signals
            t = np.arange(len(data)) / samplerate

            plt.figure()
            plt.subplot(3, 1, 1)
            plt.plot(np.fft.fftfreq(len(t), 1 / samplerate), np.abs(np.fft.fft(data)))
            plt.title('Original Baseband Signal')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')

            plt.subplot(3, 1, 2)
            plt.plot(np.fft.fftfreq(len(t), 1 / samplerate), np.abs(np.fft.fft(ssb_signal)))
            plt.title('SSB Modulated Signal (Upper Sideband)')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')

            plt.subplot(3, 1, 3)
            plt.plot(np.fft.fftfreq(len(t), 1 / samplerate), np.abs(np.fft.fft(recovered_signal)))
            plt.title('Demodulated Signal')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')

            plt.tight_layout()
            plt.show()

            return output_filename
        
        except FileNotFoundError as e:
            print(e)
        except ValueError as e:
            print(e)


    else:
        print("Invalid mode or filename not provided for file mode.")
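
A minimal usage sketch (the input file name is a placeholder): modulate and demodulate a mono recording and write the round-trip result to output_file.wav.

# Hypothetical usage: round-trip a recording through SSB modulation and
# demodulation. 'voice.wav' is a placeholder input file name.
output_name = SSB(mode='file', file='voice.wav', carrier_freq=10000)
print('Recovered signal written to', output_name)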


In-depth explanations from the team

Pitch Estimation

import numpy as np
import wave
import scipy.signal
import matplotlib.pyplot as plt


# Constants
CHUNK = 1500  # Number of samples per frame
MIN_PITCH = 50  # Minimum pitch frequency (Hz)
MAX_PITCH = 800  # Maximum pitch frequency (Hz)

# Create pitch estimation function
def pitch_estimation(frame, fs, min_pitch, max_pitch):
    # Constants
    MIN_PERIOD = fs // max_pitch
    MAX_PERIOD = fs // min_pitch
   
    # Remove mean
    frame -= np.mean(frame)
   
    # Apply Hanning window
    windowed_frame = frame * np.hanning(len(frame))
   
    # Autocorrelation
    corr = np.correlate(windowed_frame, windowed_frame, mode='full')
    corr = corr[len(corr) // 2:]
    
    # Thresholding the autocorrelation to remove low-magnitude peaks
    corr[corr < 0.1 * np.max(corr)] = 0
   
    # Find the first positive slope of the autocorrelation (skips the
    # zero-lag peak before searching for the pitch peak)
    d = np.diff(corr)
    positive_slope = np.where(d > 0)[0]
    start = positive_slope[0] if positive_slope.size > 0 else 0
   
    # Find the peak in the specified lag range (guarding against frames too
    # short to contain a full pitch period)
    search_region = corr[start + MIN_PERIOD:start + MAX_PERIOD]
    if search_region.size == 0:
        return None
    peak = np.argmax(search_region) + start + MIN_PERIOD
   
    # Calculate pitch
    if peak >= MIN_PERIOD and peak <= MAX_PERIOD:
        # Refine peak using parabolic interpolation
        if peak > 0 and peak < len(corr) - 1:
            alpha = corr[peak - 1]
            beta = corr[peak]
            gamma = corr[peak + 1]
            p = 0.5 * (alpha - gamma) / (alpha - 2 * beta + gamma) if alpha - 2 * beta + gamma != 0 else 0
            pitch_period = peak + p
        else:
            pitch_period = peak
       
        pitch = fs / pitch_period
        if pitch > max_pitch or pitch < min_pitch:
            pitch = None
    else:
        pitch = None
    if start == 0:
        return None
    return pitch

# Convert an open WAV file to a numpy array and estimate the pitch of each
# frame; optionally plot the pitch track over time.
def process_wav_file_pitches(wf, plot_pitch_estimation=False):
    # Read basic properties from the already-opened WAV file
    num_channels = wf.getnchannels()
    sampwidth = wf.getsampwidth()
    framerate = wf.getframerate()
    num_frames = wf.getnframes()

    # Read the entire file
    audio_data = wf.readframes(num_frames)

    # Convert the byte data to a numpy array
    audio_data = np.frombuffer(audio_data, dtype=np.int16)

    # If the audio is stereo, take only one channel
    if num_channels > 1:
        audio_data = audio_data[::num_channels]

    # Process the audio in overlapping frames (hop of CHUNK//2 samples)
    pitches = []
    for i in range(0, len(audio_data), CHUNK // 2):
        frame = audio_data[i:i + CHUNK].astype(np.float32)
        if len(frame) == CHUNK:
            pitch = pitch_estimation(frame, framerate, MIN_PITCH, MAX_PITCH)
            pitches.append(pitch)
        
    # Optional: apply a median filter to smooth the pitch estimates (only
    # useful for the plot)
    # if len(pitches) > 0:
    #     pitches = scipy.signal.medfilt(pitches, kernel_size=5)

    if plot_pitch_estimation:
        # Plot the detected pitches; the hop between frames is CHUNK//2 samples
        t = np.arange(len(pitches)) * (CHUNK // 2) / framerate
        plt.figure(figsize=(10, 6))
        plt.plot(t, pitches, label='Detected Pitch')
        plt.xlabel('Time (s)')
        plt.ylabel('Pitch (Hz)')
        plt.title('Pitch Estimation Over Time')
        plt.legend()
        plt.grid()
        plt.show()

    return pitches  # One pitch estimate (Hz) per frame; None marks unvoiced frames
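
A minimal usage sketch (the file name is a placeholder): open a 16-bit mono WAV file with the wave module and hand the reader object to process_wav_file_pitches.

# Hypothetical usage: estimate the pitch track of a recording.
# 'voice.wav' is a placeholder file name.
import wave

with wave.open('voice.wav', 'rb') as wf:
    pitch_track = process_wav_file_pitches(wf, plot_pitch_estimation=True)
print(pitch_track)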


In-depth explanations from the team

Acoustic Gain Control

import numpy as np
import wave
import io

def vad_aware_agc_process(input_signal_bytes, binary_vector, frame_duration=0.01, gain=0.1):
    """
    Apply VAD-aware Automatic Gain Control (AGC) to the input audio signal.

    Args:
    input_signal_bytes (bytes): Input audio data as bytes
    binary_vector (np.array): VAD decision for each frame (1 for speech, 0 for non-speech)

    Returns:
    bytes: Processed audio signal as WAV bytes
    """
    def extract_audio_data(audio_bytes):
        if isinstance(audio_bytes, io.BytesIO):
            audio_bytes.seek(0)  # Move the cursor to the beginning of the BytesIO object
            wav_file = wave.open(audio_bytes, 'rb')
        else:
            wav_file = wave.open(io.BytesIO(audio_bytes), 'rb')
        channels = wav_file.getnchannels()
        bit_depth = wav_file.getsampwidth()
        sample_rate = wav_file.getframerate()
        frame_count = wav_file.getnframes()
        audio_data = wav_file.readframes(frame_count)

        if bit_depth == 1:
            # 8-bit WAV samples are unsigned, so they are centred below
            data_type = np.uint8
        elif bit_depth == 2:
            data_type = np.int16
        else:
            raise ValueError("Unsupported bit depth in the audio file.")

        audio_array = np.frombuffer(audio_data, dtype=data_type)

        if channels == 2:
            audio_array = audio_array.reshape(-1, 2).mean(axis=1)

        if bit_depth == 1:
            normalized = (audio_array.astype(np.float32) - 128.0) / 128.0
        else:
            normalized = audio_array.astype(np.float32) / np.iinfo(data_type).max

        return normalized, sample_rate, channels, bit_depth

    def compute_rms(signal, window_size):
        return np.sqrt(np.convolve(signal**2, np.ones(window_size)/window_size, mode='same'))

    def calculate_parameters(input_signal, sample_rate):
        signal_duration = len(input_signal) / sample_rate
        signal_energy = np.sum(input_signal**2)
        
        attack_time = max(0.001, min(0.02, 0.01 * (signal_duration / 10)))
        release_time = max(0.05, min(0.2, 0.1 * (signal_duration / 10)))
        noise_floor = max(-80, min(-40, -60 + 10 * np.log10(signal_energy)))
        rms_window_ms = max(20, min(100, 50 * (signal_duration / 10)))
        
        return attack_time, release_time, noise_floor, rms_window_ms

    # Extract audio data and sample rate
    input_signal, sample_rate, channels, bit_depth = extract_audio_data(input_signal_bytes)

    # Calculate adaptive parameters
    attack_time, release_time, noise_floor, rms_window_ms = calculate_parameters(input_signal, sample_rate)

    # Convert time constants to sample counts
    frame_length_samples = int(frame_duration * sample_rate)
    attack_samples = int(attack_time * sample_rate)
    release_samples = int(release_time * sample_rate)
    rms_window_samples = int(rms_window_ms * sample_rate / 1000)

    # Calculate the number of frames based on the input signal length
    num_samples = len(input_signal)
    num_frames = (num_samples + frame_length_samples - 1) // frame_length_samples

    # Ensure the VAD binary vector matches the number of frames
    if len(binary_vector) < num_frames:
        binary_vector = np.pad(binary_vector, (0, num_frames - len(binary_vector)), mode='constant', constant_values=0)
    elif len(binary_vector) > num_frames:
        binary_vector = binary_vector[:num_frames]

    assert len(binary_vector) == num_frames, f"VAD array length ({len(binary_vector)}) does not match the number of frames ({num_frames})"

    # Compute target RMS
    target_rms = compute_rms(np.abs(input_signal), rms_window_samples)

    # Initialize arrays
    envelope = np.zeros_like(input_signal)
    agc_signal = np.zeros_like(input_signal)
    noise_floor_linear = 10 ** (noise_floor / 20)

    # Process each sample
    for i in range(len(input_signal)):
        frame_index = i // frame_length_samples
        instant_level = abs(input_signal[i])

        # Update envelope based on VAD decision
        if binary_vector[frame_index] == 1:
            if i == 0:
                envelope[i] = instant_level
            else:
                envelope[i] = max(instant_level, envelope[i-1] * (1 - 1/release_samples))
        else:
            if i == 0:
                envelope[i] = instant_level
            else:
                envelope[i] = envelope[i-1]

        # Compute desired gain
        if envelope[i] > noise_floor_linear:
            desired_gain = target_rms[i] / (envelope[i] + 1e-6)
        else:
            desired_gain = gain

        # Apply attack/release to gain
        if desired_gain < gain:
            gain = desired_gain + (gain - desired_gain) * np.exp(-1 / attack_samples)
        else:
            gain = desired_gain + (gain - desired_gain) * np.exp(-1 / release_samples)

        # Apply gain and clip to prevent overflow
        agc_signal[i] = np.clip(input_signal[i] * gain, -1.0, 1.0)

    # Convert the processed signal back to the original sample format
    if bit_depth == 1:
        output_signal = (agc_signal * 127 + 128).astype(np.uint8)  # 8-bit WAV is unsigned
    elif bit_depth == 2:
        output_signal = (agc_signal * 32767).astype(np.int16)

    # Create a BytesIO object to hold the WAV data
    output_bytes = io.BytesIO()

    # Write the processed audio to a WAV file in memory
    with wave.open(output_bytes, 'wb') as wav_file:
        wav_file.setnchannels(1)  # stereo input was downmixed to mono during extraction
        wav_file.setsampwidth(bit_depth)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(output_signal.tobytes())

    # Get the WAV bytes
    wav_bytes = output_bytes.getvalue()

    return wav_bytes

# Example usage:
# processed_wav_bytes = vad_aware_agc_process(input_wav_bytes, binary_vector)
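
A slightly fuller sketch of the call above (the file name and VAD flags are placeholders): binary_vector needs one 0/1 entry per 10 ms frame and is padded or truncated automatically.

# Hypothetical usage: apply VAD-aware AGC to a recording.
# 'speech.wav' is a placeholder file name.
import numpy as np

with open('speech.wav', 'rb') as f:
    input_wav_bytes = f.read()

vad_frames = np.array([0, 0, 1, 1, 1, 0])  # normally produced by the VAD below
processed = vad_aware_agc_process(input_wav_bytes, vad_frames, frame_duration=0.01)

with open('speech_agc.wav', 'wb') as f:
    f.write(processed)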


In-depth explanations from the team

Voice Activity Detector

import wave
import numpy as np
import matplotlib.pyplot as plt


class VoiceActivityDetector:
    def __init__(self, input_file, frame_duration, threshold, smoothness, remove_dc,
                 plot_graphs):
        """
        Initialize the VoiceActivityDetector with the given parameters.
        :param input_file: Input file object (BytesIO)
        :param frame_duration: Duration of each frame in seconds (0.01, 0.02, or 0.03)
        :param threshold: Energy threshold for speech detection (0 to 1)
        :param smoothness: Smoothness level for VAD (0, 1, 2, or 3)
        :param remove_dc: Boolean to remove DC component from the audio data
        :param plot_graphs: Boolean to plot graphs for debugging
        """
        # Parameters Section
        self.input_file = input_file
        self.frame_duration = frame_duration
        self.threshold = threshold
        self.aggressiveness = smoothness
        self.look_back = self.get_look_back(smoothness)
        self.min_ones = 1
        self.remove_dc_flag = remove_dc
        self.plot_graphs = plot_graphs

        # Validate Parameters
        self.validate_parameters()

        # Operations Section
        self.audio_data, self.frame_rate = self.read_wav(self.input_file)
        self.original_audio_data = self.audio_data.copy()
        if self.remove_dc_flag:
            self.audio_data = self.remove_dc_component(self.audio_data)
        self.speech_segments = None
        self.energy = None
        self.smoothed_speech_segments = None

    def validate_parameters(self):
        """
        Validate the parameters passed to the VoiceActivityDetector.
        :return: None
        """
        # Validate frame_duration
        if self.frame_duration not in [0.01, 0.02, 0.03]:
            raise ValueError("frame_duration must be 0.01, 0.02, or 0.03 seconds.")

        # Validate threshold
        if not (0 <= self.threshold <= 1):
            raise ValueError("threshold must be between 0 and 1.")

    @staticmethod
    def get_look_back(level):
        """
        Get the look-back window size based on the smoothness level.
        :param level: Smoothness level (0, 1, 2, or 3)
        :return: Look-back window size
        """
        if level == 0:
            return 0
        elif level == 1:
            return 4
        elif level == 2:
            return 6
        elif level == 3:
            return 8
        else:
            raise ValueError("Invalid aggressiveness level. Choose between 0, 1, 2, or 3.")

    def read_wav(self, input_file):
        """
        Read the WAV file and return the audio data and frame rate.
        :param input_file: Input file object
        :return: Tuple of audio data and frame rate
        """
        # Check if the file is a WAV file
        try:
            with wave.open(input_file, 'rb') as input_wav_file:
                # Get basic information
                n_channels = input_wav_file.getnchannels()
                sample_width = input_wav_file.getsampwidth()
                frame_rate = input_wav_file.getframerate()
                n_frames = input_wav_file.getnframes()

                # Read raw audio data
                raw_data = input_wav_file.readframes(n_frames)

        except Exception as exc:
            raise ValueError("The provided file is not a valid WAV file.") from exc

        # Convert raw data to numpy array (8-bit WAV data is unsigned, so it
        # is centred around zero here)
        if sample_width == 1:
            audio_data = np.frombuffer(raw_data, dtype=np.uint8).astype(np.int16) - 128
        elif sample_width == 2:
            audio_data = np.frombuffer(raw_data, dtype=np.int16)
        else:
            raise ValueError("Unsupported sample width")

        # If stereo, take the mean of both channels
        if n_channels == 2:
            audio_data = audio_data.reshape(-1, 2).mean(axis=1)

        # Convert to int32 for further processing
        audio_data = audio_data.astype(np.int32)

        return audio_data, frame_rate

    def remove_dc_component(self, audio_data):
        """
        Removes the DC component from the audio data.
        :param audio_data: Numpy array of audio data
        :return: Numpy array of audio data without DC component
        """
        # Calculate the DC component (mean of the signal)
        dc_component = np.mean(audio_data)

        # Subtract the DC component from the signal to get the AC component
        ac_component = audio_data - dc_component

        return ac_component

    def vad(self):
        """
        Perform Voice Activity Detection on the audio data.
        :return: None
        """
        # Calculate frame size
        frame_size = int(self.frame_rate * self.frame_duration)

        # Pad the audio data to ensure all frames are full
        remainder_size = (len(self.audio_data) % frame_size)
        if remainder_size > 0:
            pad_size = frame_size - remainder_size
            self.audio_data = np.pad(self.audio_data, (0, pad_size), 'constant')

        # Reshape audio data into frames
        frames = self.audio_data.reshape(-1, frame_size)

        # Convert frames to float32 before squaring to avoid overflow
        frames = frames.astype(np.float32)

        # Calculate energy for each frame
        self.energy = np.sum(frames ** 2, axis=1) / frame_size

        # Normalize the energy values
        max_energy = np.max(self.energy)
        self.energy = self.energy / max_energy if max_energy != 0 else self.energy

        # Apply threshold to get speech segments
        self.speech_segments = self.energy > self.threshold

    def smooth_speech_segments(self):
        """
        Smooths the speech segments based on a look-back window.
        :return: None
        """
        smoothed_segments = self.speech_segments.copy()
        for i in range(len(self.speech_segments)):
            if smoothed_segments[i] == 0 and self.look_back > 0:
                # Check the previous `look_back` frames
                if i >= self.look_back and np.sum(self.speech_segments[i - self.look_back:i]) >= self.min_ones:
                    smoothed_segments[i] = 1
        self.smoothed_speech_segments = smoothed_segments

    def plot_audio_data(self):
        """
        Plot the audio data before and after DC removal.
        :return: None
        """
        if not self.plot_graphs:
            return
        times_audio = np.arange(len(self.audio_data)) / self.frame_rate
        times_original_audio = np.arange(len(self.original_audio_data)) / self.frame_rate
        plt.figure(figsize=(12, 4))
        plt.plot(times_original_audio, self.original_audio_data, 'b', alpha=0.5, label='Original Audio')
        if self.remove_dc_flag:
            plt.plot(times_audio, self.audio_data, 'g', alpha=0.5, label='Audio After DC Removal')
        plt.title('Audio Data Before and After DC Removal')
        plt.xlabel('Time (s)')
        plt.ylabel('Amplitude')
        plt.legend()
        plt.tight_layout()
        plt.show()

    def plot_vad_results(self):
        """
        Plot the results of Voice Activity Detection.
        :return: None
        """
        if not self.plot_graphs:
            return
        times_energy = np.arange(len(self.energy)) * (len(self.audio_data) / len(self.energy) / self.frame_rate)
        plt.figure(figsize=(12, 4))
        plt.plot(times_energy, self.energy, 'b', alpha=0.3, label='Energy')
        plt.plot(times_energy, self.smoothed_speech_segments * np.max(self.energy), linewidth=2,
                 label='Smoothed Speech Segments')
        plt.axhline(y=self.threshold, color='b', linestyle='--', label='Threshold')
        plt.title('Voice Activity Detection')
        plt.xlabel('Time (s)')
        plt.ylabel('Normalized Energy')
        plt.legend()
        plt.tight_layout()
        plt.show()

    def get_speech_segments(self):
        """
        Get the binary sequence showing detected speech segments.
        :return: Binary sequence of detected speech segments
        """
        binary_sequence = ','.join(map(str, self.smoothed_speech_segments.astype(int)))
        return binary_sequence


def process_audio_file(input_file, frame_duration=0.01, threshold=0.1, smoothness=0, remove_dc=False, plot_graphs=False):
    """
    Process the audio file and perform Voice Activity Detection.
    :param input_file: Input file object
    :param frame_duration: Duration of each frame in seconds (0.01, 0.02, or 0.03)
    :param threshold: Normalized energy threshold for speech detection (0 to 1)
    :param smoothness: Smoothness level for VAD (0, 1, 2, or 3)
    :param remove_dc: Boolean to remove the DC component before analysis
    :param plot_graphs: Boolean to plot debugging graphs
    :return: Binary sequence of detected speech segments
    """

    vad = VoiceActivityDetector(input_file, frame_duration, threshold, smoothness, remove_dc, plot_graphs)
    vad.vad()
    vad.smooth_speech_segments()
    vad.plot_audio_data()
    vad.plot_vad_results()
    return vad.get_speech_segments()
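
A minimal usage sketch (the file name is a placeholder): the returned string is the comma-separated 0/1 sequence consumed by the Noise Reduction and AGC modules above.

# Hypothetical usage: run the VAD on a recording and print the frame flags.
# 'speech.wav' is a placeholder file name.
segments = process_audio_file('speech.wav', frame_duration=0.01, threshold=0.1,
                              smoothness=1, remove_dc=True, plot_graphs=False)
print(segments)  # e.g. '0,0,1,1,1,0,...'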


In-depth explanations from the team

Short Time Fourier Transform

import numpy as np  # Import numpy for mathematical computations
from scipy.io import wavfile  # Import wavfile module for reading and writing WAV files
import warnings  # Import warnings module for managing warnings

warnings.filterwarnings("ignore", category=wavfile.WavFileWarning)  # Ignore WAV warnings

# Function to create a Hanning window
# Input: window_length (int) - the length of the window
# Output: A numpy array representing the Hanning window
def CreateHanningWindow(window_length):
    return 0.5 * (1 - np.cos(2 * np.pi * np.arange(window_length) / (window_length - 1)))  # Create Hanning window of given length

# Function to compute the Fast Fourier Transform (FFT)
# Input: signal (numpy array) - the input signal
# Output: A numpy array representing the FFT of the input signal
def FFT(signal):
    N = len(signal)  # Length of the signal
    if N <= 1:  # If the signal length is 1 or less
        return signal  # Return the signal as it is
    even = FFT(signal[0::2])  # Compute FFT for even indexed elements
    odd = FFT(signal[1::2])  # Compute FFT for odd indexed elements
    T = np.exp(-2j * np.pi * np.arange(N) / N)  # Compute FFT factors
    return np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd])  # Combine results

# Function to compute the Inverse Fast Fourier Transform (IFFT)
# Input: spectrum (numpy array) - the input spectrum
# Output: A numpy array representing the IFFT of the input spectrum
def IFFT(spectrum):
    N = len(spectrum)  # Length of the spectrum
    if N <= 1:  # If the spectrum length is 1 or less
        return spectrum  # Return the spectrum as it is
    even = IFFT(spectrum[0::2])  # Compute IFFT for even indexed elements
    odd = IFFT(spectrum[1::2])  # Compute IFFT for odd indexed elements
    T = np.exp(2j * np.pi * np.arange(N) / N)  # Compute IFFT factors
    return (np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd]) / 2)  # Combine results and divide by 2

# Function to compute the Short-Time Fourier Transform (STFT)
# Input: input_wav (str) - path to the input WAV file
#        window_size (int) - the size of the window (default 2048)
#        hop_size (int) - the hop size (default 512)
# Output: A tuple containing the STFT matrix and the sample rate
def STFT(input_wav, window_size=2048, hop_size=512):
    sample_rate, audio_signal = wavfile.read(input_wav)  # Read WAV file and get sample rate and signal

    # Normalize based on the original dtype; the check must run before the
    # conversion to float64, otherwise the integer branch can never be reached
    if np.issubdtype(audio_signal.dtype, np.integer):  # Integer PCM data
        audio_signal = audio_signal.astype(np.float64) / np.iinfo(audio_signal.dtype).max  # Normalize by the dtype's maximum
    else:  # Floating-point data
        audio_signal = audio_signal.astype(np.float64)
        peak = np.max(np.abs(audio_signal))
        if peak > 0:
            audio_signal /= peak  # Normalize by the maximum absolute value

    if len(audio_signal.shape) == 2:  # Check if the signal is stereo
        audio_signal = np.mean(audio_signal, axis=1)  # Convert to mono by averaging the channels

    window = CreateHanningWindow(window_size)  # Create Hanning window of given length
    num_frames = 1 + (len(audio_signal) - window_size) // hop_size  # Compute the number of windows
    stft_matrix = np.zeros((window_size, num_frames), dtype=np.complex128)  # Create matrix to store STFT results

    for frame in range(num_frames):  # Loop over all windows
        start = frame * hop_size  # Compute start of the window
        end = start + window_size  # Compute end of the window
        frame_data = audio_signal[start:end] * window  # Multiply signal by Hanning window
        stft_matrix[:, frame] = FFT(frame_data)  # Compute FFT for the window and store result in matrix

    return stft_matrix, sample_rate  # Return STFT matrix and sample rate

# Function to compute the Inverse Short-Time Fourier Transform (ISTFT)
# Input: stft_matrix (numpy array) - the STFT matrix
#        sample_rate (int) - the sample rate
#        window_size (int) - the size of the window
#        hop_size (int) - the hop size
#        output_wav (str) - path to the output WAV file
# Output: None (writes the reconstructed signal to the output WAV file)
def ISTFT(stft_matrix, sample_rate, window_size, hop_size, output_wav):
    num_frames = stft_matrix.shape[1]  # Compute the number of windows
    expected_signal_length = window_size + hop_size * (num_frames - 1)  # Compute expected length of the reconstructed signal
    reconstructed_signal = np.zeros(expected_signal_length, dtype=np.float64)  # Create array to store reconstructed signal
    window = CreateHanningWindow(window_size)  # Create Hanning window of given length

    for frame in range(num_frames):  # Loop over all windows
        start = frame * hop_size  # Compute start of the window
        end = start + window_size  # Compute end of the window
        frame_data = IFFT(stft_matrix[:, frame])  # Compute IFFT for the window
        reconstructed_signal[start:end] += np.real(frame_data) * window  # Add reconstructed window to the reconstructed signal

    normalization = np.zeros_like(reconstructed_signal)  # Create array for normalization
    for frame in range(num_frames):  # Loop over all windows
        start = frame * hop_size  # Compute start of the window
        end = start + window_size  # Compute end of the window
        normalization[start:end] += window ** 2  # Compute normalization by squaring the window

    epsilon = 1e-8  # Small value to avoid division by zero
    reconstructed_signal /= (normalization + epsilon)  # Normalize the reconstructed signal

    reconstructed_signal = np.clip(reconstructed_signal, -1.0, 1.0)  # Clip the signal to the range [-1, 1]
    reconstructed_signal = (reconstructed_signal * 32767).astype(np.int16)  # Convert to int16 range

    wavfile.write(output_wav, sample_rate, reconstructed_signal)  # Write the reconstructed signal to the output WAV file
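
A minimal round-trip sketch (file names are placeholders): analyze a WAV file and resynthesize it. window_size must be a power of two because FFT() is a recursive radix-2 implementation.

# Hypothetical usage: STFT analysis followed by ISTFT resynthesis.
# File names are placeholders.
stft_matrix, fs = STFT('input.wav', window_size=2048, hop_size=512)
ISTFT(stft_matrix, fs, window_size=2048, hop_size=512, output_wav='reconstructed.wav')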


In-depth explanations from the team

Decimation and Interpolation

import numpy as np
from scipy.io import wavfile
import os

def my_convolution(signal, impulse_response, mode='full'):
    '''
    signal: input signal
    impulse_response: impulse response
    mode: 'full' or 'same'
    full: returns the convolution of signal and impulse_response
    same: returns the central part of the convolution of signal and impulse_response
    return: convolution of signal and impulse_response
    '''
    signal_length = len(signal)
    impulse_length = len(impulse_response)
    # Padding the signal for multiplication
    padded_signal = np.pad(signal, (impulse_length - 1, impulse_length - 1), 'constant')
    flip_impulse_response = np.flip(impulse_response)
    result_length = signal_length + impulse_length - 1
    convolution_result = np.zeros(result_length)
    for i in range(result_length):
        convolution_result[i] = np.sum(padded_signal[i:i + impulse_length] * flip_impulse_response)
    if mode == 'full':
        return convolution_result
    elif mode == 'same':
        start_index = (impulse_length - 1) // 2
        return convolution_result[start_index:start_index + signal_length]
    return convolution_result


def generate_new_filename_decimation(input_filename):
    '''
    input_filename: The original input file name
    return: New filename with "_new_deci" inserted before the file extension
    '''
    base, ext = os.path.splitext(input_filename)
    new_filename = f"{base}_new_deci{ext}"
    return new_filename


def generate_new_filename_interpolation(input_filename):
    '''
    input_filename: The original input file name
    return: New filename with "_new_inter" inserted before the file extension
    '''
    base, ext = os.path.splitext(input_filename)
    new_filename = f"{base}_new_inter{ext}"
    return new_filename


def down_sample(filtered_signal, decimation_factor):
    '''
    filtered_signal: input (already low-pass filtered) signal
    decimation_factor: decimation factor M
    return: downsampled signal (every M-th sample)
    '''
    return filtered_signal[::decimation_factor]


def decimate(input_filename, decimation_factor):
    samplerate, input_signal = wavfile.read(input_filename)
    if input_signal.ndim == 2:  # Stereo input: downmix to mono
        input_signal = input_signal.mean(axis=1)

    # Anti-aliasing low-pass filter: truncated sinc with cutoff pi/M
    time_indices = np.arange(-20 * decimation_factor, 20 * decimation_factor + 1)
    sinc_filter = np.sinc(time_indices / decimation_factor) / decimation_factor

    filtered_signal = my_convolution(input_signal, sinc_filter, mode='same')
    downsampled_signal = down_sample(filtered_signal, decimation_factor)

    new_samplerate = int(samplerate / decimation_factor)
    normalized_signal = np.int16(
        downsampled_signal / np.max(np.abs(downsampled_signal)) * 32767)  # Normalize signal to int16 range

    output_filename = generate_new_filename_decimation(input_filename)
    wavfile.write(output_filename, new_samplerate, normalized_signal)

    return output_filename


def my_interpolation_LPF(L, LPF_type):
    '''
    L: interpolation factor
    LPF_type: 'shanon' (truncated ideal filter), 'ZOH' (zero-order hold),
              or 'FOH' (first-order hold / linear interpolation)
    return: impulse response of the interpolation filter
    '''
    if LPF_type == 'shanon':
        # Truncated ideal (Shannon) reconstruction filter, DC gain L
        n = np.arange(-20 * L, 20 * L + 1)
        h = np.sinc(n / L)
    elif LPF_type == 'ZOH':
        h = np.ones(L)
    elif LPF_type == 'FOH':
        impulse_response_step = np.ones(L)
        impulse_response_triangle = my_convolution(impulse_response_step, impulse_response_step, mode='full')
        h = impulse_response_triangle / np.max(impulse_response_triangle)
    else:
        raise ValueError("Invalid filter type. Choose 'shanon', 'ZOH', or 'FOH'.")
    return h


def up_sample(input_signal, interpolation_factor):
    up_sampled_signal = np.zeros(interpolation_factor * len(input_signal))
    up_sampled_signal[::interpolation_factor] = input_signal
    return up_sampled_signal


def interpolate(input_filename, L, filter_type):
    '''
    input_filename: path to the input WAV file
    L: interpolation factor
    filter_type: 'shanon', 'ZOH', or 'FOH'
    return: name of the new WAV file containing the upsampled, filtered signal
    '''

    samplerate, signals_sample = wavfile.read(input_filename)
    if signals_sample.ndim == 2:  # Stereo input: downmix to mono
        signals_sample = signals_sample.mean(axis=1)
    upsampled_signal = up_sample(signals_sample, L)
    interpolation_filter = my_interpolation_LPF(L, filter_type)
    filtered_signal = my_convolution(upsampled_signal, interpolation_filter, mode='same')

    new_samplerate = int(samplerate * L)
    normalized_signal = np.int16(filtered_signal / np.max(np.abs(filtered_signal)) * 32767)
    output_filename = generate_new_filename_interpolation(input_filename)
    wavfile.write(output_filename, new_samplerate, normalized_signal)

    return output_filename
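
A minimal usage sketch (the file name is a placeholder): decimate a recording by 2, then interpolate it back by 2 with the truncated ideal ('shanon') filter.

# Hypothetical usage: halve the sample rate, then restore it.
# 'music.wav' is a placeholder file name; note the 'shanon' spelling
# expected by my_interpolation_LPF.
decimated_file = decimate('music.wav', decimation_factor=2)
restored_file = interpolate(decimated_file, 2, 'shanon')
print('Wrote', decimated_file, 'and', restored_file)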


In-depth explanations from the team

Speed Down/Up

import struct
import io

def process_wav_data(input_wav_data, speed_factor):
    """
    Process the WAV file data: read, change speed, and return modified data.
    
    :param input_wav_data: bytes object containing the entire WAV file
    :param speed_factor: float, speed factor to apply (e.g., 0.5 to slow down, 2 to speed up)
    :return: bytes object containing the modified WAV file
    """
    def read_wav_header(data):
        """Read the WAV file header from the data."""
        if data[:4] != b'RIFF' or data[8:12] != b'WAVE':
            raise ValueError("Not a valid WAV file")

        channels = struct.unpack_from('<H', data, 22)[0]
        sample_rate = struct.unpack_from('<I', data, 24)[0]
        bits_per_sample = struct.unpack_from('<H', data, 34)[0]

        return channels, sample_rate, bits_per_sample

    def write_wav_header(f, channels, sample_rate, bits_per_sample, data_size):
        """Write a valid WAV header to the file-like object."""
        f.write(b'RIFF')
        f.write(struct.pack('<I', data_size + 36))  # File size - 8
        f.write(b'WAVE')
        f.write(b'fmt ')
        f.write(struct.pack('<I', 16))  # Size of fmt chunk
        f.write(struct.pack('<H', 1))  # Audio format (1 for PCM)
        f.write(struct.pack('<H', channels))
        f.write(struct.pack('<I', sample_rate))
        f.write(struct.pack('<I', sample_rate * channels * bits_per_sample // 8))  # Byte rate
        f.write(struct.pack('<H', channels * bits_per_sample // 8))  # Block align
        f.write(struct.pack('<H', bits_per_sample))
        f.write(b'data')
        f.write(struct.pack('<I', data_size))

    def change_speed(data, speed_factor, frame_width):
        """Change the speed of the audio data by the given speed factor.

        Operates on whole frames (all channels of one sample instant) so
        that stereo channels stay aligned.
        """
        num_frames = len(data) // frame_width
        new_num_frames = int(num_frames / speed_factor)
        new_data = bytearray()
        for i in range(new_num_frames):
            frame_index = int(i * speed_factor) * frame_width
            if frame_index + frame_width <= len(data):
                new_data.extend(data[frame_index:frame_index + frame_width])
        return bytes(new_data)

    try:
        # Edge case: Check if speed_factor is valid
        if speed_factor <= 0:
            raise ValueError("Speed factor must be greater than 0.")

        channels, sample_rate, bits_per_sample = read_wav_header(input_wav_data)
        frame_width = channels * bits_per_sample // 8  # bytes per multi-channel frame

        # Process the audio data (assumes a canonical 44-byte header with the
        # data chunk immediately after it)
        audio_data = input_wav_data[44:]  # Skip header
        processed_data = change_speed(audio_data, speed_factor, frame_width)

        # Edge case: Check if processing resulted in empty data
        if not processed_data:
            raise ValueError("Processing resulted in empty audio data.")

        # Create a new in-memory file to store the result
        output_buffer = io.BytesIO()

        # Write the new header
        write_wav_header(output_buffer, channels, sample_rate, bits_per_sample, len(processed_data))

        # Write the processed audio data
        output_buffer.write(processed_data)

        # Get the final result as bytes
        result = output_buffer.getvalue()

        return result
    except Exception as e:
        print(f"Error processing WAV data: {e}")
        return None
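
A minimal usage sketch (file names are placeholders): read a WAV file as bytes, double its playback speed, and write the result.

# Hypothetical usage: speed a recording up by a factor of 2.
# File names are placeholders.
with open('input.wav', 'rb') as f:
    wav_data = f.read()

faster = process_wav_data(wav_data, speed_factor=2.0)
if faster is not None:
    with open('input_fast.wav', 'wb') as f:
        f.write(faster)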


In-depth explanations from the team
