# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""
Model definitions for simple speech recognition.
Brainchip information:
- Original file:
https://github.com/tensorflow/tensorflow/tree/master/tensorflow/examples/speech_commands
"""
__all__ = ["AudioProcessor"]
import hashlib
import math
import os.path
import random
import re
import sys
import tarfile
import glob
import urllib
import numpy as np
import tensorflow as tf
from tensorflow.python.ops import gen_audio_ops as audio_ops
MAX_NUM_WAVS_PER_CLASS = 2**27 - 1 # ~134M
SILENCE_LABEL = '_silence_'
SILENCE_INDEX = 0
UNKNOWN_WORD_LABEL = '_unknown_'
UNKNOWN_WORD_INDEX = 1
BACKGROUND_NOISE_DIR_NAME = '_background_noise_'
RANDOM_SEED = 59185
def _prepare_model_settings(sample_rate, clip_duration_ms, window_size_ms,
window_stride_ms, feature_bin_count):
"""Calculates common settings needed for all models.
Args:
sample_rate: Number of audio samples per second.
clip_duration_ms: Length of each audio clip to be analyzed.
window_size_ms: Duration of frequency analysis window.
window_stride_ms: How far to move in time between frequency windows.
feature_bin_count: Number of frequency bins to use for analysis.
Returns:
Dictionary containing common settings.
Raises:
ValueError: If the preprocessing mode isn't recognized.
"""
desired_samples = int(sample_rate * clip_duration_ms / 1000)
window_size_samples = int(sample_rate * window_size_ms / 1000)
window_stride_samples = int(sample_rate * window_stride_ms / 1000)
length_minus_window = (desired_samples - window_size_samples)
if length_minus_window < 0:
spectrogram_length = 0
else:
spectrogram_length = 1 + int(
length_minus_window / window_stride_samples)
fingerprint_width = feature_bin_count
fingerprint_size = fingerprint_width * spectrogram_length
return {
'desired_samples': desired_samples,
'window_size_samples': window_size_samples,
'window_stride_samples': window_stride_samples,
'spectrogram_length': spectrogram_length,
'fingerprint_width': fingerprint_width,
'fingerprint_size': fingerprint_size,
}
def _prepare_words_list(wanted_words):
"""Prepends common tokens to the custom word list.
Args:
wanted_words: List of strings containing the custom words.
Returns:
List with the standard silence and unknown tokens added.
"""
return [SILENCE_LABEL, UNKNOWN_WORD_LABEL] + wanted_words
def _which_set(filename, validation_percentage, testing_percentage):
"""Determines which data partition the file should belong to.
We want to keep files in the same training, validation, or testing sets even
if new ones are added over time. This makes it less likely that testing
samples will accidentally be reused in training when long runs are restarted
for example. To keep this stability, a hash of the filename is taken and used
to determine which set it should belong to. This determination only depends on
the name and the set proportions, so it won't change as other files are added.
It's also useful to associate particular files as related (for example words
spoken by the same person), so anything after '_nohash_' in a filename is
ignored for set determination. This ensures that 'bobby_nohash_0.wav' and
'bobby_nohash_1.wav' are always in the same set, for example.
Args:
filename: File path of the data sample.
validation_percentage: How much of the data set to use for validation.
testing_percentage: How much of the data set to use for testing.
Returns:
String, one of 'training', 'validation', or 'testing'.
"""
base_name = os.path.basename(filename)
# We want to ignore anything after '_nohash_' in the file name when
# deciding which set to put a wav in, so the data set creator has a way of
# grouping wavs that are close variations of each other.
hash_name = re.sub(r'_nohash_.*$', '', base_name)
# This looks a bit magical, but we need to decide whether this file should
# go into the training, testing, or validation sets, and we want to keep
# existing files in the same set even if more files are subsequently
# added.
# To do that, we need a stable way of deciding based on just the file name
# itself, so we do a hash of that and then use that to generate a
# probability value that we use to assign it.
hash_name_hashed = hashlib.sha1(tf.compat.as_bytes(hash_name)).hexdigest()
percentage_hash = ((int(hash_name_hashed, 16) %
(MAX_NUM_WAVS_PER_CLASS + 1)) *
(100.0 / MAX_NUM_WAVS_PER_CLASS))
if percentage_hash < validation_percentage:
result = 'validation'
elif percentage_hash < (testing_percentage + validation_percentage):
result = 'testing'
else:
result = 'training'
return result
[docs]
class AudioProcessor():
"""Handles loading, partitioning, and preparing audio training data."""
def __init__(self,
sample_rate,
clip_duration_ms,
window_size_ms,
window_stride_ms,
feature_bin_count,
data_url=None,
data_dir=None,
silence_percentage=0,
unknown_percentage=0,
wanted_words=None,
validation_percentage=0,
testing_percentage=0):
self.data_dir = None
self.background_data = None
self.model_settings = _prepare_model_settings(sample_rate,
clip_duration_ms,
window_size_ms,
window_stride_ms,
feature_bin_count)
if data_dir:
self.data_dir = data_dir
self.maybe_download_and_extract_dataset(data_url, data_dir)
self.prepare_data_index(silence_percentage, unknown_percentage,
wanted_words, validation_percentage,
testing_percentage)
self.prepare_background_data()
self.prepare_processing_graph()
[docs]
@staticmethod
def maybe_download_and_extract_dataset(data_url, dest_directory):
"""Download and extract data set tar file.
If the data set we're using doesn't already exist, this function
downloads it from the TensorFlow.org website and unpacks it into a
directory.
If the data_url is none, don't download anything and expect the data
directory to contain the correct files already.
Args:
data_url: Web location of the tar file containing the data set.
dest_directory: File path to extract data to.
"""
if not data_url:
return
if not os.path.exists(dest_directory):
os.makedirs(dest_directory)
filename = data_url.split('/')[-1]
filepath = os.path.join(dest_directory, filename)
if not os.path.exists(filepath):
def _progress(count, block_size, total_size):
sys.stdout.write('\r>> Downloading %s %.1f%%' %
(filename, float(count * block_size) /
float(total_size) * 100.0))
sys.stdout.flush()
try:
filepath, _ = urllib.request.urlretrieve(
data_url, filepath, _progress)
except Exception as e:
raise RuntimeError(
f"Failed to download URL: {data_url} to folder: "
f"{filepath}\nPlease make sure you have enough free"
f" space and an internet connection") from e
print()
statinfo = os.stat(filepath)
print(
f"Successfully downloaded {filename} ({statinfo.st_size} bytes)"
)
with tarfile.open(filepath, 'r:gz') as tar:
tar.extractall(dest_directory)
[docs]
def prepare_data_index(self, silence_percentage, unknown_percentage,
wanted_words, validation_percentage,
testing_percentage):
"""Prepares a list of the samples organized by set and label.
The training loop needs a list of all the available data, organized by
which partition it should belong to, and with ground truth labels attached.
This function analyzes the folders below the `data_dir`, figures out the
right
labels for each file based on the name of the subdirectory it belongs to,
and uses a stable hash to assign it to a data set partition.
Args:
silence_percentage: How much of the resulting data should be background.
unknown_percentage: How much should be audio outside the wanted classes.
wanted_words: Labels of the classes we want to be able to recognize.
validation_percentage: How much of the data set to use for validation.
testing_percentage: How much of the data set to use for testing.
Returns:
Dictionary containing a list of file information for each set partition,
and a lookup map for each class to determine its numeric index.
Raises:
Exception: If expected files are not found.
"""
# Make sure the shuffling and picking of unknowns is deterministic.
random.seed(RANDOM_SEED)
wanted_words_index = {}
for index, wanted_word in enumerate(wanted_words):
wanted_words_index[wanted_word] = index + 2
self.data_index = {'validation': [], 'testing': [], 'training': []}
unknown_index = {'validation': [], 'testing': [], 'training': []}
all_words = {}
# Look through all the subfolders to find audio samples
search_path = os.path.join(self.data_dir, '*', '*.wav')
for wav_path in glob.glob(search_path):
_, word = os.path.split(os.path.dirname(wav_path))
word = word.lower()
# Treat the '_background_noise_' folder as a special case, since we
# expect it to contain long audio samples we mix in to improve
# training.
if word == BACKGROUND_NOISE_DIR_NAME:
continue
all_words[word] = True
set_index = _which_set(wav_path, validation_percentage,
testing_percentage)
# If it's a known class, store its detail, otherwise add it to the
# list we'll use to train the unknown label.
if word in wanted_words_index:
self.data_index[set_index].append({
'label': word,
'file': wav_path
})
else:
unknown_index[set_index].append({
'label': word,
'file': wav_path
})
if not all_words:
raise Exception('No .wavs found at ' + search_path)
for index, wanted_word in enumerate(wanted_words):
if wanted_word not in all_words:
raise Exception('Expected to find ' + wanted_word +
' in labels but only found ' +
', '.join(all_words.keys()))
# We need an arbitrary file to load as the input for the silence
# samples. It's multiplied by zero later, so the content doesn't matter.
silence_wav_path = self.data_index['training'][0]['file']
for set_index in ['validation', 'testing', 'training']:
set_size = len(self.data_index[set_index])
silence_size = int(math.ceil(set_size * silence_percentage / 100))
for _ in range(silence_size):
self.data_index[set_index].append({
'label': SILENCE_LABEL,
'file': silence_wav_path
})
# Pick some unknowns to add to each partition of the data set.
random.shuffle(unknown_index[set_index])
unknown_size = int(math.ceil(set_size * unknown_percentage / 100))
self.data_index[set_index].extend(
unknown_index[set_index][:unknown_size])
# Make sure the ordering is random.
for set_index in ['validation', 'testing', 'training']:
random.shuffle(self.data_index[set_index])
# Prepare the rest of the result data structure.
self.words_list = _prepare_words_list(wanted_words)
self.word_to_index = {}
for word in all_words:
if word in wanted_words_index:
self.word_to_index[word] = wanted_words_index[word]
else:
self.word_to_index[word] = UNKNOWN_WORD_INDEX
self.word_to_index[SILENCE_LABEL] = SILENCE_INDEX
[docs]
def prepare_background_data(self):
"""Searches a folder for background noise audio, and loads it into
memory.
It's expected that the background audio samples will be in a subdirectory
named '_background_noise_' inside the 'data_dir' folder, as .wavs that match
the sample rate of the training data, but can be much longer in duration.
If the '_background_noise_' folder doesn't exist at all, this isn't an
error, it's just taken to mean that no background noise augmentation should
be used. If the folder does exist, but it's empty, that's treated as an
error.
Returns:
List of raw PCM-encoded audio samples of background noise.
Raises:
Exception: If files aren't found in the folder.
"""
self.background_data = []
background_dir = os.path.join(self.data_dir, BACKGROUND_NOISE_DIR_NAME)
if not os.path.exists(background_dir):
return self.background_data
search_path = os.path.join(self.data_dir, BACKGROUND_NOISE_DIR_NAME,
'*.wav')
for wav_path in glob.glob(search_path):
wav_loader = tf.io.read_file(wav_path)
wav_decoder = tf.audio.decode_wav(wav_loader, desired_channels=1)
wav_data = wav_decoder.audio.numpy().flatten()
self.background_data.append(wav_data)
if not self.background_data:
raise Exception('No background wav files were found in ' +
search_path)
return []
[docs]
def prepare_processing_graph(self):
"""Builds a TensorFlow graph to apply the input distortions.
Creates a graph that loads a WAVE file, decodes it, scales the volume,
shifts it in time, adds in background noise, calculates a spectrogram, and
then builds an MFCC fingerprint from that.
"""
@tf.function
def processing_graph(wav_filename, foreground_volume,
time_shift_padding, time_shift_offset,
background_data, background_volume,
model_settings):
desired_samples = model_settings['desired_samples']
wav_loader = tf.io.read_file(wav_filename)
wav_decoder = tf.audio.decode_wav(wav_loader,
desired_channels=1,
desired_samples=desired_samples)
# Allow the audio sample's volume to be adjusted.
scaled_foreground = tf.multiply(wav_decoder.audio,
foreground_volume)
# Shift the sample's start position, and pad any gaps with zeros.
padded_foreground = tf.pad(scaled_foreground, time_shift_padding)
sliced_foreground = tf.slice(padded_foreground, time_shift_offset,
[desired_samples, -1])
# Mix in background noise.
background_mul = tf.multiply(background_data, background_volume)
background_add = tf.add(background_mul, sliced_foreground)
background_clamp = tf.clip_by_value(background_add, -1.0, 1.0)
# Run the spectrogram and MFCC ops to get a 2D 'fingerprint' of the
# audio.
spectrogram = audio_ops.audio_spectrogram(
background_clamp,
window_size=model_settings['window_size_samples'],
stride=model_settings['window_stride_samples'],
magnitude_squared=True)
# The number of buckets in each FFT row in the spectrogram will
# depend on how many input samples there are in each window. This
# can be quite large, with a 160 sample window producing 127 buckets
# for example. We don't need this level of detail for
# classification, so we often want to shrink them down to produce a
# smaller result. That's what this section implements. One method is
# to use average pooling to merge adjacent buckets, but a more
# sophisticated approach is to apply the MFCC algorithm to shrink
# the representation.
output = audio_ops.mfcc(
spectrogram,
wav_decoder.sample_rate,
dct_coefficient_count=model_settings['fingerprint_width'])
return output
self.processing_graph = processing_graph
[docs]
def get_data(self, how_many, offset, background_frequency,
background_volume_range, time_shift, mode):
"""Gather samples from the data set, applying transformations as needed.
When the mode is 'training', a random selection of samples will be returned,
otherwise the first N clips in the partition will be used. This ensures that
validation always uses the same samples, reducing noise in the metrics.
Args:
how_many: Desired number of samples to return. -1 means the entire
contents of this partition.
offset: Where to start when fetching deterministically.
background_frequency: How many clips will have background noise, 0.0 to
1.0.
background_volume_range: How loud the background noise will be.
time_shift: How much to randomly shift the clips by in time.
mode: Which partition to use, must be 'training', 'validation', or
'testing'.
Returns:
List of sample data for the transformed samples, and list of label indexes
Raises:
ValueError: If background samples are too short.
"""
# Raise an error if data are not present, i.e. data directory is None
if self.data_dir is None:
raise RuntimeError(
"Calling 'get_data' method requires to have a valid "
"data directory. Here, data_dir=None")
# Pick one of the partitions to choose samples from.
candidates = self.data_index[mode]
if how_many == -1:
sample_count = len(candidates)
else:
sample_count = max(0, min(how_many, len(candidates) - offset))
# Data and labels will be populated and returned.
data = np.zeros((sample_count, self.model_settings['fingerprint_size']))
labels = np.zeros(sample_count)
desired_samples = self.model_settings['desired_samples']
use_background = self.background_data and (mode == 'training')
pick_deterministically = (mode != 'training')
# Use the processing graph we created earlier to repeatedly to generate
# the final output sample data we'll use in training.
for i in range(offset, offset + sample_count):
# Pick which audio sample to use.
if how_many == -1 or pick_deterministically:
sample_index = i
else:
sample_index = np.random.randint(len(candidates))
sample = candidates[sample_index]
# If we're time shifting, set up the offset for this sample.
if time_shift > 0:
time_shift_amount = np.random.randint(-time_shift, time_shift)
else:
time_shift_amount = 0
if time_shift_amount > 0:
time_shift_padding = [[time_shift_amount, 0], [0, 0]]
time_shift_offset = [0, 0]
else:
time_shift_padding = [[0, -time_shift_amount], [0, 0]]
time_shift_offset = [-time_shift_amount, 0]
input_dict = {
'wav_filename': tf.constant(sample['file']),
'time_shift_padding': tf.constant(time_shift_padding),
'time_shift_offset': tf.constant(time_shift_offset)
}
# Choose a section of background noise to mix in.
if use_background or sample['label'] == SILENCE_LABEL:
background_index = np.random.randint(len(self.background_data))
background_samples = self.background_data[background_index]
if len(background_samples
) <= self.model_settings['desired_samples']:
raise ValueError(
'Background sample is too short! Need more than %d'
' samples but only %d were found' %
(self.model_settings['desired_samples'],
len(background_samples)))
background_offset = np.random.randint(
0,
len(background_samples) -
self.model_settings['desired_samples'])
background_clipped = background_samples[background_offset:(
background_offset + desired_samples)]
background_reshaped = background_clipped.reshape(
[desired_samples, 1])
if sample['label'] == SILENCE_LABEL:
background_volume = np.random.uniform(0, 1)
elif np.random.uniform(0, 1) < background_frequency:
background_volume = np.random.uniform(
0, background_volume_range)
else:
background_volume = 0
else:
background_reshaped = np.zeros([desired_samples, 1])
background_volume = 0
input_dict['background_data'] = tf.constant(background_reshaped,
dtype=tf.float32)
input_dict['background_volume'] = tf.constant(background_volume,
dtype=tf.float32)
# If we want silence, mute out the main sample but leave the
# background.
if sample['label'] == SILENCE_LABEL:
input_dict['foreground_volume'] = tf.constant(0.0)
else:
input_dict['foreground_volume'] = tf.constant(1.0)
# Run the graph to produce the output audio.
data_tensor = self.processing_graph(
**input_dict, model_settings=self.model_settings)
data[i - offset, :] = data_tensor.numpy().flatten()
label_index = self.word_to_index[sample['label']]
labels[i - offset] = label_index
return data, labels
[docs]
def get_features_for_wav(self, wav_filename):
"""Applies the feature transformation process to the input_wav.
Runs the feature generation process (generally producing a spectrogram from
the input samples) on the WAV file. This can be useful for testing and
verifying implementations being run on other platforms.
Args:
wav_filename: The path to the input audio file.
Returns:
Numpy data array containing the generated features.
"""
desired_samples = self.model_settings['desired_samples']
background_data = tf.constant(np.zeros([desired_samples, 1]),
tf.float32)
input_dict = {
'wav_filename': tf.constant(wav_filename),
'time_shift_padding': tf.constant([[0, 0], [0, 0]]),
'time_shift_offset': tf.constant([0, 0]),
'background_data': background_data,
'background_volume': tf.constant(0.0),
'foreground_volume': tf.constant(1.0)
}
# Run the graph to produce the output audio.
data_tensor = self.processing_graph(**input_dict,
model_settings=self.model_settings)
features = data_tensor.numpy()
features = features.reshape(
(self.model_settings['spectrogram_length'],
self.model_settings['fingerprint_width'], 1))
return features
[docs]
def get_augmented_data_for_wav(self,
wav_filename,
background_frequency,
background_volume_range,
time_shift,
num_augmented_samples=1):
"""Applies the feature transformation process to a wav audio file,
adding data augmentation (background noise and time shifting).
Args:
wav_filename (str): The path to the input audio file.
background_frequency: How many clips will have background noise, 0.0 to
1.0.
background_volume_range: How loud the background noise will be.
time_shift: How much to randomly shift the clips by in time.
num_augmented_samples: How many samples will be generated using data
augmentation.
Returns:
Numpy data array containing the generated features for every augmented
sample.
"""
data = np.zeros(
(num_augmented_samples, self.model_settings['fingerprint_size']))
desired_samples = self.model_settings['desired_samples']
for i in range(num_augmented_samples):
# If we're time shifting, set up the offset for this sample.
if time_shift > 0:
time_shift_amount = np.random.randint(-time_shift, time_shift)
else:
time_shift_amount = 0
if time_shift_amount > 0:
time_shift_padding = [[time_shift_amount, 0], [0, 0]]
time_shift_offset = [0, 0]
else:
time_shift_padding = [[0, -time_shift_amount], [0, 0]]
time_shift_offset = [-time_shift_amount, 0]
input_dict = {
'wav_filename': tf.constant(wav_filename),
'foreground_volume': tf.constant(1.0),
'time_shift_padding': tf.constant(time_shift_padding),
'time_shift_offset': tf.constant(time_shift_offset)
}
# Choose a section of background noise to mix in.
if self.background_data:
background_index = np.random.randint(len(self.background_data))
background_samples = self.background_data[background_index]
if len(background_samples
) <= self.model_settings['desired_samples']:
raise ValueError(
'Background sample is too short! Need more than %d'
' samples but only %d were found' %
(self.model_settings['desired_samples'],
len(background_samples)))
background_offset = np.random.randint(
0,
len(background_samples) -
self.model_settings['desired_samples'])
background_clipped = background_samples[background_offset:(
background_offset + desired_samples)]
background_reshaped = background_clipped.reshape(
[desired_samples, 1])
if np.random.uniform(0, 1) < background_frequency:
background_volume = np.random.uniform(
0, background_volume_range)
else:
background_volume = 0
else:
background_reshaped = np.zeros([desired_samples, 1])
background_volume = 0
input_dict['background_data'] = tf.constant(background_reshaped,
dtype=tf.float32)
input_dict['background_volume'] = tf.constant(background_volume,
dtype=tf.float32)
data_tensor = self.processing_graph(
**input_dict, model_settings=self.model_settings)
data[i, :] = data_tensor.numpy().flatten()
return data