
#---

# trainmodel.py

#---

# original code taken from:

# Garg, Yatharth (2018). Speech-Accent-Recognition [online].

# [cited 14 Nov. 2018].

# Available from World Wide Web:

# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.

# modified by Stavros Grigoriadis

#---

from keras.layers.core import Dense, Dropout, Flatten
from keras.layers.convolutional import MaxPooling2D, Conv2D
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import EarlyStopping, TensorBoard
from keras.models import Sequential
from keras import utils
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from collections import Counter
import numpy as np
import pandas as pd
import librosa
import multiprocessing
import time
import datetime
import sys
import winsound
import getsplit
import accuracy

# Constants used below. N_MFCC and COL_SIZE follow the 13x30 input shape
# documented in train_model(); the remaining values are assumptions, since
# the extract omits the original definitions.
DEBUG = True
RATE = 24000      # assumed target sample rate
N_MFCC = 13       # number of MFCC coefficients (matches the 13x30 input shape)
COL_SIZE = 30     # segment width in MFCC frames (matches the 13x30 input shape)
EPOCHS = 10       # assumed epoch count
mms = MinMaxScaler()

def to_categorical(y):
    '''
    Converts list of languages into a binary class matrix
    :param y (list): list of languages
    :return (numpy array): binary class matrix
    '''
    lang_dict = {}
    for index, language in enumerate(set(y)):
        lang_dict[language] = index
    y = list(map(lambda x: lang_dict[x], y))
    return utils.to_categorical(y, len(lang_dict))
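# A minimal illustration of to_categorical (hypothetical labels; note that
# set(y) does not guarantee an ordering, so the column assignment may differ
# between runs):
#   to_categorical(['english', 'spanish', 'english'])
#   -> array([[1., 0.],
#             [0., 1.],
#             [1., 0.]])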

def get_wav(language_num):
    '''
    Load wav file from disk and down-samples to RATE
    :param language_num (list): list of file names
    :return (numpy array): Down-sampled wav file
    '''
    y, sr = librosa.load('../audio/{}.wav'.format(language_num))
    return (librosa.core.resample(y=y, orig_sr=sr, target_sr=RATE, scale=True))

def to_mfcc(wav):
    '''
    Converts wav file to Mel Frequency Cepstral Coefficients
    :param wav (numpy array): Wav form
    :return (2d numpy array): MFCC
    '''
    return (librosa.feature.mfcc(y=wav, sr=RATE, n_mfcc=N_MFCC))
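# Rough shape sanity check (values assume the RATE defined above and
# librosa's default hop_length of 512): a 10-second clip gives
# len(wav) = 10 * RATE = 240000 samples, so the MFCC matrix is
# (N_MFCC, 1 + 240000 // 512) = (13, 469). Each COL_SIZE = 30 segment then
# covers 30 * 512 / RATE = 0.64 seconds of audio.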

def remove_silence(wav, thresh=0.04, chunk=5000):
    '''
    Searches wav form for segments of silence. If wav form values are lower
    than 'thresh' for 'chunk' samples, the values will be removed
    :param wav (np array): Wav array to be filtered
    :return (np array): Wav array with silence removed
    '''
    # The chunk-scanning loop is truncated in the extract; the body below is
    # reconstructed from the docstring: keep a chunk only if any sample in it
    # exceeds the amplitude threshold.
    tf_list = []
    for x in range(len(wav) // chunk):
        if (np.any(wav[chunk * x:chunk * (x + 1)] >= thresh) or
                np.any(wav[chunk * x:chunk * (x + 1)] <= -thresh)):
            tf_list.extend([True] * chunk)
        else:
            tf_list.extend([False] * chunk)
    tf_list.extend((len(wav) - len(tf_list)) * [False])
    return (wav[tf_list])

def normalize_mfcc(mfcc):
    '''
    Normalize mfcc to the [0, 1] range
    :param mfcc (numpy array): MFCC array
    :return (numpy array): Normalized MFCC
    '''
    return (mms.fit_transform(np.abs(mfcc)))
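# MinMaxScaler rescales each column (here: each MFCC time frame)
# independently to [0, 1]; e.g. a column [2., 6., 10.] becomes
# [0., 0.5, 1.]. Note that fit_transform is refit on every call, so the
# scaling is relative to each individual MFCC matrix.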

def make_segments(mfccs, labels):
    '''
    Makes segments of mfccs and attaches them to the labels
    :param mfccs: list of mfccs
    :param labels: list of labels
    :return (tuple): segments and their labels
    '''
    segments = []
    seg_labels = []
    for mfcc, label in zip(mfccs, labels):
        for start in range(0, int(mfcc.shape[1] / COL_SIZE)):
            segments.append(mfcc[:, start * COL_SIZE:(start + 1) * COL_SIZE])
            seg_labels.append(label)
    return (segments, seg_labels)

def segment_one(mfcc):
    '''
    Creates segments from one MFCC image. If the last segment is shorter
    than COL_SIZE columns, it is dropped.
    :param mfcc (numpy array): MFCC array
    :return (numpy array): Segmented MFCC array
    '''
    segments = []
    for start in range(0, int(mfcc.shape[1] / COL_SIZE)):
        segments.append(mfcc[:, start * COL_SIZE:(start + 1) * COL_SIZE])
    return (np.array(segments))
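# Worked example: for an MFCC matrix of shape (13, 95) and COL_SIZE = 30,
# int(95 / 30) = 3, so segment_one returns an array of shape (3, 13, 30)
# and the trailing 5 frames are discarded.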

def create_segmented_mfccs(X_train):
    '''
    Creates segmented MFCCs from X_train
    :param X_train: list of MFCCs
    :return: segmented mfccs
    '''
    segmented_mfccs = []
    for mfcc in X_train:
        segmented_mfccs.append(segment_one(mfcc))
    return (segmented_mfccs)

def train_model(X_train, y_train, X_validation, y_validation, batch_size=128):  # 64
    '''
    Trains 2D convolutional neural network
    :param X_train: Numpy array of mfccs
    :param y_train: Binary matrix based on labels
    :return: Trained model
    '''
    # Get row, column, and class sizes
    rows = X_train[0].shape[0]
    cols = X_train[0].shape[1]
    val_rows = X_validation[0].shape[0]
    val_cols = X_validation[0].shape[1]
    num_classes = len(y_train[0])

    print('X_train shape rows:', rows)
    print('X_train shape cols:', cols)
    print('num_classes:', num_classes)

    # input image dimensions to feed into 2D ConvNet Input layer
    input_shape = (rows, cols, 1)
    X_train = X_train.reshape(X_train.shape[0], rows, cols, 1)
    X_validation = X_validation.reshape(X_validation.shape[0], val_rows, val_cols, 1)

    print('X_train shape:', X_train.shape)
    print(X_train.shape[0], 'training samples')

    # Initializing the CNN
    model = Sequential()

    # Add 1st convolutional layer, input_shape = (13, 30, 1): MFCCs come in as 13x30x1.
    # The input shape matches the shape of the data entering the network.
    # 32 output filters, kernel size 3x3, ReLU activation.
    # data_format="channels_last" means the input dimensions are ordered as
    # (batch, height, width, channels).
    model.add(Conv2D(32, kernel_size=(3, 3), activation='relu',
                     data_format="channels_last", input_shape=input_shape))

    # Max pooling with a 2x2 pool size downscales the spatial dimensions
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Add 2nd convolutional layer:
    # 64 output filters, kernel size 3x3, ReLU activation
    model.add(Conv2D(64, kernel_size=(3, 3), activation='relu'))
    #model.add(Conv2D(64, kernel_size=(3, 3), activation='sigmoid'))

    # Max pooling with a 2x2 pool size downscales the spatial dimensions
    model.add(MaxPooling2D(pool_size=(2, 2)))

    # Dropout with a rate of 0.25 to avoid overfitting
    model.add(Dropout(0.25))

    # Flatten the feature maps into a single 1-dimensional array
    model.add(Flatten())

    # Fully connected part:
    # a regular densely-connected layer with 128 units and ReLU activation
    model.add(Dense(128, activation='relu'))
    #model.add(Dense(128, activation='sigmoid'))

    # Dropout with a rate of 0.5 to avoid overfitting (the layer itself is
    # truncated in the extract; reconstructed from this comment)
    model.add(Dropout(0.5))

    # Output layer: one unit per class, softmax activation
    model.add(Dense(num_classes, activation='softmax'))
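    # Shape walk-through for the 13x30x1 input (3x3 convolutions without
    # padding lose 2 in each spatial dimension; 2x2 max pooling halves and
    # floors them):
    #   (13, 30, 1) -Conv2D(32)-> (11, 28, 32) -MaxPool-> (5, 14, 32)
    #   -Conv2D(64)-> (3, 12, 64) -MaxPool-> (1, 6, 64) -Flatten-> 384
    #   -Dense-> 128 -Dense-> num_classes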

    # Compiling the CNN:
    # the optimizer drives backpropagation, i.e. readjusting the weights;
    # the loss defines how the error is computed
    model.compile(loss='categorical_crossentropy', optimizer='adadelta',
                  metrics=['accuracy'])

    # Stops training if accuracy does not improve by at least 0.005 over 10 epochs
    es = EarlyStopping(monitor='acc', min_delta=.005, patience=10,
                       verbose=1, mode='auto')

    # Creates log files for graphical inspection with TensorBoard
    tb = TensorBoard(log_dir='../logs', histogram_freq=0, batch_size=32,
                     write_graph=True, write_grads=True, write_images=True,
                     embeddings_freq=0, embeddings_layer_names=None,
                     embeddings_metadata=None)

    # Image shifting
    datagen = ImageDataGenerator(width_shift_range=0.05)

    # Fit model using ImageDataGenerator: training the CNN.
    # The trailing arguments of this call are truncated in the extract; the
    # steps, epochs, callbacks, and validation data below are reconstructed
    # assumptions.
    model.fit_generator(datagen.flow(X_train, y_train, batch_size=batch_size),
                        steps_per_epoch=len(X_train) // batch_size,
                        epochs=EPOCHS,
                        callbacks=[es, tb],
                        validation_data=(X_validation, y_validation))
    return (model)

def save_model(model, model_filename):
    '''
    Save trained model to disk
    :param model: Trained model to be saved
    :param model_filename: Filename
    :return: None
    '''
    model.save('../models/{}.h5'.format(model_filename))  # creates a HDF5 file 'my_model.h5'

if __name__ == '__main__':
    '''
    Console command example:
    python trainmodel.py data_info2L.csv model2l10_9010_relu
    '''
    # Argument handling and CSV loading are truncated in the extract;
    # reconstructed here from the console example above.
    start = time.time()
    file_name = sys.argv[1]
    model_filename = sys.argv[2]
    df = pd.read_csv(file_name)

    # Filter metadata to retrieve only files desired
    filtered_df = getsplit.filter_df(df)

    # Train test split
    X_train, X_test, y_train, y_test = getsplit.split_people(filtered_df)

    # Get statistics
    train_count = Counter(y_train)
    test_count = Counter(y_test)
    print("Entering main")

    acc_to_beat = test_count.most_common(1)[0][1] / float(np.sum(list(test_count.values())))
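    # acc_to_beat is the majority-class baseline. For example, with
    # test_count = Counter({'english': 60, 'chinese': 40}) the most common
    # class covers 60 of 100 samples, so acc_to_beat = 60 / 100 = 0.60; the
    # trained model is only useful if it beats that.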

    # To categorical
    y_train = to_categorical(y_train)
    y_test = to_categorical(y_test)

    # Get resampled wav files using multiprocessing
    if DEBUG:
        print('Loading wav files....')
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    start_loading_wavs = time.time()
    X_train = pool.map(get_wav, X_train)
    X_test = pool.map(get_wav, X_test)
    end_loading_wavs = time.time()
    print("\nTotal time needed for Loading wav files: ",
          datetime.timedelta(seconds=(end_loading_wavs - start_loading_wavs)))

    # Convert to MFCC
    if DEBUG:
        print('Converting to MFCC....')
    start_converting_mfcc = time.time()
    X_train = pool.map(to_mfcc, X_train)
    X_test = pool.map(to_mfcc, X_test)

    # Create segments from MFCCs
    X_train, y_train = make_segments(X_train, y_train)
    X_validation, y_validation = make_segments(X_test, y_test)

    # Randomize training segments
    X_train, _, y_train, _ = train_test_split(X_train, y_train, test_size=0)

    end_converting_mfcc = time.time()
    print("\nTotal time needed for Converting to MFCC: ",
          datetime.timedelta(seconds=(end_converting_mfcc - start_converting_mfcc)))

    start_training_model = time.time()

    # Train model
    model = train_model(np.array(X_train), np.array(y_train),
                        np.array(X_validation), np.array(y_validation))

    # Make predictions on full X_test MFCCs
    y_predicted = accuracy.predict_class_all(create_segmented_mfccs(X_test), model)

    end_training_model = time.time()
    print("\nTotal time needed for Training Model: ",
          datetime.timedelta(seconds=(end_training_model - start_training_model)))

    # Print statistics
    print('Training samples:', train_count)
    print('Testing samples:', test_count)
    print('Accuracy to beat:', acc_to_beat)
    print('Confusion matrix of total samples:\n',
          np.sum(accuracy.confusion_matrix(y_predicted, y_test), axis=1))
    print('Confusion matrix:\n', accuracy.confusion_matrix(y_predicted, y_test))
    print('Accuracy:', accuracy.get_accuracy(y_predicted, y_test))

    # Save model
    save_model(model, model_filename)
    end = time.time()
    print("\nTotal time needed: ", datetime.timedelta(seconds=(end - start)))
    winsound.PlaySound("Success", winsound.SND_FILENAME)

#---

# getsplit.py

#---

# original code taken from:

# Garg, Yatharth (2018). Speech-Accent-Recognition [online].

# [cited 14 Nov. 2018].

# Available from World Wide Web:

# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.

import sys
import pandas as pd
from sklearn.model_selection import train_test_split

def filter_df(df):
    '''
    Function to filter audio files based on df columns
    df column options:
    [age, age_of_english_onset, age_sex, birth_place, english_learning_method,
    english_residence, length_of_english_residence, native_language,
    other_languages, sex]
    :param df (DataFrame): Full unfiltered DataFrame
    :return (DataFrame): Filtered DataFrame
    '''
    chinese = df[df.native_language == 'chinese']
    spanish = df[df.native_language == 'spanish']
    english = df[df.native_language == 'english']
    arabic = df[df.native_language == 'arabic']

    #chinese = chinese[chinese.length_of_english_residence < 10]
    #spanish = spanish[spanish.length_of_english_residence < 10]
    #arabic = arabic[arabic.length_of_english_residence < 10]

    df = df.append(chinese)
    # The rest of filter_df is truncated in the extract; presumably the other
    # language groups are appended the same way before the DataFrame is
    # returned (reconstructed assumption):
    df = df.append(spanish)
    df = df.append(english)
    df = df.append(arabic)
    return df

def split_people(df, test_size=0.1):
    '''
    Create train/test split of the DataFrame
    :param df (DataFrame): Pandas DataFrame of audio files to be split
    :param test_size (float): Percentage of total files to be split into test
    :return X_train, X_test, y_train, y_test (tuple): Xs are lists of
        df['language_num'] and ys are df['native_language']
    test_size = 10%, train_size = 90%
    '''
    return train_test_split(df['language_num'], df['native_language'],
                            test_size=test_size, random_state=1234)
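# Usage sketch (hypothetical data): with 100 rows and test_size=0.1 this
# yields a 90/10 split, and the fixed random_state=1234 makes the split
# reproducible, so every training run sees the same files in train and test.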

if __name__ == '__main__':
    '''
    Console command example:
    python getsplit.py bio_data.csv
    '''
    csv_file = sys.argv[1]
    df = pd.read_csv(csv_file)
    filtered_df = filter_df(df)
    print(split_people(filtered_df))

#---

# accuracy.py

#---

# original code taken from:

# Garg, Yatharth (2018). Speech-Accent-Recognition [online].

# [cited 14 Nov. 2018].

# Available from World Wide Web:

# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.

import numpy as np
from collections import Counter

def predict_class_audio(MFCCs, model):
    '''
    Predict class based on MFCC samples
    :param MFCCs: Numpy array of MFCCs
    :param model: Trained model
    :return: Predicted class of MFCC segment group
    '''
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    y_predicted = model.predict_classes(MFCCs, verbose=0)
    return (Counter(list(y_predicted)).most_common(1)[0][0])
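# Majority vote illustration: if the per-segment predictions for one audio
# file are [0, 1, 1, 1, 0], Counter(...).most_common(1) returns [(1, 3)],
# so the file as a whole is assigned class 1.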

def predict_prob_class_audio(MFCCs, model):
    '''
    Predict class based on MFCC samples' probabilities
    :param MFCCs: Numpy array of MFCCs
    :param model: Trained model
    :return: Predicted class of MFCC segment group
    '''
    MFCCs = MFCCs.reshape(MFCCs.shape[0], MFCCs.shape[1], MFCCs.shape[2], 1)
    y_predicted = model.predict_proba(MFCCs, verbose=0)
    return (np.argmax(np.sum(y_predicted, axis=0)))

def predict_class_all(X_train, model):
    '''
    Predict class for every segmented MFCC in X_train (the docstring and
    loop header are truncated in the extract and reconstructed here)
    :param X_train: list of segmented MFCCs
    :param model: Trained model
    :return: list of predictions
    '''
    predictions = []
    for mfcc in X_train:
        predictions.append(predict_class_audio(mfcc, model))
        #predictions.append(predict_prob_class_audio(mfcc, model))
    return predictions

def confusion_matrix(y_predicted, y_test):
    '''
    Create confusion matrix
    :param y_predicted: list of predictions
    :param y_test: numpy array of shape (len(y_test), number of classes);
        1 at the index of the actual class, 0 elsewhere
    :return: numpy array, confusion matrix
    '''
    confusion_matrix = np.zeros((len(y_test[0]), len(y_test[0])), dtype=int)
    for index, predicted in enumerate(y_predicted):
        confusion_matrix[np.argmax(y_test[index])][predicted] += 1
    return (confusion_matrix)
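# Worked example with two classes: predictions [0, 1, 1] against one-hot
# targets [[1, 0], [0, 1], [1, 0]] give the matrix [[1, 1], [0, 1]]
# (rows = actual class, columns = predicted class); get_accuracy below then
# returns trace / total = (1 + 1) / 3 = 0.67.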

def get_accuracy(y_predicted, y_test):
    '''
    Get accuracy
    :param y_predicted: numpy array of predictions
    :param y_test: numpy array of actual classes
    :return: accuracy
    '''
    c_matrix = confusion_matrix(y_predicted, y_test)
    return (np.sum(c_matrix.diagonal()) / float(np.sum(c_matrix)))

if __name__ == '__main__':
    pass

#---

# predict.py

#---

# original code taken from:

# Garg, Yatharth (2018). Speech-Accent-Recognition [online].

# [cited 14 Nov. 2018].

# Available from World Wide Web:

# <URL: https://github.com/yatharth1908/Speech-Accent-Recognition>.

# modified by Stavros Grigoriadis

#---

import numpy as np
import pandas as pd
import librosa
import getsplit
import accuracy
from keras.models import load_model

# Constants assumed to match trainmodel.py (their definitions are truncated
# in the extract)
RATE = 24000
N_MFCC = 13
COL_SIZE = 30

def get_pred_wav(language_num):
    '''
    Load wav file from disk and down-samples to RATE
    :param language_num (list): list of file names
    :return (numpy array): Down-sampled wav file
    '''
    y, sr = librosa.load('../Prediction_File/{}.wav'.format(language_num))
    return (librosa.core.resample(y=y, orig_sr=sr, target_sr=RATE, scale=True))

def create_segmented_mfccs(X_train):
    '''
    Creates segmented MFCCs from X_train
    :param X_train: list of MFCCs
    :return: segmented mfccs
    '''
    segmented_mfccs = []
    for mfcc in X_train:
        segmented_mfccs.append(segment_one(mfcc))
    return (segmented_mfccs)

def to_mfcc(wav):
    '''
    Converts wav file to Mel Frequency Cepstral Coefficients
    :param wav (numpy array): Wav form
    :return (2d numpy array): MFCC
    '''
    return (librosa.feature.mfcc(y=wav, sr=RATE, n_mfcc=N_MFCC))

def segment_one(mfcc):
    '''
    Creates segments from one MFCC image. If the last segment is shorter
    than COL_SIZE columns, it is dropped.
    :param mfcc (numpy array): MFCC array
    :return (numpy array): Segmented MFCC array
    '''
    segments = []
    for start in range(0, int(mfcc.shape[1] / COL_SIZE)):
        segments.append(mfcc[:, start * COL_SIZE:(start + 1) * COL_SIZE])
    return (np.array(segments))


# Load Model
# The model load and CSV read are truncated in the extract; the file names
# below are hypothetical, taken from the trainmodel.py console example.
new_model = load_model('../models/model2l10_9010_relu.h5')
df = pd.read_csv('data_info2L.csv')

# Filter metadata to retrieve only files desired
filtered_df = getsplit.filter_df(df)

# Train test split
X_predict, X_test, y_train, y_test = getsplit.split_people(filtered_df)

X_predict = map(get_pred_wav, X_predict)
X_predict = map(to_mfcc, X_predict)
y_predicted = accuracy.predict_class_all(create_segmented_mfccs(X_predict), new_model)

if y_predicted == [0]:
    print("Chinese Accent Found")
if y_predicted == [1]:
    print("Spanish Accent Found")
if y_predicted == [2]:
    print("English Accent Found")
if y_predicted == [3]:
    print("Arabic Accent Found")