DC Removal
import wave
import numpy as np
from scipy.io import wavfile
import io
def DC_Removal_filter(input_file, cutoff_frequency=100, numtaps=4401):
    # numtaps must be odd so the spectral-inversion delta lands exactly on the filter center
    # Check that the input is something wave.open can read: a path or a file-like object
    if not isinstance(input_file, (str, io.BytesIO)):
        return "Error: Input must be a WAV file path or file object."
def sinc_high_pass_filter(cutoff, fs, numtaps):
t = np.arange(numtaps) - (numtaps - 1) / 2
sinc_func = np.sinc(2 * cutoff * t / fs)
        window = 0.54 - 0.46 * np.cos(2 * np.pi * np.arange(numtaps) / (numtaps - 1))  # Hamming window
sinc_func *= window
sinc_func /= np.sum(sinc_func)
delta = np.zeros(numtaps)
delta[(numtaps - 1) // 2] = 1
return delta - sinc_func
# Read the WAV file
with wave.open(input_file, 'rb') as wav_file:
fs = wav_file.getframerate()
n_channels = wav_file.getnchannels()
sampwidth = wav_file.getsampwidth()
n_frames = wav_file.getnframes()
# Read all frames
frames = wav_file.readframes(n_frames)
# Convert frames to numpy array
signal = np.frombuffer(frames, dtype=np.int16)
# Convert to float32 for processing
signal = signal.astype(np.float32)
# If stereo, convert to mono by averaging channels
if n_channels == 2:
signal = signal.reshape(-1, 2).mean(axis=1)
    # Normalize the signal (guard against an all-zero file)
    peak = np.max(np.abs(signal))
    if peak > 0:
        signal = signal / peak
high_pass_filter = sinc_high_pass_filter(cutoff_frequency, fs, numtaps)
# Initialize output signal
output_signal = np.zeros_like(signal)
# Buffer to store previous input values (length of filter coefficients)
buffer = np.zeros(numtaps)
# Apply the filter sample-by-sample
for n in range(len(signal)):
# Update the buffer with the current input sample
buffer = np.roll(buffer, 1)
buffer[0] = signal[n]
# Compute the output sample by convolving the filter coefficients with the buffer
output_signal[n] = np.dot(high_pass_filter, buffer)
    # Convert back to int16, clipping to avoid wrap-around if the filter overshoots
    output_signal = (np.clip(output_signal, -1.0, 1.0) * 32767).astype(np.int16)
# Create a BytesIO object to store the output WAV
output_wav = io.BytesIO()
# Write the filtered signal to the BytesIO object
with wave.open(output_wav, 'wb') as wav_file:
wav_file.setnchannels(1) # mono
wav_file.setsampwidth(2) # 2 bytes per sample
wav_file.setframerate(fs)
wav_file.writeframes(output_signal.tobytes())
# Reset the BytesIO object's position to the beginning
output_wav.seek(0)
return output_wav
In-depth explanations from the team
PDM2PCM
import numpy as np
from scipy.io.wavfile import write
import io
def read_pdm_file(file_path):
"""
Reads a PDM signal from a text file. Assumes the signal is composed of -1 and 1 values.
Parameters:
file_path (str): Path to the PDM file.
Returns:
np.array: Array containing the PDM signal.
"""
pdm_signal = []
# Open the file and read each line
with open(file_path, 'r') as file:
for line in file:
# Split the line into individual values and append them to the signal list
values = line.split()
for val in values:
pdm_signal.append(int(val))
# Convert the list to a numpy array of type int8
pdm_array = np.array(pdm_signal, dtype=np.int8)
return pdm_array
def moving_average(signal, window_size):
"""
Computes the moving average of the input signal using a specified window size.
Parameters:
signal (np.array): The input signal.
window_size (int): The window size for the moving average.
Returns:
np.array: The signal after applying the moving average.
"""
# Pad the signal with zeros at the beginning
pad_width = window_size - 1
padding = np.zeros(pad_width, dtype=signal.dtype)
padded_signal = np.concatenate((padding, signal), axis=0)
    # Compute the cumulative sum, prepending a zero so the first window is included
    cumsum = np.concatenate(([0], np.cumsum(padded_signal)))
    # Calculate the moving average using the cumulative sum (one output per input sample)
    moving_sum = cumsum[window_size:] - cumsum[:-window_size]
    moving_average_result = moving_sum / float(window_size)
return moving_average_result
def cic_filter(pdm_signal, decimation_factor=64, order=1):
"""
Applies a CIC filter to the PDM signal.
Parameters:
pdm_signal (np.array): The PDM signal to filter.
decimation_factor (int): The decimation factor for the CIC filter.
order (int): The order of the CIC filter.
Returns:
np.array: The filtered PCM signal.
"""
# Step 1: Integrator stage (Moving Average)
integrator = pdm_signal
for _ in range(order):
integrator = moving_average(integrator, decimation_factor)
# Step 2: Decimation
decimated_integrator = integrator[::decimation_factor]
# Step 3: Comb stage
comb = np.diff(decimated_integrator)
return comb
def save_pcm_as_wav_file(pcm_signal, sample_rate):
"""
Saves the PCM signal as a bytes object.
Parameters:
pcm_signal (np.array): The PCM signal to save. This should be a 1D numpy array of raw PCM data.
sample_rate (int): The sample rate of the PCM signal, in Hz (samples per second).
Returns:
bytes: The WAV file data as a bytes object.
"""
    # Normalize the PCM signal (guard against an all-zero signal)
    max_val = np.max(np.abs(pcm_signal))
    normalized_signal = pcm_signal / max_val if max_val > 0 else pcm_signal
scaled_signal = normalized_signal * 32767
int16_signal = np.int16(scaled_signal)
# Save the PCM signal to a WAV file in memory
wav_buffer = io.BytesIO()
write(wav_buffer, sample_rate, int16_signal)
wav_data = wav_buffer.getvalue()
return wav_data
def Pdm2Pcm(pdm_file_path, decimation_factor=64, order=4, pcm_sample_rate=8000):
"""
Converts a PDM signal from a file to a PCM signal and saves it as a WAV file.
    Parameters:
    pdm_file_path (str): Path to the PDM file.
    decimation_factor (int): Decimation factor for the CIC filter.
    order (int): Order of the CIC filter (number of moving-average passes).
    pcm_sample_rate (int): Sample rate of the output WAV file, in Hz.
    Returns:
    bytes: The WAV file data as a bytes object.
    """
# Read PDM signal from file
pdm_signal = read_pdm_file(pdm_file_path)
# Convert PDM signal to PCM signal
pcm_signal = cic_filter(pdm_signal, decimation_factor, order)
# Save the PCM signal as a variable
wav_file = save_pcm_as_wav_file(pcm_signal, pcm_sample_rate)
return wav_file
Noise Reduction
import io
import wave
import numpy as np
import scipy.io.wavfile as wav
import warnings
warnings.filterwarnings("ignore", category=wav.WavFileWarning)
#STFT part
def CreateHanningWindow(window_length):
return 0.5 * (1 - np.cos(2 * np.pi * np.arange(window_length) / (window_length - 1)))
def FFT(signal):
    # Radix-2 Cooley-Tukey FFT: the input length must be a power of two
    N = len(signal)
if N <= 1:
return signal
even = FFT(signal[0::2])
odd = FFT(signal[1::2])
T = np.exp(-2j * np.pi * np.arange(N) / N)
return np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd])
def IFFT(spectrum):
N = len(spectrum)
if N <= 1:
return spectrum
even = IFFT(spectrum[0::2])
odd = IFFT(spectrum[1::2])
T = np.exp(2j * np.pi * np.arange(N) / N)
return (np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd]) / 2)
def STFT(audio_signal, sample_rate=None, window_size=2048, hop_size=512):
    if isinstance(audio_signal, str):
        sample_rate, audio_signal = wav.read(audio_signal)
    # Check the original dtype before converting, so integer input is scaled correctly
    if np.issubdtype(audio_signal.dtype, np.integer):
        audio_signal = audio_signal.astype(np.float64) / np.iinfo(audio_signal.dtype).max
    else:
        audio_signal = audio_signal.astype(np.float64)
        peak = np.max(np.abs(audio_signal))
        if peak > 0:
            audio_signal /= peak
window = CreateHanningWindow(window_size)
num_frames = 1 + (len(audio_signal) - window_size) // hop_size
stft_matrix = np.zeros((window_size, num_frames), dtype=np.complex128)
for frame in range(num_frames):
start = frame * hop_size
end = start + window_size
frame_data = audio_signal[start:end] * window
stft_matrix[:, frame] = FFT(frame_data)
return stft_matrix, sample_rate
def ISTFT(stft_matrix, sample_rate, window_size, hop_size, output_wav):
num_frames = stft_matrix.shape[1]
expected_signal_length = window_size + hop_size * (num_frames - 1)
reconstructed_signal = np.zeros(expected_signal_length, dtype=np.float64)
window = CreateHanningWindow(window_size)
for frame in range(num_frames):
start = frame * hop_size
end = start + window_size
frame_data = IFFT(stft_matrix[:, frame])
reconstructed_signal[start:end] += np.real(frame_data) * window
    # Peak-normalize straight to the int16 range (a constant pre-scaling would cancel out here)
    reconstructed_signal = np.int16(reconstructed_signal / np.max(np.abs(reconstructed_signal)) * 32767)
wav.write(output_wav, sample_rate, reconstructed_signal)
def check_file(input_file):
if isinstance(input_file, str):
return input_file.lower().endswith('.wav')
elif isinstance(input_file, io.BytesIO):
return True
else:
return False
def NoiseReduction(input_file, output_file, speech_segments, frame_size, hop_size):
if not check_file(input_file):
print("This is not a WAV file")
return None
# Load input file
print('Loading wav file:', input_file)
with wave.open(input_file, 'rb') as wav_file:
sample_rate = wav_file.getframerate()
num_channels = wav_file.getnchannels()
sample_width = wav_file.getsampwidth()
num_frames = wav_file.getnframes()
waveform = wav_file.readframes(num_frames)
waveform = np.frombuffer(waveform, dtype=np.int16)
if num_channels > 1:
waveform = waveform.reshape(-1, num_channels).mean(axis=1)
waveform = waveform.astype(np.float32) / 32768.0 # Normalize to -1 to 1 range
# Ensure speech_segments length matches the number of STFT frames
num_frames = 1 + (len(waveform) - frame_size) // hop_size
speech_segments = np.array([int(s) for s in speech_segments.split(',')])
if len(speech_segments) > num_frames:
speech_segments = speech_segments[:num_frames]
elif len(speech_segments) < num_frames:
speech_segments = np.pad(speech_segments, (0, num_frames - len(speech_segments)), 'constant')
# Perform STFT on the entire noisy signal
stft_matrix, _ = STFT(waveform, sample_rate=sample_rate, window_size=frame_size, hop_size=hop_size)
magnitude_spectrum = np.abs(stft_matrix)
phase_spectrum = np.angle(stft_matrix)
# Compute noise spectrum from non-speech segments
noise_spectrum = np.zeros_like(magnitude_spectrum[:, 0])
non_speech_frame_count = 0
for i, is_speech in enumerate(speech_segments):
if is_speech == 0:
noise_spectrum += magnitude_spectrum[:, i]
non_speech_frame_count += 1
if non_speech_frame_count > 0:
noise_spectrum /= non_speech_frame_count
else:
print("No non-speech frames detected, estimating noise from the first 0.25 seconds")
noise_estimation_duration = 0.25
noise_samples = int(noise_estimation_duration * sample_rate)
noise_signal = waveform[:noise_samples]
# Perform STFT on noise signal
noise_stft_matrix, _ = STFT(noise_signal, sample_rate=sample_rate, window_size=frame_size, hop_size=hop_size)
noise_magnitude_spectrum = np.abs(noise_stft_matrix)
noise_spectrum = np.mean(noise_magnitude_spectrum, axis=1)
mean_noise_spectrum = noise_spectrum
# Noise reduction with oversubtraction and flooring
alpha = 2 # Oversubtraction factor
beta = 0.01 # Spectral floor
cleaned_spectrum = np.maximum(magnitude_spectrum - alpha * mean_noise_spectrum.reshape((mean_noise_spectrum.shape[0], 1)), beta * magnitude_spectrum)
# Reconstruct signal using inverse STFT
cleaned_complex_spectrum = cleaned_spectrum * np.exp(1.0j * phase_spectrum)
ISTFT(cleaned_complex_spectrum, sample_rate, window_size=frame_size, hop_size=hop_size, output_wav=output_file)
print('Output wav file saved:', output_file)
Transmission and Reception
import numpy as np
import scipy.signal
import soundfile as sf
import matplotlib.pyplot as plt
import sounddevice as sd  # only needed if the real-time callback below is wired to a stream
def SSB(mode='file', file=None, carrier_freq=10000, samplerate=44100):
# Load the WAV file
def load_wav(file):
data, samplerate = sf.read(file)
if len(data) == 0:
raise ValueError("The WAV file is empty.")
if data.ndim > 1: # Handle stereo by taking only one channel
data = data[:, 0] # Use only the first channel for stereo files
return data, samplerate
# Save the WAV file
def save_wav(filename, data, samplerate):
sf.write(filename, data, samplerate)
# Generate a SSB signal
def ssb_modulate(signal, carrier_freq, samplerate):
t = np.arange(len(signal)) / samplerate
signal_mul_cos = signal * np.cos(2 * np.pi * carrier_freq * t) # Multiply with cosine
signal_hilbert = scipy.signal.hilbert(signal) # Apply Hilbert transform
signal_mul_sin = np.imag(signal_hilbert) * np.sin(2 * np.pi * carrier_freq * t) # Multiply with sine
ssb_signal = signal_mul_cos - signal_mul_sin # SSB signal (Upper Sideband)
return ssb_signal
# Low-pass filter using FFT
def low_pass_filter(signal, cutoff_freq, samplerate):
fft_signal = np.fft.fft(signal)
freqs = np.fft.fftfreq(len(signal), 1 / samplerate)
filtered_fft_signal = fft_signal.copy()
filtered_fft_signal[np.abs(freqs) > cutoff_freq] = 0 # Zero out frequencies above cutoff
filtered_signal = 2 * np.fft.ifft(filtered_fft_signal)
return np.real(filtered_signal)
# Demodulate the SSB signal
def ssb_demodulate(ssb_signal, carrier_freq, samplerate):
t = np.arange(len(ssb_signal)) / samplerate
demodulated_signal = ssb_signal * np.cos(2 * np.pi * carrier_freq * t) # Multiply with cosine
cutoff_freq = 4000 # Low-pass filter cutoff frequency
recovered_signal = low_pass_filter(demodulated_signal, cutoff_freq, samplerate) # Apply low-pass filter
return recovered_signal
    # Real-time processing callback (intended for a sounddevice stream; the
    # real-time mode is not wired up in this file-mode-only version)
def audio_callback(indata, outdata, frames, time, status):
if status:
print(status, flush=True) # Print status if there's an error
ssb_signal = ssb_modulate(indata[:, 0], carrier_freq, samplerate) # Modulate input signal
recovered_signal = ssb_demodulate(ssb_signal, carrier_freq, samplerate) # Demodulate signal
if outdata is not None:
outdata[:, 0] = recovered_signal # Output recovered signal to both channels
outdata[:, 1] = recovered_signal
if mode == 'file' and file:
try:
# Load the input WAV file
data, samplerate = load_wav(file)
            carrier_freq = samplerate / 4  # keep the carrier (and the upper sideband) below Nyquist
# Modulate the signal using SSB
ssb_signal = ssb_modulate(data, carrier_freq, samplerate)
# Demodulate the signal to recover the original data
recovered_signal = 0.5 * ssb_demodulate(ssb_signal, carrier_freq, samplerate)
# Save the recovered signal to an output WAV file
output_filename = 'output_file.wav'
save_wav(output_filename, recovered_signal, samplerate)
            # Plot the magnitude spectra of the signals
            t = np.arange(len(data)) / samplerate
            freqs = np.fft.fftfreq(len(t), d=1 / samplerate)  # frequency axis in Hz
            plt.figure()
            plt.subplot(3, 1, 1)
            plt.plot(freqs, np.abs(np.fft.fft(data)))
            plt.title('Original Baseband Signal')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')
            plt.subplot(3, 1, 2)
            plt.plot(freqs, np.abs(np.fft.fft(ssb_signal)))
            plt.title('SSB Modulated Signal (Upper Sideband)')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')
            plt.subplot(3, 1, 3)
            plt.plot(freqs, np.abs(np.fft.fft(recovered_signal)))
            plt.title('Demodulated Signal')
            plt.xlabel('Frequency (Hz)')
            plt.ylabel('Magnitude')
plt.tight_layout()
plt.show()
return output_filename
except FileNotFoundError as e:
print(e)
except ValueError as e:
print(e)
else:
print("Invalid mode or filename not provided for file mode.")
Pitch Estimation
import numpy as np
import wave
import scipy.signal
import matplotlib.pyplot as plt
# Constants
CHUNK = 1500 # Number of samples per frame
MIN_PITCH = 50 # Minimum pitch frequency (Hz)
MAX_PITCH = 800 # Maximum pitch frequency (Hz)
# Create pitch estimation function
def pitch_estimation(frame, fs, min_pitch, max_pitch):
# Constants
MIN_PERIOD = fs // max_pitch
MAX_PERIOD = fs // min_pitch
# Remove mean
frame -= np.mean(frame)
# Apply Hanning window
windowed_frame = frame * np.hanning(len(frame))
# Autocorrelation
corr = np.correlate(windowed_frame, windowed_frame, mode='full')
corr = corr[len(corr) // 2:]
# Thresholding the autocorrelation to remove low-magnitude peaks
corr[corr < 0.1 * np.max(corr)] = 0
# Find the first positive slope
d = np.diff(corr)
    positive_slope = np.where(d > 0)
    if positive_slope[0].size == 0:
        start = 0
    else:
        start = positive_slope[0][0]
    # Find the peak in the specified range (guard against an empty search window)
    search_region = corr[start + MIN_PERIOD:start + MAX_PERIOD]
    if search_region.size == 0:
        return None
    peak = np.argmax(search_region) + start + MIN_PERIOD
# Calculate pitch
if peak >= MIN_PERIOD and peak <= MAX_PERIOD:
# Refine peak using parabolic interpolation
if peak > 0 and peak < len(corr) - 1:
alpha = corr[peak - 1]
beta = corr[peak]
gamma = corr[peak + 1]
p = 0.5 * (alpha - gamma) / (alpha - 2 * beta + gamma) if alpha - 2 * beta + gamma != 0 else 0
pitch_period = peak + p
else:
pitch_period = peak
pitch = fs / pitch_period
if pitch > max_pitch or pitch < min_pitch:
pitch = None
else:
pitch = None
    if start == 0:  # no reliable positive slope was found
        return None
return pitch
# Convert a WAV file to a numpy array and estimate the pitch frame by frame.
# Used to show the pitch estimation over time.
def process_wav_file_pitches(wf, plot_pitch_estimation=False):
    # Extract basic information from the already-opened WAV file
num_channels = wf.getnchannels()
sampwidth = wf.getsampwidth()
framerate = wf.getframerate()
num_frames = wf.getnframes()
# Read the entire file
audio_data = wf.readframes(num_frames)
# Convert the byte data to numpy array
audio_data = np.frombuffer(audio_data, dtype=np.int16)
# If the audio is stereo, take only one channel
if num_channels > 1:
audio_data = audio_data[::num_channels]
# Process the audio in chunks
pitches = []
    for i in range(0, len(audio_data), CHUNK // 2):  # hop of CHUNK//2 so frames overlap by half
frame = audio_data[i:i + CHUNK].astype(np.float32)
if len(frame) == CHUNK:
pitch = pitch_estimation(frame, framerate, MIN_PITCH, MAX_PITCH)
pitches.append(pitch)
# Optional: Apply median filter to smooth pitch estimates. only used for the plot
#if len(pitches) > 0:
#pitches = scipy.signal.medfilt(pitches, kernel_size=5)
# Plot the detected pitches
if plot_pitch_estimation:
# Plot the detected pitches
        t = np.arange(len(pitches)) * (CHUNK / 2) / framerate  # one estimate every CHUNK//2 samples
plt.figure(figsize=(10, 6))
plt.plot(t, pitches, label='Detected Pitch')
plt.xlabel('Time (s)')
plt.ylabel('Pitch (Hz)')
plt.title('Pitch Estimation Over Time')
plt.legend()
plt.grid()
plt.show()
return pitches #return the pitches array.
Acoustic Gain Control
import numpy as np
import wave
import io
def vad_aware_agc_process(input_signal_bytes, binary_vector, frame_duration=0.01, gain=0.1):
"""
Apply VAD-aware Automatic Gain Control (AGC) to the input audio signal.
Args:
input_signal_bytes (bytes): Input audio data as bytes
binary_vector (np.array): VAD decision for each frame (1 for speech, 0 for non-speech)
Returns:
bytes: Processed audio signal as WAV bytes
"""
def extract_audio_data(audio_bytes):
if isinstance(audio_bytes, io.BytesIO):
audio_bytes.seek(0) # Move the cursor to the beginning of the BytesIO object
wav_file = wave.open(audio_bytes, 'rb')
else:
wav_file = wave.open(io.BytesIO(audio_bytes), 'rb')
channels = wav_file.getnchannels()
bit_depth = wav_file.getsampwidth()
sample_rate = wav_file.getframerate()
frame_count = wav_file.getnframes()
audio_data = wav_file.readframes(frame_count)
        if bit_depth == 1:
            data_type = np.uint8  # 8-bit WAV samples are unsigned
        elif bit_depth == 2:
            data_type = np.int16
        else:
            raise ValueError("Unsupported bit depth in the audio file.")
        audio_array = np.frombuffer(audio_data, dtype=data_type).astype(np.float32)
        if bit_depth == 1:
            audio_array = (audio_array - 128.0) / 127.0  # center and scale to [-1, 1]
        else:
            audio_array /= np.iinfo(data_type).max
        if channels == 2:
            audio_array = audio_array.reshape(-1, 2).mean(axis=1)
        return audio_array, sample_rate, channels, bit_depth
def compute_rms(signal, window_size):
return np.sqrt(np.convolve(signal**2, np.ones(window_size)/window_size, mode='same'))
def calculate_parameters(input_signal, sample_rate):
signal_duration = len(input_signal) / sample_rate
signal_energy = np.sum(input_signal**2)
attack_time = max(0.001, min(0.02, 0.01 * (signal_duration / 10)))
release_time = max(0.05, min(0.2, 0.1 * (signal_duration / 10)))
noise_floor = max(-80, min(-40, -60 + 10 * np.log10(signal_energy)))
rms_window_ms = max(20, min(100, 50 * (signal_duration / 10)))
return attack_time, release_time, noise_floor, rms_window_ms
# Extract audio data and sample rate
input_signal, sample_rate, channels, bit_depth = extract_audio_data(input_signal_bytes)
# Calculate adaptive parameters
attack_time, release_time, noise_floor, rms_window_ms = calculate_parameters(input_signal, sample_rate)
# Convert time constants to sample counts
frame_length_samples = int(frame_duration * sample_rate)
attack_samples = int(attack_time * sample_rate)
release_samples = int(release_time * sample_rate)
rms_window_samples = int(rms_window_ms * sample_rate / 1000)
# Calculate the number of frames based on the input signal length
num_samples = len(input_signal)
num_frames = (num_samples + frame_length_samples - 1) // frame_length_samples
# Ensure the VAD binary vector matches the number of frames
if len(binary_vector) < num_frames:
binary_vector = np.pad(binary_vector, (0, num_frames - len(binary_vector)), mode='constant', constant_values=0)
elif len(binary_vector) > num_frames:
binary_vector = binary_vector[:num_frames]
assert len(binary_vector) == num_frames, f"VAD array length ({len(binary_vector)}) does not match the number of frames ({num_frames})"
# Compute target RMS
target_rms = compute_rms(np.abs(input_signal), rms_window_samples)
# Initialize arrays
envelope = np.zeros_like(input_signal)
agc_signal = np.zeros_like(input_signal)
noise_floor_linear = 10 ** (noise_floor / 20)
# Process each sample
for i in range(len(input_signal)):
frame_index = i // frame_length_samples
instant_level = abs(input_signal[i])
# Update envelope based on VAD decision
if binary_vector[frame_index] == 1:
if i == 0:
envelope[i] = instant_level
else:
envelope[i] = max(instant_level, envelope[i-1] * (1 - 1/release_samples))
else:
if i == 0:
envelope[i] = instant_level
else:
envelope[i] = envelope[i-1]
# Compute desired gain
if envelope[i] > noise_floor_linear:
desired_gain = target_rms[i] / (envelope[i] + 1e-6)
else:
desired_gain = gain
# Apply attack/release to gain
if desired_gain < gain:
gain = desired_gain + (gain - desired_gain) * np.exp(-1 / attack_samples)
else:
gain = desired_gain + (gain - desired_gain) * np.exp(-1 / release_samples)
# Apply gain and clip to prevent overflow
agc_signal[i] = np.clip(input_signal[i] * gain, -1.0, 1.0)
    # Convert the processed signal back to the original sample format
    if bit_depth == 1:
        output_signal = (agc_signal * 127 + 128).astype(np.uint8)  # 8-bit WAV is unsigned
    elif bit_depth == 2:
        output_signal = (agc_signal * 32767).astype(np.int16)
# Create a BytesIO object to hold the WAV data
output_bytes = io.BytesIO()
# Write the processed audio to a WAV file in memory
    with wave.open(output_bytes, 'wb') as wav_file:
        wav_file.setnchannels(1)  # stereo input was downmixed to mono above
wav_file.setsampwidth(bit_depth)
wav_file.setframerate(sample_rate)
wav_file.writeframes(output_signal.tobytes())
# Get the WAV bytes
wav_bytes = output_bytes.getvalue()
return wav_bytes
# Example usage:
# processed_wav_bytes = vad_aware_agc_process(input_wav_bytes, binary_vector)
Voice Activity Detector
import wave
import numpy as np
import matplotlib.pyplot as plt
class VoiceActivityDetector:
def __init__(self, input_file, frame_duration, threshold, smoothness, remove_dc,
plot_graphs):
"""
Initialize the VoiceActivityDetector with the given parameters.
:param input_file: Input file object (BytesIO)
:param frame_duration: Duration of each frame in seconds (0.01, 0.02, or 0.03)
:param threshold: Energy threshold for speech detection (0 to 1)
:param smoothness: Smoothness level for VAD (0, 1, 2, or 3)
:param remove_dc: Boolean to remove DC component from the audio data
:param plot_graphs: Boolean to plot graphs for debugging
"""
# Parameters Section
self.input_file = input_file
self.frame_duration = frame_duration
self.threshold = threshold
self.aggressiveness = smoothness
self.look_back = self.get_look_back(smoothness)
self.min_ones = 1
self.remove_dc_flag = remove_dc
self.plot_graphs = plot_graphs
# Validate Parameters
self.validate_parameters()
# Operations Section
self.audio_data, self.frame_rate = self.read_wav(self.input_file)
self.original_audio_data = self.audio_data.copy()
if self.remove_dc_flag:
self.audio_data = self.remove_dc_component(self.audio_data)
self.speech_segments = None
self.energy = None
self.smoothed_speech_segments = None
def validate_parameters(self):
"""
Validate the parameters passed to the VoiceActivityDetector.
:return: None
"""
# Validate frame_duration
if self.frame_duration not in [0.01, 0.02, 0.03]:
raise ValueError("frame_duration must be 0.01, 0.02, or 0.03 seconds.")
# Validate threshold
if not (0 <= self.threshold <= 1):
raise ValueError("threshold must be between 0 and 1.")
@staticmethod
def get_look_back(level):
"""
Get the look-back window size based on the smoothness level.
:param level: Smoothness level (0, 1, 2, or 3)
:return: Look-back window size
"""
if level == 0:
return 0
elif level == 1:
return 4
elif level == 2:
return 6
elif level == 3:
return 8
else:
raise ValueError("Invalid aggressiveness level. Choose between 0, 1, 2, or 3.")
def read_wav(self, input_file):
"""
Read the WAV file and return the audio data and frame rate.
:param input_file: Input file object
:return: Tuple of audio data and frame rate
"""
        # Check that the file can be parsed as a WAV file
        try:
            with wave.open(input_file, 'rb') as input_wav_file:
                # Get basic information
                n_channels = input_wav_file.getnchannels()
                sample_width = input_wav_file.getsampwidth()
                frame_rate = input_wav_file.getframerate()
                n_frames = input_wav_file.getnframes()
                # Read raw audio data
                raw_data = input_wav_file.readframes(n_frames)
        except (wave.Error, EOFError) as err:
            raise ValueError("The provided file is not a valid WAV file.") from err
# Convert raw data to numpy array
        if sample_width == 1:
            dtype = np.uint8  # 8-bit WAV samples are unsigned
        elif sample_width == 2:
            dtype = np.int16
        else:
            raise ValueError("Unsupported sample width")
        audio_data = np.frombuffer(raw_data, dtype=dtype)
        if sample_width == 1:
            audio_data = audio_data.astype(np.int16) - 128  # center unsigned samples around zero
# If stereo, take the mean of both channels
if n_channels == 2:
audio_data = audio_data.reshape(-1, 2).mean(axis=1)
# Convert to int32 for further processing
audio_data = audio_data.astype(np.int32)
return audio_data, frame_rate
def remove_dc_component(self, audio_data):
"""
Removes the DC component from the audio data.
:param audio_data: Numpy array of audio data
:return: Numpy array of audio data without DC component
"""
# Calculate the DC component (mean of the signal)
dc_component = np.mean(audio_data)
# Subtract the DC component from the signal to get the AC component
ac_component = audio_data - dc_component
return ac_component
def vad(self):
"""
Perform Voice Activity Detection on the audio data.
:return: None
"""
# Calculate frame size
frame_size = int(self.frame_rate * self.frame_duration)
# Pad the audio data to ensure all frames are full
remainder_size = (len(self.audio_data) % frame_size)
if remainder_size > 0:
pad_size = frame_size - remainder_size
self.audio_data = np.pad(self.audio_data, (0, pad_size), 'constant')
# Reshape audio data into frames
frames = self.audio_data.reshape(-1, frame_size)
# Convert frames to float32 before squaring to avoid overflow
frames = frames.astype(np.float32)
# Calculate energy for each frame
self.energy = np.sum(frames ** 2, axis=1) / frame_size
# Normalize the energy values
max_energy = np.max(self.energy)
self.energy = self.energy / max_energy if max_energy != 0 else self.energy
# Apply threshold to get speech segments
self.speech_segments = self.energy > self.threshold
def smooth_speech_segments(self):
"""
Smooths the speech segments based on a look-back window.
:return: None
"""
smoothed_segments = self.speech_segments.copy()
for i in range(len(self.speech_segments)):
if smoothed_segments[i] == 0 and self.look_back > 0:
# Check the previous `look_back` frames
if i >= self.look_back and np.sum(self.speech_segments[i - self.look_back:i]) >= self.min_ones:
smoothed_segments[i] = 1
self.smoothed_speech_segments = smoothed_segments
def plot_audio_data(self):
"""
Plot the audio data before and after DC removal.
:return: None
"""
if not self.plot_graphs:
return
times_audio = np.arange(len(self.audio_data)) / self.frame_rate
times_original_audio = np.arange(len(self.original_audio_data)) / self.frame_rate
plt.figure(figsize=(12, 4))
plt.plot(times_original_audio, self.original_audio_data, 'b', alpha=0.5, label='Original Audio')
if self.remove_dc_flag:
plt.plot(times_audio, self.audio_data, 'g', alpha=0.5, label='Audio After DC Removal')
plt.title('Audio Data Before and After DC Removal')
plt.xlabel('Time (s)')
plt.ylabel('Amplitude')
plt.legend()
plt.tight_layout()
plt.show()
def plot_vad_results(self):
"""
Plot the results of Voice Activity Detection.
:return: None
"""
if not self.plot_graphs:
return
times_energy = np.arange(len(self.energy)) * (len(self.audio_data) / len(self.energy) / self.frame_rate)
plt.figure(figsize=(12, 4))
plt.plot(times_energy, self.energy, 'b', alpha=0.3, label='Energy')
plt.plot(times_energy, self.smoothed_speech_segments * np.max(self.energy), linewidth=2,
label='Smoothed Speech Segments')
plt.axhline(y=self.threshold, color='b', linestyle='--', label='Threshold')
plt.title('Voice Activity Detection')
plt.xlabel('Time (s)')
plt.ylabel('Normalized Energy')
plt.legend()
plt.tight_layout()
plt.show()
def get_speech_segments(self):
"""
Get the binary sequence showing detected speech segments.
:return: Binary sequence of detected speech segments
"""
binary_sequence = ','.join(map(str, self.smoothed_speech_segments.astype(int)))
return binary_sequence
def process_audio_file(input_file, frame_duration=0.01, threshold=0.1, smoothness=0, remove_dc=False, plot_graphs=False):
    """
    Process the audio file and perform Voice Activity Detection.
    :param input_file: Input file object
    :param frame_duration: Duration of each frame in seconds (0.01, 0.02, or 0.03)
    :param threshold: Energy threshold for speech detection (0 to 1)
    :param smoothness: Smoothness level for VAD (0, 1, 2, or 3)
    :param remove_dc: Boolean to remove DC component from the audio data
    :param plot_graphs: Boolean to plot graphs for debugging
    :return: Binary sequence of detected speech segments
    """
vad = VoiceActivityDetector(input_file, frame_duration, threshold, smoothness, remove_dc, plot_graphs)
vad.vad()
vad.smooth_speech_segments()
vad.plot_audio_data()
vad.plot_vad_results()
return vad.get_speech_segments()
Short Time Fourier Transform
import numpy as np # Import numpy for mathematical computations
from scipy.io import wavfile # Import wavfile module for reading and writing WAV files
import warnings # Import warnings module for managing warnings
warnings.filterwarnings("ignore", category=wavfile.WavFileWarning) # Ignore WAV warnings
# Function to create a Hanning window
# Input: window_length (int) - the length of the window
# Output: A numpy array representing the Hanning window
def CreateHanningWindow(window_length):
return 0.5 * (1 - np.cos(2 * np.pi * np.arange(window_length) / (window_length - 1))) # Create Hanning window of given length
# Function to compute the Fast Fourier Transform (FFT)
# Input: signal (numpy array) - the input signal
# Output: A numpy array representing the FFT of the input signal (the length must be a power of two)
def FFT(signal):
N = len(signal) # Length of the signal
if N <= 1: # If the signal length is 1 or less
return signal # Return the signal as it is
even = FFT(signal[0::2]) # Compute FFT for even indexed elements
odd = FFT(signal[1::2]) # Compute FFT for odd indexed elements
T = np.exp(-2j * np.pi * np.arange(N) / N) # Compute FFT factors
return np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd]) # Combine results
# Function to compute the Inverse Fast Fourier Transform (IFFT)
# Input: spectrum (numpy array) - the input spectrum
# Output: A numpy array representing the IFFT of the input spectrum
def IFFT(spectrum):
N = len(spectrum) # Length of the spectrum
if N <= 1: # If the spectrum length is 1 or less
return spectrum # Return the spectrum as it is
even = IFFT(spectrum[0::2]) # Compute IFFT for even indexed elements
odd = IFFT(spectrum[1::2]) # Compute IFFT for odd indexed elements
T = np.exp(2j * np.pi * np.arange(N) / N) # Compute IFFT factors
return (np.concatenate([even + T[:N // 2] * odd, even + T[N // 2:] * odd]) / 2) # Combine results and divide by 2
# Function to compute the Short-Time Fourier Transform (STFT)
# Input: input_wav (str) - path to the input WAV file
# window_size (int) - the size of the window (default 2048)
# hop_size (int) - the hop size (default 512)
# Output: A tuple containing the STFT matrix and the sample rate
def STFT(input_wav, window_size=2048, hop_size=512):
    sample_rate, audio_signal = wavfile.read(input_wav)  # Read WAV file and get sample rate and signal
    if np.issubdtype(audio_signal.dtype, np.integer):  # Check the original dtype before converting to float
        audio_signal = audio_signal.astype(np.float64) / np.iinfo(audio_signal.dtype).max  # Normalize by the max value of the integer type
    else:
        audio_signal = audio_signal.astype(np.float64)
        audio_signal /= np.max(np.abs(audio_signal))  # Normalize by the maximum absolute value in the signal
if len(audio_signal.shape) == 2: # Check if the signal is stereo
audio_signal = np.mean(audio_signal, axis=1) # Convert to mono by averaging the channels
window = CreateHanningWindow(window_size) # Create Hanning window of given length
num_frames = 1 + (len(audio_signal) - window_size) // hop_size # Compute the number of windows
stft_matrix = np.zeros((window_size, num_frames), dtype=np.complex128) # Create matrix to store STFT results
for frame in range(num_frames): # Loop over all windows
start = frame * hop_size # Compute start of the window
end = start + window_size # Compute end of the window
frame_data = audio_signal[start:end] * window # Multiply signal by Hanning window
stft_matrix[:, frame] = FFT(frame_data) # Compute FFT for the window and store result in matrix
return stft_matrix, sample_rate # Return STFT matrix and sample rate
# Function to compute the Inverse Short-Time Fourier Transform (ISTFT)
# Input: stft_matrix (numpy array) - the STFT matrix
# sample_rate (int) - the sample rate
# window_size (int) - the size of the window
# hop_size (int) - the hop size
# output_wav (str) - path to the output WAV file
# Output: None (writes the reconstructed signal to the output WAV file)
def ISTFT(stft_matrix, sample_rate, window_size, hop_size, output_wav):
num_frames = stft_matrix.shape[1] # Compute the number of windows
expected_signal_length = window_size + hop_size * (num_frames - 1) # Compute expected length of the reconstructed signal
reconstructed_signal = np.zeros(expected_signal_length, dtype=np.float64) # Create array to store reconstructed signal
window = CreateHanningWindow(window_size) # Create Hanning window of given length
for frame in range(num_frames): # Loop over all windows
start = frame * hop_size # Compute start of the window
end = start + window_size # Compute end of the window
frame_data = IFFT(stft_matrix[:, frame]) # Compute IFFT for the window
reconstructed_signal[start:end] += np.real(frame_data) * window # Add reconstructed window to the reconstructed signal
normalization = np.zeros_like(reconstructed_signal) # Create array for normalization
for frame in range(num_frames): # Loop over all windows
start = frame * hop_size # Compute start of the window
end = start + window_size # Compute end of the window
normalization[start:end] += window ** 2 # Compute normalization by squaring the window
epsilon = 1e-8 # Small value to avoid division by zero
reconstructed_signal /= (normalization + epsilon) # Normalize the reconstructed signal
reconstructed_signal = np.clip(reconstructed_signal, -1.0, 1.0) # Clip the signal to the range [-1, 1]
reconstructed_signal = (reconstructed_signal * 32767).astype(np.int16) # Convert to int16 range
wavfile.write(output_wav, sample_rate, reconstructed_signal) # Write the reconstructed signal to the output WAV file
Decimation and Interpolation
import numpy as np
from scipy.io import wavfile
import os
def my_convolution(signal, impulse_response, mode='full'):
'''
signal: input signal
impulse_response: impulse response
mode: 'full' or 'same'
full: returns the convolution of signal and impulse_response
same: returns the central part of the convolution of signal and impulse_response
return: convolution of signal and impulse_response
'''
signal_length = len(signal)
impulse_length = len(impulse_response)
# Padding the signal for multiplication
padded_signal = np.pad(signal, (impulse_length - 1, impulse_length - 1), 'constant')
flip_impulse_response = np.flip(impulse_response)
result_length = signal_length + impulse_length - 1
convolution_result = np.zeros(result_length)
for i in range(result_length):
convolution_result[i] = np.sum(padded_signal[i:i + impulse_length] * flip_impulse_response)
if mode == 'full':
return convolution_result
elif mode == 'same':
start_index = (impulse_length - 1) // 2
return convolution_result[start_index:start_index + signal_length]
return convolution_result
def generate_new_filename_decimation(input_filename):
'''
    input_filename: The original input file name
    return: New filename with "_new_deci" inserted before the file extension
'''
base, ext = os.path.splitext(input_filename)
new_filename = f"{base}_new_deci{ext}"
return new_filename
def generate_new_filename_interpolation(input_filename):
'''
    input_filename: The original input file name
    return: New filename with "_new_inter" inserted before the file extension
'''
base, ext = os.path.splitext(input_filename)
new_filename = f"{base}_new_inter{ext}"
return new_filename
def down_sample(filtered_signal, decimation_factor):
'''
x: input signal
M: decimation factor
return: downsampled signal
'''
return filtered_signal[::decimation_factor]
def decimate(input_filename, decimation_factor):
    samplerate, input_signal = wavfile.read(input_filename)
    if input_signal.ndim > 1:  # downmix stereo to mono so the 1-D convolution applies
        input_signal = input_signal.mean(axis=1)
    time_indices = np.arange(-20 * decimation_factor, 20 * decimation_factor + 1)
    # np.sinc handles time_indices == 0 cleanly, avoiding the 0/0 warning of the explicit formula
    sinc_filter = np.sinc(time_indices / decimation_factor) / decimation_factor
filtered_signal = my_convolution(input_signal, sinc_filter, mode='same')
downsampled_signal = down_sample(filtered_signal, decimation_factor)
new_samplerate = int(samplerate / decimation_factor)
normalized_signal = np.int16(
downsampled_signal / np.max(np.abs(downsampled_signal)) * 32767) # Normalize signal to int16 range
output_filename = generate_new_filename_decimation(input_filename)
wavfile.write(output_filename, new_samplerate, normalized_signal)
return output_filename
def my_interpolation_LPF(L, LPF_type):
'''
L: interpolation factor
n: number of samples
h: impulse response of the interpolation filter
LPF_type: 'shanon', 'ZOH', 'FOH'
return: impulse response of the interpolation filter
'''
    if LPF_type == 'shanon':
        n = np.arange(-20 * L, 20 * L + 1)
        h = np.sinc(n / L)  # equals L*sin(pi*n/L)/(pi*n) with h[0] = 1, without the 0/0 warning
elif LPF_type == 'ZOH':
h = np.ones(L)
elif LPF_type == 'FOH':
impulse_response_step = np.ones(L)
impulse_response_triangle = my_convolution(impulse_response_step, impulse_response_step, mode='full')
h = impulse_response_triangle / np.max(impulse_response_triangle)
else:
raise ValueError("Invalid filter type. Choose 'shanon', 'ZOH', or 'FOH'.")
return h
def up_sample(input_signal, interpolation_factor):
up_sampled_signal = np.zeros(interpolation_factor * len(input_signal))
up_sampled_signal[::interpolation_factor] = input_signal
return up_sampled_signal
def interpolate(input_filename, L, filter_type):
'''
x: input signal
L: interpolation factor
filter_type: 'shanon', 'ZOH', 'FOH'
return:new wav file with upsampled signal include interpolation filter
'''
    samplerate, signals_sample = wavfile.read(input_filename)
    if signals_sample.ndim > 1:  # downmix stereo to mono so the 1-D convolution applies
        signals_sample = signals_sample.mean(axis=1)
upsampled_signal = up_sample(signals_sample, L)
interpolation_filter = my_interpolation_LPF(L, filter_type)
filtered_signal = my_convolution(upsampled_signal, interpolation_filter, mode='same')
new_samplerate = int(samplerate * L)
normalized_signal = np.int16(filtered_signal / np.max(np.abs(filtered_signal)) * 32767)
output_filename = generate_new_filename_interpolation(input_filename)
wavfile.write(output_filename, new_samplerate, normalized_signal)
return output_filename
Speed Down/Up
import struct
import io
def process_wav_data(input_wav_data, speed_factor):
"""
Process the WAV file data: read, change speed, and return modified data.
:param input_wav_data: bytes object containing the entire WAV file
:param speed_factor: float, speed factor to apply (e.g., 0.5 to slow down, 2 to speed up)
:return: bytes object containing the modified WAV file
"""
def read_wav_header(data):
"""Read the WAV file header from the data."""
if data[:4] != b'RIFF' or data[8:12] != b'WAVE':
raise ValueError("Not a valid WAV file")
channels = struct.unpack_from('<H', data, 22)[0]
sample_rate = struct.unpack_from('<I', data, 24)[0]
bits_per_sample = struct.unpack_from('<H', data, 34)[0]
return channels, sample_rate, bits_per_sample
def write_wav_header(f, channels, sample_rate, bits_per_sample, data_size):
"""Write a valid WAV header to the file-like object."""
f.write(b'RIFF')
f.write(struct.pack('<I', data_size + 36)) # File size - 8
f.write(b'WAVE')
f.write(b'fmt ')
f.write(struct.pack('<I', 16)) # Size of fmt chunk
f.write(struct.pack('<H', 1)) # Audio format (1 for PCM)
f.write(struct.pack('<H', channels))
f.write(struct.pack('<I', sample_rate))
f.write(struct.pack('<I', sample_rate * channels * bits_per_sample // 8)) # Byte rate
f.write(struct.pack('<H', channels * bits_per_sample // 8)) # Block align
f.write(struct.pack('<H', bits_per_sample))
f.write(b'data')
f.write(struct.pack('<I', data_size))
    def change_speed(data, speed_factor, frame_width):
        """Change the speed by resampling whole frames (one frame = all channels
        of one sample instant), so stereo channels stay aligned."""
        num_frames = len(data) // frame_width
        new_num_frames = int(num_frames / speed_factor)
        new_data = bytearray()
        for i in range(new_num_frames):
            frame_index = int(i * speed_factor) * frame_width
            if frame_index + frame_width <= len(data):
                new_data.extend(data[frame_index:frame_index + frame_width])
        return bytes(new_data)
try:
# Edge case: Check if speed_factor is valid
if speed_factor <= 0:
raise ValueError("Speed factor must be greater than 0.")
channels, sample_rate, bits_per_sample = read_wav_header(input_wav_data)
        sample_width = bits_per_sample // 8
        frame_width = sample_width * channels  # step over whole frames so channels stay interleaved correctly
        # Process the audio data (assumes the data chunk starts at byte 44)
        audio_data = input_wav_data[44:]  # Skip header
        processed_data = change_speed(audio_data, speed_factor, frame_width)
# Edge case: Check if processing resulted in empty data
if not processed_data:
raise ValueError("Processing resulted in empty audio data.")
# Create a new in-memory file to store the result
output_buffer = io.BytesIO()
# Write the new header
write_wav_header(output_buffer, channels, sample_rate, bits_per_sample, len(processed_data))
# Write the processed audio data
output_buffer.write(processed_data)
# Get the final result as bytes
result = output_buffer.getvalue()
return result
except Exception as e:
print(f"Error processing WAV data: {e}")
return None