#---
# trainmodel.py
#---
# original code taken from:
# Garg, Yatharth (2018). Speech-Accent-Recognition [online].
# [cited 14 Nov. 2018].
# Available from World Wide Web:
# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.
# modified by Stavros Grigoriadis
#---
import sys
import winsound

import pandas as pd
from keras.callbacks import EarlyStopping, TensorBoard
from keras.layers.convolutional import MaxPooling2D, Conv2D
from keras.layers.core import Dense, Dropout, Flatten
from keras.preprocessing.image import ImageDataGenerator
def to_categorical(y):
    '''
    Converts list of languages into a binary class matrix
    :param y (list): list of languages
    :return (numpy array): binary class matrix, one row per entry of y and
        one column per distinct language
    '''
    # Class indices are assigned in alphabetical order of the language names.
    # Iterating a bare set() of strings has no stable order, so two calls
    # (e.g. one for the training labels and one for the test labels, or one
    # per process) could otherwise map the same language to different columns.
    # NOTE(review): any hard-coded index -> language table used when decoding
    # predictions must follow this alphabetical order.
    lang_dict = {language: index
                 for index, language in enumerate(sorted(set(y)))}
    class_indices = [lang_dict[language] for language in y]
    return utils.to_categorical(class_indices, len(lang_dict))
def get_wav(language_num):
    '''
    Load wav file from disk and resample it to RATE
    :param language_num (str): file name without the '.wav' extension; the
        file is read from '../audio/'
    :return (numpy array): wav form resampled to RATE
    '''
    y, sr = librosa.load('../audio/{}.wav'.format(language_num))
    return librosa.core.resample(y=y, orig_sr=sr, target_sr=RATE, scale=True)
def to_mfcc(wav):
    '''
    Compute the Mel Frequency Cepstral Coefficients of a wav form
    :param wav (numpy array): Wav form, expected at sampling rate RATE
    :return (2d numpy array): MFCC with N_MFCC coefficients requested
    '''
    mfcc = librosa.feature.mfcc(y=wav, sr=RATE, n_mfcc=N_MFCC)
    return mfcc
def remove_silence(wav, thresh=0.04, chunk=5000):
    '''
    Searches wav form for segments of silence. If wav form values are lower
    than 'thresh' for 'chunk' samples, the values will be removed
    :param wav (np array): Wav array to be filtered
    :param thresh (float): amplitude below which a sample counts as silent
    :param chunk (int): number of consecutive samples examined together
    :return (np array): Wav array with silence removed

    NOTE(review): the chunk test was missing from the source and has been
    rebuilt from the description above; "lower than thresh" is interpreted
    on the absolute amplitude - confirm.
    '''
    wav = np.asarray(wav)
    n_chunks = len(wav) // chunk

    # Samples after the last complete chunk stay False and are therefore
    # dropped, as in the surviving padding statement of the original.
    keep = np.zeros(len(wav), dtype=bool)
    if n_chunks:
        covered = n_chunks * chunk
        blocks = np.abs(wav[:covered]).reshape(n_chunks, chunk)
        # A chunk is kept when at least one sample reaches the threshold.
        loud = (blocks >= thresh).any(axis=1)
        keep[:covered] = np.repeat(loud, chunk)
    return wav[keep]
def normalize_mfcc(mfcc):
    '''
    Scale the absolute values of an MFCC array with the scaler `mms`
    :param mfcc (numpy array): MFCC array
    :return: result of mms.fit_transform on the absolute values

    NOTE(review): `mms` is not defined in this block; presumably a
    scikit-learn scaler created elsewhere - confirm.
    '''
    magnitudes = np.abs(mfcc)
    return mms.fit_transform(magnitudes)
def make_segments(mfccs, labels):
    '''
    Makes segments of mfccs and attaches them to the labels
    :param mfccs: list of mfccs
    :param labels: list of labels, one per mfcc
    :return (tuple): (segments, seg_labels) - every segment is COL_SIZE
        columns wide and seg_labels holds the label of the mfcc each
        segment was cut from
    '''
    segments = []
    seg_labels = []
    for mfcc, label in zip(mfccs, labels):
        # Only complete segments are produced; trailing columns that do not
        # fill a whole COL_SIZE window are left out.
        for start in range(0, int(mfcc.shape[1] / COL_SIZE)):
            segments.append(mfcc[:, start * COL_SIZE:(start + 1) * COL_SIZE])
            seg_labels.append(label)
    return (segments, seg_labels)
def segment_one(mfcc):
    '''
    Cut one mfcc image into consecutive segments of COL_SIZE columns.
    Trailing columns that do not fill a complete segment are left out.
    :param mfcc (numpy array): MFCC array
    :return (numpy array): Segmented MFCC array
    '''
    n_segments = int(mfcc.shape[1] / COL_SIZE)
    pieces = [mfcc[:, k * COL_SIZE:(k + 1) * COL_SIZE]
              for k in range(0, n_segments)]
    return np.array(pieces)
def create_segmented_mfccs(X_train):
    '''
    Creates segmented MFCCs from X_train
    :param X_train: list of MFCCs
    :return: list with one segmented MFCC array (see segment_one) per input
    '''
    return [segment_one(mfcc) for mfcc in X_train]
def train_model(X_train, y_train, X_validation, y_validation, batch_size=128):  # 64
    '''
    Trains 2D convolutional neural network
    :param X_train: Numpy array of mfccs
    :param y_train: Binary matrix based on labels
    :param X_validation: Numpy array of mfccs used for validation
    :param y_validation: Binary matrix based on the validation labels
    :param batch_size: number of samples per batch drawn from the generator
    :return: Trained model
    '''
    # Get row, column, and class sizes
    rows = X_train[0].shape[0]
    cols = X_train[0].shape[1]
    val_rows = X_validation[0].shape[0]
    val_cols = X_validation[0].shape[1]
    num_classes = len(y_train[0])

    print('X_Train shape rows:', rows)
    print('X_train1 shape cols:', cols)
    print('num_classes:', num_classes)

    # input image dimensions to feed into 2D ConvNet Input layer
    input_shape = (rows, cols, 1)
    X_train = X_train.reshape(X_train.shape[0], rows, cols, 1)
    X_validation = X_validation.reshape(X_validation.shape[0], val_rows, val_cols, 1)

    print('X_train shape:', X_train.shape)
    print(X_train.shape[0], 'training samples')

    # Initializing the CNN
    model = Sequential()

    # Add 1st Layer Convolution, input_shape = (13,30,1), MFCCs coming in 13x30x1
    # input shape matches the data shape coming into the network
    # Output filter of dimension 32 in the convolution,
    # Kernel size: 3x3,
    # Activation ReLU,
    # Data_format = "channels_last" which means that the ordering of the dimensions
    # in the inputs have the form of (batch, height, width, channels)
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu',
                     data_format="channels_last", input_shape=input_shape))

    # Max pooling operation with a pool size of 2x2 is applied
    # to down scale the spatial dimension
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Add 2nd convolutional layer,
    # Output filter of dimension 64 in the convolution,
    # Kernel size: 3x3,
    # Activation ReLU,
    model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
    #model.add(Conv2D(64, kernel_size=(3, 3), activation='sigmoid'))

    # Max pooling operation with a pool size of 2x2 is applied
    # to down scale the spatial dimension
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Dropout operation with a rate of 0.25 to avoid overfitting
    model.add(Dropout(0.25))

    # Flattening work in a single array, 1 dimension
    model.add(Flatten())

    # Fully Connected
    # A Regularly densely-connected layer is added with 128 units
    # Activation function of ReLU
    model.add(Dense(128, activation='relu'))
    #model.add(Dense(128, activation='sigmoid'))

    # Dropout operation with a rate of 0.5 to avoid overfitting
    # NOTE(review): only the comment survived in the source; the layer below
    # is restored from it - confirm.
    model.add(Dropout(0.5))

    # Output layer: one unit per class, softmax activation
    model.add(Dense(num_classes, activation='softmax'))

    # Compiling the CNN
    # the optimizer readjusts the weights,
    # the loss defines how to compute the error
    model.compile(loss='categorical_crossentropy',
                  optimizer='adadelta',
                  metrics=['accuracy'])

    # Stops training if accuracy does not change at least 0.005 over 10 epochs
    es = EarlyStopping(monitor='acc', min_delta=.005, patience=10, verbose=1, mode='auto')

    # Creates log file for graphical interpretation using TensorBoard
    tb = TensorBoard(log_dir='../logs', histogram_freq=0,
                     batch_size=32, write_graph=True, write_grads=True,
                     write_images=True, embeddings_freq=0, embeddings_layer_names=None,
                     embeddings_metadata=None)

    # Image shifting
    datagen = ImageDataGenerator(width_shift_range=0.05)

    # Fit model using ImageDataGenerator
    # Training the CNN
    # NOTE(review): every argument after the generator was lost in the source.
    # callbacks and validation_data are restored from the otherwise unused
    # locals es, tb, X_validation and y_validation; the epoch count is assumed
    # to come from a module-level EPOCHS constant - confirm both.
    model.fit_generator(datagen.flow(X_train, y_train, batch_size=batch_size),
                        epochs=EPOCHS,
                        callbacks=[es, tb],
                        validation_data=(X_validation, y_validation))

    return model


def save_model(model, model_filename):
    '''
    Save model to file
    :param model: Trained model to be saved
    :param model_filename: Filename (without extension)
    :return: None
    '''
    # creates a HDF5 file '<model_filename>.h5' under '../models/'
    model.save('../models/{}.h5'.format(model_filename))
if __name__ == '__main__':
    # Console command example:
    #   python trainmodel.py data_info2L.csv model2l10_9010_relu
    #
    # NOTE(review): the statements that define start, df and model_filename
    # were missing from the source; they are restored from the usage example
    # above (argument 1 = metadata csv, argument 2 = model name) - confirm.
    start = time.time()
    file_name = sys.argv[1]
    model_filename = sys.argv[2]
    df = pd.read_csv(file_name)

    # Filter metadata to retrieve only files desired
    filtered_df = getsplit.filter_df(df)

    # Train test split
    X_train, X_test, y_train, y_test = getsplit.split_people(filtered_df)

    # Get statistics
    train_count = Counter(y_train)
    test_count = Counter(y_test)
    print("Entering main")

    # Share of the most frequent class in the test set
    acc_to_beat = test_count.most_common(1)[0][1] / float(np.sum(list(test_count.values())))

    # To categorical
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # Get resampled wav files using multiprocessing
    if DEBUG:
        print('Loading wav files....')
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    start_loading_wavs = time.time()
    X_train = pool.map(get_wav, X_train)
    X_test = pool.map(get_wav, X_test)
    end_loading_wavs = time.time()
    print("\nTotal time needed for Loading wav files: ",
          datetime.timedelta(seconds=(end_loading_wavs - start_loading_wavs)))

    # Convert to MFCC
    if DEBUG:
        print('Converting to MFCC....')
    start_converting_mfcc = time.time()
    X_train = pool.map(to_mfcc, X_train)
    X_test = pool.map(to_mfcc, X_test)

    # The pool is not used after this point; release the worker processes.
    pool.close()
    pool.join()

    # Create segments from MFCCs
    X_train, y_train = make_segments(X_train, y_train)
    X_validation, y_validation = make_segments(X_test, y_test)

    # Randomize training segments
    # NOTE(review): recent scikit-learn releases may reject test_size=0;
    # verify against the installed version.
    X_train, _, y_train, _ = train_test_split(X_train, y_train, test_size=0)
    end_converting_mfcc = time.time()
    print("\nTotal time needed for Converting to MFCC: ",
          datetime.timedelta(seconds=(end_converting_mfcc - start_converting_mfcc)))

    start_training_model = time.time()

    # Train model
    model = train_model(np.array(X_train), np.array(y_train),
                        np.array(X_validation), np.array(y_validation))

    # Make predictions on full X_test MFCCs
    y_predicted = accuracy.predict_class_all(create_segmented_mfccs(X_test), model)
    end_training_model = time.time()
    print("\nTotal time needed for Training Model: ",
          datetime.timedelta(seconds=(end_training_model - start_training_model)))

    # Print statistics
    print('Training samples:', train_count)
    print('Testing samples:', test_count)
    print('Accuracy to beat:', acc_to_beat)
    print('Confusion matrix of total samples:\n',
          np.sum(accuracy.confusion_matrix(y_predicted, y_test), axis=1))
    print('Confusion matrix:\n', accuracy.confusion_matrix(y_predicted, y_test))
    print('Accuracy:', accuracy.get_accuracy(y_predicted, y_test))

    # Save model
    save_model(model, model_filename)
    end = time.time()
    print("\nTotal time needed: ", datetime.timedelta(seconds=(end - start)))

    # Audible signal that the run has finished (winsound is Windows-only)
    winsound.PlaySound("Success", winsound.SND_FILENAME)
#---
# getsplit.py
#---
# original code taken from:
# Garg, Yatharth (2018). Speech-Accent-Recognition [online].
# [cited 14 Nov. 2018].
# Available from World Wide Web:
# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.
def filter_df(df):
    '''
    Function to filter audio files based on df columns
    df column options:
    [age,age_of_english_onset,age_sex,birth_place,english_learning_method,
    english_residence,length_of_english_residence,native_language,other_languages,sex]
    :param df (DataFrame): Full unfiltered DataFrame
    :return (DataFrame): Filtered DataFrame holding only the rows whose
        native_language is chinese, spanish, english or arabic

    NOTE(review): the statements that assembled and returned the result were
    incomplete in the source (only the append of the chinese rows survived).
    The row order used below is an assumption - confirm.
    '''
    chinese = df[df.native_language == 'chinese']
    spanish = df[df.native_language == 'spanish']
    english = df[df.native_language == 'english']
    arabic = df[df.native_language == 'arabic']

    #chinese = chinese[chinese.length_of_english_residence < 10]
    #spanish = spanish[spanish.length_of_english_residence < 10]
    #arabic = arabic[arabic.length_of_english_residence < 10]

    # pd.concat replaces DataFrame.append, which pandas 2.0 removed.
    # The chinese rows go last, as in the surviving append statement.
    return pd.concat([spanish, english, arabic, chinese])
def split_people(df, test_size=0.1):
    '''
    Create train test split of DataFrame
    :param df (DataFrame): Pandas DataFrame of audio files to be split
    :param test_size (float): Percentage of total files to be split into test
    :return X_train, X_test, y_train, y_test (tuple): Xs are list of
        df['language_num'] and Ys are df['native_language']
    test_size = 10% train_size = 90%

    NOTE(review): the def line was missing from the source; the default of
    0.1 is taken from the "test_size = 10%" remark above - confirm.
    '''
    # The call must start on the same line as `return`; a bare `return`
    # followed by the call on the next line returns None.
    return train_test_split(df['language_num'], df['native_language'],
                            test_size=test_size, random_state=1234)
if __name__ == '__main__':
    # Console command example:
    #   python getsplit.py bio_data.csv
    csv_file = sys.argv[1]
    df = pd.read_csv(csv_file)
    filtered_df = filter_df(df)
    print(split_people(filtered_df))
#---
# accuracy.py
#---
# original code taken from:
# Garg, Yatharth (2018). Speech-Accent-Recognition [online].
# [cited 14 Nov. 2018].
# Available from World Wide Web:
# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.
def predict_class_audio(MFCCs, model):
    '''
    Predict class based on MFCC samples (majority vote over the segments)
    :param MFCCs: Numpy array of MFCCs, indexed as (segment, row, column)
    :param model: Trained model
    :return: Predicted class of MFCC segment group
    '''
    # Append a trailing axis of size 1 before handing the batch to the model
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    y_predicted = model.predict_classes(MFCCs, verbose=0)
    # The class predicted for the largest number of segments wins
    return Counter(list(y_predicted)).most_common(1)[0][0]
def predict_prob_class_audio(MFCCs, model):
    '''
    Predict class based on MFCC samples' probabilities
    :param MFCCs: Numpy array of MFCCs, indexed as (segment, row, column)
    :param model: Trained model
    :return: Predicted class of MFCC segment group
    '''
    # Append a trailing axis of size 1 before handing the batch to the model
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    y_predicted = model.predict_proba(MFCCs, verbose=0)
    # Sum the per-segment probabilities and pick the class with the largest total
    return np.argmax(np.sum(y_predicted, axis=0))
def predict_class_all(X_train, model):
    '''
    Predict a class for every segmented MFCC in X_train
    :param X_train: List of segmented mfccs
    :param model: Trained model
    :return: list of predictions, one per entry of X_train
    '''
    predictions = []
    for mfcc in X_train:
        predictions.append(predict_class_audio(mfcc, model))
        # Alternative that votes on summed probabilities instead:
        #predictions.append(predict_prob_class_audio(mfcc, model))
    return predictions
def confusion_matrix(y_predicted, y_test):
    '''
    Create confusion matrix
    :param y_predicted: list of predictions
    :param y_test: numpy array of shape (len(y_test), number of classes).
        1.'s at index of actual, otherwise 0.
    :return: numpy array. confusion matrix; rows are the actual classes and
        columns the predicted classes
    '''
    num_classes = len(y_test[0])
    # Named `matrix` so the local does not shadow this function's own name
    matrix = np.zeros((num_classes, num_classes), dtype=int)
    for index, predicted in enumerate(y_predicted):
        matrix[np.argmax(y_test[index])][predicted] += 1
    return matrix
def get_accuracy(y_predicted, y_test):
    '''
    Get accuracy
    :param y_predicted: numpy array of predictions
    :param y_test: numpy array of actual
    :return: accuracy, i.e. the confusion-matrix diagonal (correct
        predictions) divided by the total number of predictions
    '''
    c_matrix = confusion_matrix(y_predicted, y_test)
    return np.sum(c_matrix.diagonal()) / float(np.sum(c_matrix))


if __name__ == '__main__':
    pass
#---
# predict.py
#---
# original code taken from:
# Garg, Yatharth (2018). Speech-Accent-Recognition [online].
# [cited 14 Nov. 2018].
# Available from World Wide Web:
# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.
# modified by Stavros Grigoriadis
#---
def get_pred_wav(language_num):
    '''
    Load wav file from disk and resample it to RATE
    :param language_num (str): file name without the '.wav' extension; the
        file is read from '../Prediction_File/'
    :return (numpy array): wav form resampled to RATE
    '''
    y, sr = librosa.load('../Prediction_File/{}.wav'.format(language_num))
    return librosa.core.resample(y=y, orig_sr=sr, target_sr=RATE, scale=True)
def create_segmented_mfccs(X_train):
    '''
    Creates segmented MFCCs from X_train
    :param X_train: list of MFCCs
    :return: list with one segmented MFCC array (see segment_one) per input
    '''
    return [segment_one(mfcc) for mfcc in X_train]
def to_mfcc(wav):
    '''
    Compute the Mel Frequency Cepstral Coefficients of a wav form
    :param wav (numpy array): Wav form, expected at sampling rate RATE
    :return (2d numpy array): MFCC with N_MFCC coefficients requested
    '''
    mfcc = librosa.feature.mfcc(y=wav, sr=RATE, n_mfcc=N_MFCC)
    return mfcc
def segment_one(mfcc):
    '''
    Cut one mfcc image into consecutive segments of COL_SIZE columns.
    Trailing columns that do not fill a complete segment are left out.
    :param mfcc (numpy array): MFCC array
    :return (numpy array): Segmented MFCC array
    '''
    n_segments = int(mfcc.shape[1] / COL_SIZE)
    pieces = [mfcc[:, k * COL_SIZE:(k + 1) * COL_SIZE]
              for k in range(0, n_segments)]
    return np.array(pieces)
# NOTE(review): an identical create_segmented_mfccs is already defined earlier
# in predict.py; this later definition is the one in effect. Consider keeping
# only one of them.
def create_segmented_mfccs(X_train):
    '''
    Creates segmented MFCCs from X_train
    :param X_train: list of MFCCs
    :return: list with one segmented MFCC array (see segment_one) per input
    '''
    return [segment_one(mfcc) for mfcc in X_train]
# Load Model
# NOTE(review): the statements that load the trained model into `new_model`
# and that build `df` are missing from this source; they must be restored
# before this script can run.

# Filter metadata to retrieve only files desired
filtered_df = getsplit.filter_df(df)

# Train test split (only the first part, X_predict, is used below)
X_predict, X_test, y_train, y_test = getsplit.split_people(filtered_df)

# Load each file to predict and convert it to MFCCs
X_predict = map(get_pred_wav, X_predict)
X_predict = map(to_mfcc, X_predict)
y_predicted = accuracy.predict_class_all(create_segmented_mfccs(X_predict), new_model)

# A message is printed only when exactly one file was predicted.
# NOTE(review): the class index -> accent table must match the class order
# used when the model was trained - confirm.
for class_index, accent in enumerate(("Chinese", "Spanish", "English", "Arabic")):
    if y_predicted == [class_index]:
        print("{} Accent Found".format(accent))