Source code for src.classification.applePy.classifier

import os
import math
import time

import numpy as np
import seaborn as sn
import matplotlib.pyplot as plt

from copy import deepcopy

from scipy.stats import randint

from mne import Epochs, concatenate_epochs, read_epochs
from mne.io import read_raw_eeglab, read_epochs_eeglab
from mne.channels import make_standard_montage
from mne.event import find_events
from mne.preprocessing import ICA as MneICA  # aliased: the `ICA` boolean parameter below shadows the class name

from sklearn import metrics
from sklearn.model_selection import GridSearchCV, RandomizedSearchCV
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import KFold, LeaveOneOut, GroupKFold

from pyriemann.estimation import Covariances

from classification.applePy.channel_selection import ElectrodeSelection
from classification.applePy.pipeline_catalogue import Pipeline_catalogue
from classification.applePy.sources_estimator import Sources_estimator
from classification.applePy.cnn import CNN

CONST_MAX_1by1_ELECTRODES = 30
CONST_MAX_5by5_ELECTRODES = 50
CONST_DEFAULT_RANDOMIZEDSEARCH_NBITER = 10


class ApplePyClassifier(BaseEstimator, TransformerMixin):
    """ Global class dealing with automated classification.
    Inherits BaseEstimator and TransformerMixin, in order to make it adaptable to other estimators
    from other libraries.

    This class contains:

    - A pipeline catalogue with all the pipelines and the important information about the parameters to fit
    - All the predictions and the prediction probabilities for all pipelines
    - All the correct answers
    - All the scores, confusion matrices, precisions, recalls, and ROC infos for all pipelines
    - The dataset and the labels, as well as the source dataset
    - The group indices
    - The eventual test dataset, labels and groups
    """

    def __init__(self, used_pipelines=None):
        self.classifier_log = []
        self.pipeline_catalogue = Pipeline_catalogue(used_pipelines)
        self.catalogue = self.pipeline_catalogue.catalogue
        self.parameters = self.pipeline_catalogue.parameters_to_fit
        self.predictions = {key: [] for key in self.catalogue.keys()}
        self.predictions_proba = {key: [] for key in self.catalogue.keys()}
        self.expected_answers = []
        self.scores = {key: [] for key in self.catalogue.keys()}
        self.scores_fold = {key: [] for key in self.catalogue.keys()}
        self.confusion_matrices = {key: [] for key in self.catalogue.keys()}
        self.precisions = {key: [] for key in self.catalogue.keys()}
        self.recalls = {key: [] for key in self.catalogue.keys()}
        self.roc_infos = {key: [] for key in self.catalogue.keys()}
        self.dataset = None
        self.labels = None
        self.dataset_sources = None
        self.labels_sources = None
        self.electrodes = None
        self.groups = None
        self.nb_subj = 1
        self.nb_paradigms = None
        self.vertices = None
        self.event_names = None
        self.test_dataset = []
        self.test_labels = []
        self.test_groups = []

    """ Pipelines """
    def modify_add_pipeline(self, name, pipeline, parameters):
        """ Modifies or adds a new pipeline to the catalogue.

        Parameters
        ----------
        name : string
            name of the pipeline
        pipeline : instance of pipeline
            the pipeline to be used
        parameters : see doc for pipeline catalogue
            the notation for the parameters to fit
        """
        new_pipeline = True
        if name in self.catalogue.keys():
            new_pipeline = False
        self.pipeline_catalogue.modify_add_pipeline(name, pipeline, parameters)
        if new_pipeline:
            self.predictions[name] = []
            self.predictions_proba[name] = []
    def delete_pipelines(self, pipeline_names):
        """ Deletes pipelines by calling the method from the pipeline catalogue.
        Additionally, deletes all occurrences of each pipeline in the stored results.

        Parameters
        ----------
        pipeline_names : list of strings
            names of the pipelines to delete
        """
        for name in pipeline_names:
            names = list(self.catalogue.keys())
            names.sort()
            if name not in names:
                text = "Pipeline " + name + " does not exist."
                raise Exception(text)
            self.pipeline_catalogue.delete_pipeline(name)
            del self.predictions[name]
            del self.predictions_proba[name]
            del self.scores[name]
            del self.scores_fold[name]
            del self.confusion_matrices[name]
            del self.precisions[name]
            del self.recalls[name]
            del self.roc_infos[name]
    def modify_parameters(self, name, parameters):
        """ Modifies a pipeline's parameters by calling the method from the pipeline catalogue.

        Parameters
        ----------
        name : string
            name of the pipeline
        parameters : see doc for pipeline catalogue
            the notation for the parameters to fit
        """
        self.pipeline_catalogue.modify_parameters(name, parameters)
    def restore(self):
        """ Restores the classifier by resetting the predictions, prediction probabilities,
        expected answers, scores, confusion matrices, precisions, recalls and ROC infos. """
        self.predictions = {key: [] for key in self.catalogue.keys()}
        self.predictions_proba = {key: [] for key in self.catalogue.keys()}
        self.expected_answers = []
        self.scores = {key: [] for key in self.catalogue.keys()}
        self.confusion_matrices = {key: [] for key in self.catalogue.keys()}
        self.precisions = {key: [] for key in self.catalogue.keys()}
        self.recalls = {key: [] for key in self.catalogue.keys()}
        self.roc_infos = {key: [] for key in self.catalogue.keys()}
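    # --- Usage sketch (not part of the class) --------------------------------
    # A minimal, hypothetical example of managing the pipeline catalogue with an
    # sklearn pipeline. The name "LDA_custom" is made up, and the [0] "nothing to
    # tune" parameter notation is only a guess based on how tune_hyperparameters
    # reads parameters_to_fit; see the Pipeline_catalogue documentation for the
    # exact notation.
    #
    #   from sklearn.pipeline import make_pipeline
    #   from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
    #   from pyriemann.estimation import Covariances
    #   from pyriemann.tangentspace import TangentSpace
    #
    #   clf = ApplePyClassifier()
    #   custom = make_pipeline(Covariances("oas"), TangentSpace(), LinearDiscriminantAnalysis())
    #   clf.modify_add_pipeline("LDA_custom", custom, [0])   # parameter notation assumed
    #   clf.delete_pipelines(["CSP"])                        # "CSP" exists in the default catalogue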
""" File reading """
    def read_one_file(self, file_path, file_name, destination, bads=None, picks=None, filtering=(1, 45),
                      tmin=0, tmax=0.5, ICA=False, resample=False, baseline=None, event_ids=None,
                      reference=None):
        """ Reads one non-pre-epoched (raw) .set file and saves the result in a -epo.fif file.

        Parameters
        ----------
        file_path : path to the file to be opened
        file_name : name of the file
        destination : path where the -epo.fif file will be saved
        bads : list of electrodes to be rejected
        picks : list of electrodes to be worked on
        filtering : tuple containing the lower and higher frequencies to filter the data
        tmin, tmax : tmin, tmax for delimiting the epochs in time
        ICA : boolean, whether or not to apply Independent Component Analysis
        resample : boolean, whether or not to resample the data at 512 Hz
        baseline : the baseline to be applied to the data
        event_ids : the id of the event to consider
        reference : the name of the reference to be applied to the data
        """
        print("Reading ", file_name)
        eeg = read_raw_eeglab(file_path, preload=True)
        if reference is not None:
            eeg.set_eeg_reference(reference, projection=False)
        if resample:
            eeg.resample(512, npad='auto')
        # Work on a mutable copy so a tuple passed as `filtering` can be completed in place.
        filtering = list(filtering)
        if filtering[0] is None:
            filtering[0] = eeg.info['highpass']
        if filtering[1] is None:
            filtering[1] = eeg.info['lowpass']
        eeg.filter(filtering[0], filtering[1])
        if filtering[0] < 50 and filtering[1] > 50:
            eeg.notch_filter(np.asarray([47.5, 50, 52.5]))
        try:
            events = find_events(eeg)
        except ValueError:
            print("Invalid events in ", file_name)
            raise Exception("Invalid events in " + file_name)
        eeg.drop_channels(eeg.info['bads'])
        if bads is not None:
            eeg.drop_channels(bads)
        if picks is not None:
            eeg.pick_channels(picks)
        if ICA:
            # The boolean parameter `ICA` shadows the MNE class, hence the MneICA alias from the imports.
            ica = MneICA(method="extended-infomax", random_state=1)
            ica.fit(eeg)
            ica.plot_components(inst=eeg)
            ica.apply(eeg)
        epochs = Epochs(eeg, events, event_id=event_ids, tmin=tmin, tmax=tmax, baseline=baseline)
        file_name = file_name[:-4] + '-epo.fif'
        fname = os.path.join(destination, file_name)
        epochs.save(fname)
[docs] def prepare_nonepoched_dataset(self, directory, nb_subj, tmin, tmax, bads=None, picks=None, filtering=[None, None], ICA=False, resample=False, baseline=None, event_ids=[None, None], reference=None): """ Prepare a subjects x epochs x channels x times dataset from raw files \n Parameters \n ---------- \n directory : the path to the directory containing the dataset \n nb_subj : the number of subjects to be considered \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n event_ids : The id of the event to consider \n reference : the name of the reference to be applied to the data \n """ # MNE loading parameters # montage = make_standard_montage("standard_1005") # Load raw EEG data # ================= data_root_folder = directory paradigms = os.listdir(directory) # epoched_raw_eeg_dataset = [[] for _ in range(nb_subj)] # epoched_raw_eeg_sources = [[] for _ in range(nb_subj)] # labels = [[] for _ in range(nb_subj)] # labels_sources = [[] for _ in range(nb_subj)] file_names = [] # epochs = [] # groups = [] directory_new = os.path.join(directory, "epoched") if not os.path.exists(directory_new): os.makedirs(directory_new) for label, paradigm in enumerate(paradigms): data_class_folder = os.path.join(data_root_folder, paradigm) epoched_data_class_folder = os.path.join(directory_new, paradigm) list_of_files = [] list_of_dirs = [] event_id = event_ids[label] for raw_eeg_file in os.listdir(data_class_folder): if ".set" in raw_eeg_file: list_of_files.append(raw_eeg_file) if "directory" in raw_eeg_file: list_of_dirs.append(raw_eeg_file) list_of_files.sort() list_of_dirs.sort() subject_i = 0 if not os.path.exists(epoched_data_class_folder): os.makedirs(epoched_data_class_folder) for subject_i, raw_eeg_file in enumerate(list_of_files): raw_file_path = os.path.join(data_class_folder, raw_eeg_file) self.read_one_file(raw_file_path, raw_eeg_file, epoched_data_class_folder, bads, picks, filtering, tmin, tmax, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_id, reference=reference) for subject_j, raw_eeg_dir in enumerate(list_of_dirs): raw_path_subject = os.path.join(data_class_folder, raw_eeg_dir) epoched_raw_path_subject = os.path.join(epoched_data_class_folder, raw_eeg_dir) if not os.path.exists(epoched_raw_path_subject): os.makedirs(epoched_raw_path_subject) subject_files = [] for raw_eeg_file in os.listdir(raw_path_subject): if ".set" in raw_eeg_file: subject_files.append(raw_eeg_file) # nb_files = len(subject_files) if subject_i + subject_j < nb_subj: file_names.append(raw_eeg_dir) for raw_eeg_file in subject_files: raw_file_path = os.path.join(raw_path_subject, raw_eeg_file) self.read_one_file(raw_file_path, raw_eeg_file, epoched_raw_path_subject, bads, picks, filtering, tmin, tmax, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_id, reference=reference)
[docs] def read_all_files(self, directory, nb_subj=None, divided_dataset=True, tmin=0, tmax=0.5, bads=None, picks=None, filtering=[None, None], pre_epoched=True, ICA=False, resample=False, baseline=None, event_ids=[None, None], reference=None): """ Create a subjects x epochs x channels x times dataset from epoched files. \n Parameters \n ---------- \n directory : the path to the directory containing the dataset \n nb_subj : the number of subjects to be considered \n divided_dataset : boolean; whether or not the dataset is divided by classes \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n pre_epoched : boolean; whether or not the dataset has been pre-epoched \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n event_ids : The id of the event to consider \n reference : the name of the reference to be applied to the data \n """ if nb_subj is None: nb_subj = self.count_subjects(directory) if pre_epoched and not divided_dataset: self.read_preepoched_oneFilePerSubj(directory, nb_subj, tmin, tmax, bads=bads, picks=picks, filtering=filtering, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_ids, reference=None) return self.classifier_log.extend( [("directory", directory), ("nb of subjects", nb_subj), ("tmin", tmin), ("tmax", tmax), ("bads", bads), ("picks", picks), ("filtering", filtering), ('pre_epoched', pre_epoched), ('ICA', ICA), ('resample', resample), ('baseline', baseline), ('event_ids', event_ids), ('reference', reference)]) # MNE loading parameters montage = make_standard_montage("standard_1005") if not pre_epoched: if divided_dataset: self.prepare_nonepoched_dataset(directory, nb_subj, tmin, tmax, bads=bads, picks=picks, filtering=filtering, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_ids, reference=reference) # print("would have prepared") else: self.prepare_nonepoched_dataset_oneFilePerSubj(directory, nb_subj, tmin, tmax, bads=bads, picks=picks, filtering=filtering, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_ids, reference=reference) directory = os.path.join(directory, "epoched") # Load raw EEG data # ================= data_root_folder = directory paradigms = os.listdir(directory) epoched_raw_eeg_dataset = [[] for _ in range(nb_subj)] epoched_raw_eeg_sources = [[] for _ in range(nb_subj)] labels = [[] for _ in range(nb_subj)] labels_sources = [[] for _ in range(nb_subj)] file_names = [] # epochs = [] groups = [] enumerated_paradigms = enumerate(paradigms) self.nb_paradigms = len(paradigms) self.classifier_log.append(('nb of paradigms', self.nb_paradigms)) for label, paradigm in enumerated_paradigms: event_id = event_ids[label] data_class_folder = os.path.join(data_root_folder, paradigm) list_of_files = [] list_of_dirs = [] for raw_eeg_file in os.listdir(data_class_folder): if (".set" in raw_eeg_file) or ("-epo.fif" in raw_eeg_file): list_of_files.append(raw_eeg_file) if "directory" in raw_eeg_file: list_of_dirs.append(raw_eeg_file) list_of_files.sort() list_of_dirs.sort() subject_i = -1 for subject_i, raw_eeg_file in enumerate(list_of_files): if subject_i < nb_subj: file_names.append(raw_eeg_file) raw_file_path = os.path.join(data_class_folder, 
raw_eeg_file) if pre_epoched: epoched_eeg = read_epochs_eeglab(raw_file_path, montage=montage, event_id=event_id) if reference is not None: epoched_eeg.set_eeg_reference(reference, projection=False) if filtering[0] is None: filtering[0] = epoched_eeg.info['highpass'] if filtering[1] is None: filtering[1] = epoched_eeg.info['lowpass'] epoched_eeg.filter(filtering[0], filtering[1]) if picks is not None: epoched_eeg.pick_channels(picks) epoched_eeg.crop(tmin=tmin, tmax=tmax) if resample: epoched_eeg.resample(512) else: epoched_eeg = read_epochs( raw_file_path) # not read_epochs_eeglab because it doesn't work for .fif, and because it's already preprocessed epoched_sources = deepcopy(epoched_eeg) epoched_data = epoched_eeg.get_data() epoched_labels = np.zeros(epoched_data.shape[0], dtype=np.int) + label epoched_labels_sources = label epoched_raw_eeg_dataset[subject_i].append(epoched_data) labels[subject_i].append(epoched_labels) epoched_raw_eeg_sources[subject_i].append(epoched_sources) labels_sources.append(epoched_labels_sources) for subject_j, raw_eeg_dir in enumerate(list_of_dirs): raw_path_subject = os.path.join(data_class_folder, raw_eeg_dir) subject_files = [] for raw_eeg_file in os.listdir(raw_path_subject): if (".set" in raw_eeg_file) or ("-epo.fif" in raw_eeg_file): subject_files.append(raw_eeg_file) nb_files = len(subject_files) if subject_i + subject_j + 1 < nb_subj: file_names.append(raw_eeg_dir) epoched_raw_eeg_dataset[subject_i + subject_j + 1].append([]) labels[subject_i + subject_j + 1].append([]) epoched_raw_eeg_sources[subject_i + subject_j + 1].append([]) for raw_eeg_file in subject_files: raw_file_path = os.path.join(raw_path_subject, raw_eeg_file) if pre_epoched: epoched_eeg = read_epochs_eeglab(raw_file_path, montage=montage, event_id=event_id) if reference is not None: epoched_eeg.set_eeg_reference(reference, projection=False) if filtering[0] is None: filtering[0] = epoched_eeg.info['highpass'] if filtering[1] is None: filtering[1] = epoched_eeg.info['lowpass'] epoched_eeg.filter(filtering[0], filtering[1]) if picks is not None: epoched_eeg.pick_channels(picks) epoched_eeg.crop(tmin=tmin, tmax=tmax) else: epoched_eeg = read_epochs(raw_file_path) epoched_sources = deepcopy(epoched_eeg) epoched_data = epoched_eeg.get_data() epoched_labels = np.zeros(epoched_data.shape[0], dtype=np.int) + label epoched_labels_sources = label epoched_raw_eeg_dataset[subject_i + subject_j + 1][label].append(epoched_data) labels[subject_i + subject_j + 1][label].append(epoched_labels) epoched_raw_eeg_sources[subject_i + subject_j + 1][label].append(epoched_sources) labels_sources.append(epoched_labels_sources) if nb_files > 1: epoched_raw_eeg_dataset[subject_i + subject_j + 1][label] = np.asarray( np.concatenate(epoched_raw_eeg_dataset[subject_i + subject_j + 1][label])) labels[subject_i + subject_j + 1][label] = np.asarray( np.concatenate(labels[subject_i + subject_j + 1][label])) epoched_raw_eeg_sources[subject_i + subject_j + 1][label] = concatenate_epochs( epoched_raw_eeg_sources[subject_i + subject_j + 1][label]) else: epoched_raw_eeg_dataset[subject_i + subject_j + 1][label] = np.asarray( epoched_raw_eeg_dataset[subject_i + subject_j + 1][label][0]) labels[subject_i + subject_j + 1][label] = np.asarray( labels[subject_i + subject_j + 1][label][0]) epoched_raw_eeg_sources[subject_i + subject_j + 1][label] = \ epoched_raw_eeg_sources[subject_i + subject_j + 1][label][0] for subject_i in range(len(epoched_raw_eeg_dataset)): if self.nb_paradigms == 2: first = 
epoched_raw_eeg_dataset[subject_i][0].shape[0] second = epoched_raw_eeg_dataset[subject_i][1].shape[0] minimum = min([first, second]) epoched_raw_eeg_dataset[subject_i][0] = epoched_raw_eeg_dataset[subject_i][0][:minimum] epoched_raw_eeg_dataset[subject_i][1] = epoched_raw_eeg_dataset[subject_i][1][:minimum] labels[subject_i][0] = labels[subject_i][0][:minimum] labels[subject_i][1] = labels[subject_i][1][:minimum] epoched_raw_eeg_dataset[subject_i] = np.concatenate(np.asarray(epoched_raw_eeg_dataset[subject_i]), axis=0) for k in range(epoched_raw_eeg_dataset[subject_i].shape[0]): groups.append(subject_i) labels[subject_i] = np.concatenate(np.asarray(labels[subject_i]), axis=0) epoched_raw_eeg_sources = np.asarray(epoched_raw_eeg_sources) labels_sources = np.asarray(labels_sources) if picks is not None: electrodes = picks else: electrodes = epoched_raw_eeg_sources[0][0].info['ch_names'] epoched_raw_eeg_dataset = np.asarray(epoched_raw_eeg_dataset) # labels_sources = np.asarray(labels_sources) labels = np.asarray(labels) self.dataset = epoched_raw_eeg_dataset self.dataset_sources = epoched_raw_eeg_sources self.labels = labels self.electrodes = electrodes self.groups = np.asarray(groups) self.vertices = [] labs = np.concatenate(self.labels) unique, counts = np.unique(labs, return_counts=True) occurences = dict(zip(unique, counts)) self.classifier_log.append(("occurences from each class : ", occurences))
[docs] def read_preepoched_oneFilePerSubj(self, directory, nb_subj, tmin, tmax, bads=None, picks=None, filtering=[None, None], ICA=False, resample=False, baseline=None, event_ids=[None, None], reference=None): """ Create a subjects x epochs x channels x times dataset from epoched files. \n Parameters \n ---------- \n directory : the path to the directory containing the dataset \n nb_subj : the number of subjects to be considered \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n event_ids : The id of the event to consider \n reference : the name of the reference to be applied to the data \n """ self.classifier_log.extend( [("directory", directory), ("nb_subj", nb_subj), ("tmin", tmin), ("tmax", tmax), ("bads", bads), ("picks", picks), ("filtering", filtering), ('pre_epoched', True), ('ICA', ICA), ('resample', resample), ('baseline', baseline), ('event_ids', event_ids), ('reference', reference)]) # MNE loading parameters montage = make_standard_montage("standard_1005") self.nb_paradigms = len(event_ids) # self.classifier_log.append(('nb of paradigms', nb_paradigms)) # Load raw EEG data # ================= data_root_folder = directory epoched_raw_eeg_dataset = [[] for _ in range(nb_subj)] epoched_raw_eeg_sources = [[] for _ in range(nb_subj)] labels = [[] for _ in range(nb_subj)] labels_sources = [[] for _ in range(nb_subj)] file_names = [] # epochs = [] groups = [] list_of_files = [] list_of_dirs = [] for raw_eeg_file in os.listdir(data_root_folder): if (".set" in raw_eeg_file) or ("-epo.fif" in raw_eeg_file): list_of_files.append(raw_eeg_file) if "directory" in raw_eeg_file: list_of_dirs.append(raw_eeg_file) list_of_files.sort() list_of_dirs.sort() # subject_i = -1 for subject_i, raw_eeg_file in enumerate(list_of_files): if subject_i < nb_subj: file_names.append(raw_eeg_file) raw_file_path = os.path.join(data_root_folder, raw_eeg_file) # data_class_folder for condition in range(len(event_ids)): label = condition epoched_eeg = read_epochs_eeglab(raw_file_path, event_id=event_ids[condition]) # montage=montage if reference is not None: epoched_eeg.set_eeg_reference(reference, projection=False) if filtering[0] is None: filtering[0] = epoched_eeg.info['highpass'] if filtering[1] is None: filtering[1] = epoched_eeg.info['lowpass'] epoched_eeg.filter(filtering[0], filtering[1]) if picks is not None: epoched_eeg.pick_channels(picks) epoched_eeg.crop(tmin=tmin, tmax=tmax) epoched_sources = deepcopy(epoched_eeg) epoched_data = epoched_eeg.get_data() epoched_labels = np.zeros(epoched_data.shape[0], dtype=np.int) + label epoched_labels_sources = label epoched_raw_eeg_dataset[subject_i].append(epoched_data) labels[subject_i].append(epoched_labels) epoched_raw_eeg_sources[subject_i].append(epoched_sources) labels_sources.append(epoched_labels_sources) for subject_j, raw_eeg_dir in enumerate(list_of_dirs): raw_path_subject = os.path.join(raw_file_path, raw_eeg_dir) subject_files = [] for raw_eeg_file_path in os.listdir(raw_path_subject): if (".set" in raw_eeg_file_path) or ("-epo.fif" in raw_eeg_file_path): subject_files.append(raw_eeg_file_path) nb_files = 
len(subject_files) if subject_i + subject_j + 1 < nb_subj: file_names.append(raw_eeg_dir) epoched_raw_eeg_dataset[subject_i + subject_j + 1].append([]) labels[subject_i + subject_j + 1].append([]) epoched_raw_eeg_sources[subject_i + subject_j + 1].append([]) for raw_eeg_subject_file in subject_files: raw_file_path = os.path.join(raw_path_subject, raw_eeg_subject_file) for condition in range(len(event_ids)): label = condition epoched_eeg = read_epochs_eeglab(raw_file_path, montage=montage, event_id=event_ids[condition]) if reference is not None: epoched_eeg.set_eeg_reference(reference, projection=False) if filtering[0] is None: filtering[0] = epoched_eeg.info['highpass'] if filtering[1] is None: filtering[1] = epoched_eeg.info['lowpass'] epoched_eeg.filter(filtering[0], filtering[1]) if picks is not None: epoched_eeg.pick_channels(picks) epoched_eeg.crop(tmin=tmin, tmax=tmax) epoched_sources = deepcopy(epoched_eeg) epoched_data = epoched_eeg.get_data() epoched_labels = np.zeros(epoched_data.shape[0], dtype=np.int) + label epoched_labels_sources = label epoched_raw_eeg_dataset[subject_i + subject_j + 1][label].append(epoched_data) labels[subject_i + subject_j + 1][label].append(epoched_labels) epoched_raw_eeg_sources[subject_i + subject_j + 1][label].append(epoched_sources) labels_sources.append(epoched_labels_sources) if nb_files > 1: epoched_raw_eeg_dataset[subject_i + subject_j + 1][label] = np.asarray( np.concatenate(epoched_raw_eeg_dataset[subject_i + subject_j + 1][label])) labels[subject_i + subject_j + 1][label] = np.asarray( np.concatenate(labels[subject_i + subject_j + 1][label])) epoched_raw_eeg_sources[subject_i + subject_j + 1][label] = concatenate_epochs( epoched_raw_eeg_sources[subject_i + subject_j + 1][label]) else: epoched_raw_eeg_dataset[subject_i + subject_j + 1][label] = np.asarray( epoched_raw_eeg_dataset[subject_i + subject_j + 1][label][0]) labels[subject_i + subject_j + 1][label] = np.asarray( labels[subject_i + subject_j + 1][label][0]) epoched_raw_eeg_sources[subject_i + subject_j + 1][label] = \ epoched_raw_eeg_sources[subject_i + subject_j + 1][label][0] for subject_i in range(len(epoched_raw_eeg_dataset)): if self.nb_paradigms == 2: first = epoched_raw_eeg_dataset[subject_i][0].shape[0] second = epoched_raw_eeg_dataset[subject_i][1].shape[0] minimum = min([first, second]) epoched_raw_eeg_dataset[subject_i][0] = epoched_raw_eeg_dataset[subject_i][0][:minimum] epoched_raw_eeg_dataset[subject_i][1] = epoched_raw_eeg_dataset[subject_i][1][:minimum] labels[subject_i][0] = labels[subject_i][0][:minimum] labels[subject_i][1] = labels[subject_i][1][:minimum] epoched_raw_eeg_dataset[subject_i] = np.concatenate(np.asarray(epoched_raw_eeg_dataset[subject_i]), axis=0) for k in range(epoched_raw_eeg_dataset[subject_i].shape[0]): groups.append(subject_i) labels[subject_i] = np.concatenate(np.asarray(labels[subject_i]), axis=0) # epoched_raw_eeg_sources = np.asarray(epoched_raw_eeg_sources) labels_sources = np.asarray(labels_sources) electrodes = epoched_raw_eeg_sources[0][0].info['ch_names'] epoched_raw_eeg_dataset = np.asarray(epoched_raw_eeg_dataset) # labels_sources = np.asarray(labels_sources) labels = np.asarray(labels) self.dataset = epoched_raw_eeg_dataset self.dataset_sources = epoched_raw_eeg_sources self.labels = labels self.electrodes = electrodes self.groups = np.asarray(groups) self.vertices = [] labs = np.concatenate(self.labels) unique, counts = np.unique(labs, return_counts=True) occurences = dict(zip(unique, counts)) self.classifier_log.append(("occurences 
from each class : ", occurences))
[docs] def prepare_nonepoched_dataset_oneFilePerSubj(self, directory, nb_subj, tmin, tmax, bads=None, picks=None, filtering=[None, None], ICA=False, resample=False, baseline=None, event_ids=[None, None]): """ Prepare a subjects x epochs x channels x times dataset from raw files \n Parameters \n ---------- \n directory : the path to the directory containing the dataset \n nb_subj : the number of subjects to be considered \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n event_ids : The id of the event to consider \n reference : the name of the reference to be applied to the data \n """ # MNE loading parameters # montage = make_standard_montage("standard_1005") # Load raw EEG data # ================= data_root_folder = directory # epoched_raw_eeg_dataset = [[] for _ in range(nb_subj)] # epoched_raw_eeg_sources = [[] for _ in range(nb_subj)] # labels = [[] for _ in range(nb_subj)] # labels_sources = [[] for _ in range(nb_subj)] file_names = [] # epochs = [] # groups = [] directory_new = os.path.join(directory, "epoched") if not os.path.exists(directory_new): os.makedirs(directory_new) for paradigm in range(len(event_ids)): epoched_data_class_folder = os.path.join(directory_new, str(paradigm)) list_of_files = [] list_of_dirs = [] event_id = event_ids[paradigm] for raw_eeg_file in os.listdir(data_root_folder): if ".set" in raw_eeg_file: list_of_files.append(raw_eeg_file) if "directory" in raw_eeg_file: list_of_dirs.append(raw_eeg_file) list_of_files.sort() list_of_dirs.sort() subject_i = 0 if not os.path.exists(epoched_data_class_folder): os.makedirs(epoched_data_class_folder) for subject_i, raw_eeg_file in enumerate(list_of_files): raw_file_path = os.path.join(data_root_folder, raw_eeg_file) self.read_one_file(raw_file_path, raw_eeg_file, epoched_data_class_folder, bads, picks, filtering, tmin, tmax, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_id) for subject_j, raw_eeg_dir in enumerate(list_of_dirs): raw_path_subject = os.path.join(data_root_folder, raw_eeg_dir) epoched_raw_path_subject = os.path.join(epoched_data_class_folder, raw_eeg_dir) if not os.path.exists(epoched_raw_path_subject): os.makedirs(epoched_raw_path_subject) subject_files = [] for raw_eeg_file in os.listdir(raw_path_subject): if ".set" in raw_eeg_file: subject_files.append(raw_eeg_file) # nb_files = len(subject_files) if subject_i + subject_j < nb_subj: file_names.append(raw_eeg_dir) for raw_eeg_file in subject_files: raw_file_path = os.path.join(raw_path_subject, raw_eeg_file) self.read_one_file(raw_file_path, raw_eeg_file, epoched_raw_path_subject, bads, picks, filtering, tmin, tmax, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_id)
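    # --- Usage sketch (not part of the class) --------------------------------
    # Hypothetical call to read_all_files for a dataset organised as
    # <directory>/<paradigm>/<subject>.set (one folder per class). Every path,
    # subject count and event id below is illustrative only.
    #
    #   clf = ApplePyClassifier()
    #   clf.read_all_files("/path/to/dataset", nb_subj=10, divided_dataset=True,
    #                      tmin=0, tmax=0.5, filtering=[1, 45], pre_epoched=True,
    #                      event_ids=[{"standard": 1}, {"deviant": 2}])
    #   # clf.dataset is then subjects x epochs x channels x times
    #   # (a rectangular array only when every subject has the same number of epochs)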
""" Computation """
    def estimate_sources(self, raw_dataset, info, tmin_noise, tmax_noise, trans=None, sourceSpaces=None,
                         bemSolution=None, mixedSourceSpaces=None, loose=1, snr=3, fixed=False):
        """ Estimates the sources for the dataset using a Sources_estimator object.

        Parameters
        ----------
        raw_dataset : the dataset whose sources will be estimated
        info : info dictionary for the EEG recordings (see Epochs.info)
        tmin_noise, tmax_noise : tmin and tmax delimiting the noise estimation
        trans : path to the eventual coregistration file
        sourceSpaces : path to the eventual source spaces file
        bemSolution : path to the eventual BEM solution file
        mixedSourceSpaces : path to the eventual mixed source spaces file
        loose : between 0 and 1; "value that weights the source variances of the dipole components
            that are parallel (tangential) to the cortical surface"
        snr : signal-to-noise ratio value
        fixed : boolean, whether or not to use fixed source orientations normal to the cortical mantle
        """
        sources_estimator = Sources_estimator(raw_dataset, info, tmin_noise, tmax_noise, trans=trans,
                                              sourceSpaces=sourceSpaces, bemSolution=bemSolution,
                                              mixedSourceSpaces=mixedSourceSpaces, loose=loose, snr=snr,
                                              fixed=fixed)
        labels, labeled_sources = sources_estimator.estimate_sources()
        self.dataset_sources = labeled_sources
        return labels
    def estimate_covariance_matrices(self, dataset):
        """ Creates the simple covariance matrices for the raw data.
        Result format: nb_subj x nb_epochs x nb_channels x nb_channels.

        Parameters
        ----------
        dataset : the dataset on which to estimate the covariance matrices
        """
        covariance_matrices = []
        if self.nb_subj == 1:
            covariance_matrices.append(Covariances("oas").transform(dataset))
        else:
            for subject in dataset:
                covariance_matrices.append(Covariances("oas").transform(subject))
        return np.asarray(covariance_matrices)
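    # --- Usage sketch (not part of the class) --------------------------------
    # Shape-wise illustration of the covariance estimation on random data: the
    # OAS-regularised covariance of each epoch is nb_channels x nb_channels.
    # The array sizes are arbitrary.
    #
    #   import numpy as np
    #   from pyriemann.estimation import Covariances
    #   epochs = np.random.randn(40, 32, 256)        # epochs x channels x times
    #   covs = Covariances("oas").transform(epochs)   # -> shape (40, 32, 32)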
[docs] def independent_features_selection(self, use_sources=False, channels_to_select=None, use_groups=True): """ Applies independent channel or zone selection on the data, without the pipelines. Only the selected channels or zones will be kept on the final data. \n Parameters \n ---------- \n use_sources : boolean; whether or not to use sources for feature selection \n cv : int; number of folds for cross validation \n channels_to_select : int; if any, the desired number of features \n use_groups : boolean; whether or not to use groups when dividing the data \n """ if not use_sources: dataset = self.dataset feats = self.electrodes else: dataset = self.dataset_sources feats = self.vertices dataset_cov = self.estimate_covariance_matrices(dataset.get_data()) labels = self.labels nb_subjects = dataset_cov.shape[0] nb_channels = dataset_cov[0].shape[1] if nb_subjects == 1: dataset_cov = dataset_cov[0] else: dataset_cov = np.concatenate(dataset_cov) labels = np.concatenate(labels) cv = self.create_folder(4) if nb_channels <= CONST_MAX_1by1_ELECTRODES: tries = [i for i in range(1, nb_channels + 1)] elif nb_channels <= CONST_MAX_5by5_ELECTRODES: tries = [i for i in range(1, nb_channels + 1, 5)] if nb_channels % 5 != 0: tries.append(nb_channels) else: tries = [i for i in range(1, nb_channels + 1, 10)] if nb_channels % 10 != 0: tries.append(nb_channels) if channels_to_select is None: es = GridSearchCV(ElectrodeSelection(), param_grid={"nelec": tries}, cv=cv, scoring=self.score_func_independent_feat_selection, n_jobs=-1) if use_groups: es.fit(dataset_cov, labels, self.groups) else: es.fit(dataset_cov, labels) # idx = es.best_index_ selected_estimator = es.best_estimator_ print("Number of selected features : ", selected_estimator.nelec) selected_feats_idx = selected_estimator.subelec_ selected_feats = [feats[i] for i in selected_feats_idx] additional_informations = [("independent features selection", True), ("pre_selected number of features", False), ("selected number of features", selected_estimator.nelec), ("features", selected_feats)] else: print("You have chosen to select the best ", channels_to_select, " channels.") es = ElectrodeSelection(nelec=channels_to_select) print(dataset_cov.shape) print(labels.shape) es.fit(dataset_cov, labels) selected_feats_idx = es.subelec_ selected_feats = [feats[i] for i in selected_feats_idx] additional_informations = [("independent ChanVertexSelection", True), ("pre_selected number of features", channels_to_select), ("features", selected_feats)] self.classifier_log.extend(additional_informations) print("Selected features are : ", selected_feats) # for person_idx in range(self.dataset.shape[0]): # self.dataset[person_idx] = self.dataset[person_idx][:, selected_feats_idx, :] return self.dataset
[docs] def tune_hyperparameters(self, cv, use_sources=False, use_groups=True, factor=None): """ Tune the hyperparameters for the different pipelines and replace the pipelines by their improved versions. \n Parameters \n ---------- \n cv : cross validation for tuning \n use_sources : boolean; whether or not to use the sources dataset \n use_groups : boolean; whether or not to use groups for the cross validation \n factor : int; the stair by which to augment the number of filters or electrodes \n """ self.classifier_log.extend([("hyperparameters tuning", True), ("cross validation for tuning", cv)]) max_filters = int(len(self.electrodes) / 2) if max_filters > 5: if factor is None: additional_filters = [i for i in range(10, max_filters + 1, 10)] else: additional_filters = [i for i in range(10, max_filters + 1, factor)] if max_filters % 5 != 0: additional_filters.append(max_filters) self.pipeline_catalogue.XDAWN_filters.extend(additional_filters) self.pipeline_catalogue.csp_filters.extend(additional_filters) if use_sources: dataset = self.dataset_sources else: dataset = self.dataset labels = self.labels cv = self.create_folder(cv) nb_subjects = self.nb_subj nb_channels = len(self.electrodes) if nb_subjects == 1: dataset = dataset[0] labels = labels[0] else: dataset = np.concatenate(dataset) labels = np.concatenate(labels) if factor is None: if nb_channels <= CONST_MAX_1by1_ELECTRODES: nb_elec = [i for i in range(1, nb_channels + 1)] elif nb_channels <= CONST_MAX_5by5_ELECTRODES: nb_elec = [i for i in range(1, nb_channels + 1, 5)] if nb_channels % 5 != 0: nb_elec.append(nb_channels) else: nb_elec = [i for i in range(1, nb_channels + 1, 10)] if nb_channels % 10 != 0: nb_elec.append(nb_channels) else: nb_elec = [i for i in range(1, nb_channels + 1)] if nb_channels % factor != 0: nb_elec.append(nb_channels) self.pipeline_catalogue.nb_elec = nb_elec pipelines = list(self.catalogue.keys()) pipelines.sort() grid_searches = {} random_searches = {} for pipeline in pipelines: if self.parameters[pipeline][0] == 0: pass elif self.parameters[pipeline][0] == 1: grid_searches[pipeline] = GridSearchCV(self.catalogue[pipeline], param_grid=self.parameters[pipeline][1], cv=cv) elif self.parameters[pipeline][0] == 2: distribution = randint(1, max_filters) self.parameters[pipeline][1] = {self.parameters[pipeline][1][i]: distribution for i in range(len(self.parameters[pipeline][1]))} random_searches[pipeline] = RandomizedSearchCV(self.catalogue[pipeline], param_distributions=self.parameters[pipeline][1], n_iter=self.parameters[pipeline][2], cv=cv, n_jobs=-1) for gs in grid_searches: if use_groups: grid_searches[gs].fit(dataset, labels, groups=self.groups) else: grid_searches[gs].fit(dataset, labels) selected_estimator = grid_searches[gs].best_estimator_ self.catalogue[gs] = selected_estimator for gs in random_searches: if use_groups: random_searches[gs].fit(dataset, labels, groups=self.groups) else: random_searches[gs].fit(dataset, labels) selected_estimator = random_searches[gs].best_estimator_ self.catalogue[gs] = selected_estimator print("Pipelines' hyperparameters have been tuned")
    def fit(self, x_train, y_train):
        """ Fits all the pipelines to a provided dataset.

        Parameters
        ----------
        x_train : the training samples
        y_train : the correct answers to the training samples
        """
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        for pipeline_name in pipelines:
            pipeline = self.catalogue[pipeline_name]
            try:
                pipeline.fit(x_train, y_train)
            except Exception:
                print("Errors encountered with pipeline " + pipeline_name + ". This pipeline will be removed.")
                self.delete_pipelines([pipeline_name])
        return self
    def fit_transform(self, x, y):
        return self.fit(x, y)
    def predict(self, x_test):
        """ Predicts the values for a dataset.

        Parameters
        ----------
        x_test : the test samples
        """
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        round_predictions = []
        for pipeline_name in pipelines:
            pipeline = self.catalogue[pipeline_name]
            preds = pipeline.predict(x_test)
            round_predictions.append(preds)
            for i in range(len(preds)):
                self.predictions[pipeline_name].append(preds[i])
        return round_predictions
    def predict_proba(self, x_test):
        """ Predicts, with probabilities, the values for a dataset.

        Parameters
        ----------
        x_test : the test samples
        """
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        round_predictions = []
        for pipeline_name in pipelines:
            pipeline = self.catalogue[pipeline_name]
            preds = pipeline.predict_proba(x_test)
            round_predictions.append(preds)
            preds_abs = np.argmax(preds, axis=1)
            for i in range(len(preds)):
                self.predictions_proba[pipeline_name].append(preds[i])
                self.predictions[pipeline_name].append(preds_abs[i])
        return round_predictions
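    # --- Usage sketch (not part of the class) --------------------------------
    # Minimal fit / predict_proba round trip on one cross-validation split,
    # mirroring what classify() does internally; the index and array names are
    # illustrative.
    #
    #   x_train, y_train = dataset[train_idx], labels[train_idx]
    #   x_test,  y_test  = dataset[test_idx],  labels[test_idx]
    #   clf.fit(x_train, y_train)
    #   proba = clf.predict_proba(x_test)     # one (n_test x n_classes) array per pipeline
    #   clf.expected_answers.extend(y_test)   # needed before calling final_score()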
""" Scores """
    def final_score(self):
        """ Final scoring function. For each pipeline, a score, a confusion matrix, a precision,
        a recall and a ROC curve are computed. """
        labels = self.labels
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        expected = self.expected_answers
        try:
            if self.nb_subj != 1:
                labels = np.concatenate(labels)
            for pipeline_name in pipelines:
                predictions_pipeline = self.predictions[pipeline_name]
                precision = 0
                recall = 0
                confusion_matrix = [[0 for _ in range(self.nb_paradigms)] for _ in range(self.nb_paradigms)]
                for pred_idx in range(len(predictions_pipeline)):
                    confusion_matrix[expected[pred_idx]][predictions_pipeline[pred_idx]] += 1
                corrects = np.sum([confusion_matrix[i][i] for i in range(self.nb_paradigms)])
                score = corrects / np.sum(confusion_matrix)
                if self.nb_paradigms == 2:
                    if confusion_matrix[1][1] + confusion_matrix[0][1] != 0:
                        precision = confusion_matrix[1][1] / (confusion_matrix[1][1] + confusion_matrix[0][1])
                    else:
                        precision = 0
                    if confusion_matrix[1][1] + confusion_matrix[1][0] != 0:
                        recall = confusion_matrix[1][1] / (confusion_matrix[1][1] + confusion_matrix[1][0])
                    else:
                        recall = 0
                self.precisions[pipeline_name] = precision
                self.recalls[pipeline_name] = recall
                self.scores[pipeline_name] = score
                self.confusion_matrices[pipeline_name] = confusion_matrix
                if self.nb_paradigms == 2:
                    # ROC curve
                    preds_proba = self.predictions_proba[pipeline_name]
                    positive_proba = []
                    for j in range(len(preds_proba)):
                        positive_proba.append(preds_proba[j][1])
                    fpr, tpr, threshold = metrics.roc_curve(expected, positive_proba)
                    roc_auc = metrics.auc(fpr, tpr)
                    self.roc_infos[pipeline_name] = [fpr, tpr, roc_auc]
            if self.nb_paradigms == 2:
                auc_scores = deepcopy(self.roc_infos)
                for pipeline in self.roc_infos:
                    auc_scores[pipeline] = auc_scores[pipeline][2]
            self.classifier_log.append(('pipelines : ', list(self.catalogue.keys())))
            self.classifier_log.append(("scores", self.scores))
            if self.nb_paradigms == 2:
                self.classifier_log.append(("scores auc", auc_scores))
            self.classifier_log.append(("recalls", self.recalls))
            self.classifier_log.append(("precisions", self.precisions))
        except Exception as e:
            print(e)
            # Re-raise instead of calling exit(), so the caller can handle the failure.
            raise
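    # --- Worked example (comment only) ----------------------------------------
    # With confusion_matrix = [[40, 10], [5, 45]] (rows = expected, columns = predicted):
    #   score     = (40 + 45) / 100 = 0.85
    #   precision = 45 / (45 + 10) ≈ 0.818   (TP / (TP + FP))
    #   recall    = 45 / (45 + 5)  = 0.90    (TP / (TP + FN))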
    def score(self, x, y):
        """ Returns the accuracy on the given test data and labels, for each pipeline.

        Parameters
        ----------
        x : test samples
        y : correct answers for x
        """
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        scores = {key: [] for key in self.catalogue.keys()}
        for pipeline_name in pipelines:
            pipeline = self.catalogue[pipeline_name]
            score = pipeline.score(x, y)
            scores[pipeline_name] = score
        return scores
    def score_all_pipelines(self, x, y):
        """ Returns the accuracy on the given predictions and labels for all trained pipelines.

        Parameters
        ----------
        x : dictionary of predictions, one entry per pipeline
        y : correct answers for x
        """
        # The predictions are keyed by pipeline name, so the scores are stored in a dict as well.
        scores = {key: 0.0 for key in self.catalogue.keys()}
        for pipeline in list(x.keys()):
            for i in range(len(x[pipeline])):
                if x[pipeline][i] == y[i]:
                    scores[pipeline] += 1
            scores[pipeline] = scores[pipeline] / len(x[pipeline])
        return scores
    def predict_test_dataset(self):
        """ Predicts the left-out test dataset, computes the scores and shows the results. """
        self.restore()
        self.dataset = self.test_dataset
        self.labels = self.test_labels
        dataset = np.concatenate(self.dataset)
        labels = np.concatenate(self.labels)
        self.expected_answers = labels
        self.predict_proba(dataset)
        self.final_score()
        print(self.scores)
        self.show_results(4)
    def score_func_independent_feat_selection(self, estimator, x_test, y_test):
        """ Scoring function for the independent feature selection.
        The previously trained pipeline is used to predict the test dataset.
        At the end, the predictions are compared to the correct answers, and the percentage of
        correctly classified data is used as the score.

        Parameters
        ----------
        estimator : the estimator that predicts and scores
        x_test : the test dataset
        y_test : the correct answers to the dataset
        """
        x_test = estimator.transform(x_test)
        score = 0
        pipeline = estimator.pipeline
        predictions = pipeline.predict(x_test)
        for i in range(y_test.shape[0]):
            if predictions[i] == y_test[i]:
                score += 1
        score = score / y_test.shape[0]
        return score
    def score_func(self, estimator, x_test, y_test):
        """ Scoring function for a full pipeline.
        The test data are passed through every step of the estimator except the last one, and the
        last step predicts. The percentage of correctly classified data is used as the score.

        Parameters
        ----------
        estimator : the estimator that predicts and scores
        x_test : the test dataset
        y_test : the correct answers to the dataset
        """
        estimator_steps = estimator.named_steps.keys()
        i = 0
        for step in estimator_steps:
            i += 1
            if i == len(estimator_steps):
                break
            x_test = estimator.named_steps[step].transform(x_test)
        score = 0
        predictions = estimator.named_steps[step].predict(x_test)
        for i in range(y_test.shape[0]):
            if predictions[i] == y_test[i]:
                score += 1
        score = score / y_test.shape[0]
        return score
""" Plot """
[docs] def plot_ROC(self, nb_lines, nb_columns): """ Plots the ROC curves for all the pipelines from the catalogue. \n Parameters \n ---------- \n nb_lines : the number of lines to be used for all the pipelines \n nb_columns : the number of columns to be used for all the pipelines \n """ pipelines = list(self.catalogue.keys()) pipelines.sort() # pipeline_counter = 0 x = 0 y = 0 fig, axs = plt.subplots(nb_lines, nb_columns) for pipeline_name in pipelines: fpr, tpr, auc = self.roc_infos[pipeline_name] # plt.subplot(nb_lines, nb_columns, pipeline_counter + 1) # title = pipeline_name + " : " + str(self.scores[pipeline_name]) if nb_lines > 1: axs[x, y].set_title(pipeline_name) axs[x, y].plot(fpr, tpr, 'b', label='AUC = %0.2f' % auc) axs[x, y].legend(loc='lower right') axs[x, y].plot([0, 1], [0, 1], 'r--') axs[x, y].set_xlim([0, 1]) axs[x, y].set_ylim([0, 1]) axs[x, y].set_ylabel('True Positive Rate') axs[x, y].set_xlabel('False Positive Rate') # pipeline_counter += 1 y += 1 if y == nb_columns: y = 0 x += 1 else: axs[y].set_title(pipeline_name) axs[y].plot(fpr, tpr, 'b', label='AUC = %0.2f' % auc) axs[y].legend(loc='lower right') axs[y].plot([0, 1], [0, 1], 'r--') axs[y].set_xlim([0, 1]) axs[y].set_ylim([0, 1]) axs[y].set_ylabel('True Positive Rate') axs[y].set_xlabel('False Positive Rate') y += 1 fig.show()
[docs] def plot_confusion(self, nb_lines, nb_columns, names): """ Plots the confusion matrices for all the pipelines from the catalogue. \n Parameters \n ---------- \n nb_lines : the number of lines to be used for all the pipelines \n nb_columns : the number of columns to be used for all the pipelines \n names : list; list containing the names of the labels for the categories. \n """ pipelines = list(self.catalogue.keys()) pipelines.sort() pipeline_counter = 0 for pipeline_name in pipelines: confusion_matrix = self.confusion_matrices[pipeline_name] plotted_confusion_matrix = [[] for _ in range(len(confusion_matrix))] for i in range(len(confusion_matrix)): for j in range(len(confusion_matrix)): plotted_confusion_matrix[i].append(confusion_matrix[i][j]/np.sum(confusion_matrix[i])) plt.subplot(nb_lines, nb_columns, pipeline_counter + 1) plt.title(pipeline_name) sn.heatmap(plotted_confusion_matrix, annot=True, cmap="YlGnBu", vmin=0, vmax=1, xticklabels=names, yticklabels=names) pipeline_counter += 1 plt.show()
    def show_results(self, nb_columns):
        """ Shows the confusion matrices and, for binary problems, the ROC curves for all the pipelines.

        Parameters
        ----------
        nb_columns : the number of columns to be used for all the pipelines
        """
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        nb_lines = math.ceil(len(pipelines) / nb_columns)
        # Confusion matrices
        self.plot_confusion(nb_lines, nb_columns, self.event_names)
        time.sleep(1)
        # ROC curves
        if self.nb_paradigms == 2:
            self.plot_ROC(nb_lines, nb_columns)
    def plot_ROC_(self, nb_columns):
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        nb_lines = math.ceil(len(pipelines) / nb_columns)
        # ROC curves
        if self.nb_paradigms == 2:
            self.plot_ROC(nb_lines, nb_columns)
    def plot_confusion_(self, nb_columns):
        pipelines = list(self.catalogue.keys())
        pipelines.sort()
        nb_lines = math.ceil(len(pipelines) / nb_columns)
        # Confusion matrices
        self.plot_confusion(nb_lines, nb_columns, self.event_names)
""" Classification """
[docs] def classify(self, dataset, dataset_path=None, test_dataset_size=5, cv_value=5, independent_features_selection=False, channels_to_select=20, use_groups=True, tune_hypers=False, classify_test=False): """ Global classification method of the library. (inter-subjects)\n Reads the dataset, can apply independent features selection, can tune hyperparameters, fits, predicts, scores, and shows results.\n Parameters \n ---------- dataset : string; the path to the dataset \n test_dataset_size : int; the fraction of the dataset to be considered for testing \n pre_epoched : boolean; whether the dataset is already epoched or not\n tmin, tmax : time limits for the epochs \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether to apply Independent Component Analysis or not\n resample : boolean, whether to resample the data at 512 Hz or not \n baseline : the baseline to be applied to the data \n cv_value : int; the number of folds for cross validation. If None, the number of subjects will be used (Leave one out) \n independent_features_selection : boolean; whether to apply independent features selection or not \n channels_to_select : int or None; the number of channels to be selected or None if automatic number selection \n use_groups : boolean; whether to use groups for cross validation or not \n tune_hypers : boolean; whether to tune hyperparameters or not \n names : list; names of the categories \n classify_test : boolean; whether there should be a separate test dataset or not \n """ self.dataset = dataset self.labels = [] self.event_names = [] self.electrodes = dataset.info["ch_names"] labels_dic = {} labels_idx = 0 for event in self.dataset.events: event_label = event[2] if event_label not in labels_dic: labels_dic[event_label] = labels_idx for key in dataset.event_id.keys(): if dataset.event_id[key] == event_label: self.event_names.append(key) labels_idx += 1 self.labels.append(labels_dic[event_label]) self.labels = np.asarray(self.labels) self.nb_paradigms = len(self.dataset.event_id) try: if classify_test and test_dataset_size != 0: limit_dataset = self.dataset.shape[0] // test_dataset_size limit_dataset_0 = limit_dataset // 2 limit_dataset_1 = limit_dataset - limit_dataset_0 self.test_dataset = np.concatenate( [self.dataset[:limit_dataset_0], self.dataset[-limit_dataset_1:]]) self.dataset = self.dataset[limit_dataset_0:-limit_dataset_1] self.test_labels = np.concatenate([self.labels[:limit_dataset_0], self.labels[-limit_dataset_1:]]) self.labels = self.labels[limit_dataset_0:-limit_dataset_1] except Exception as e: print(e) if use_groups: cv = GroupKFold(cv_value) else: cv = KFold(cv_value, shuffle=True, random_state=42) if independent_features_selection: try: self.independent_features_selection(False, channels_to_select=channels_to_select, use_groups=use_groups) except Exception as e: print("Independent features selection error") print(e) if tune_hypers: try: self.tune_hyperparameters(cv_value, use_sources=False, use_groups=use_groups) except Exception as e: print("Hyperparameters tuning error") print(e) if use_groups: groups = self.groups else: groups = None dataset = np.asarray(self.dataset) labels = self.labels for train_index, test_index in cv.split(dataset, labels, groups=groups): x_train = dataset[train_index] y_train = labels[train_index] x_test = dataset[test_index] y_test = 
labels[test_index] self.fit(x_train, y_train) predictions_proba = self.predict_proba(x_test) scores = self.score(x_test, y_test) for pipeline in scores.keys(): self.scores_fold[pipeline].append(scores[pipeline]) predictions_proba = np.argmax(predictions_proba, axis=2) self.expected_answers.extend(y_test) self.final_score() pipelines = list(self.catalogue.keys()) pipelines.sort() print(self.classifier_log) self.classifier_log.append(('Predictions', "see 'predictions.npy' in dataset folder")) np.save(os.path.join(dataset_path, "predictions.npy"), self.predictions) self.save_program_log(dataset_path) if classify_test: if test_dataset_size == 0: text = "Test dataset size is 0. Test dataset estimation can not be applied." raise Exception(text) self.predict_test_dataset()
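    # --- Usage sketch (not part of the class) --------------------------------
    # Hypothetical inter-subject run of classify() on an already-loaded mne.Epochs
    # object; "epochs" and the output path are illustrative, and groups are
    # disabled because no per-subject group indices are available in this setting.
    #
    #   clf = ApplePyClassifier()
    #   clf.classify(epochs, dataset_path="/path/to/output",
    #                cv_value=5, use_groups=False,
    #                independent_features_selection=False, tune_hypers=False)
    #   print(clf.scores)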
[docs] def classify_intraSubject(self, dataset, divided_dataset=True, nb_subj=None, test_dataset_size=5, pre_epoched=True, tmin=-0.2, tmax=0.5, bads=None, picks=None, filtering=[None, None], ICA=False, resample=False, baseline=None, event_ids=[None, None], reference=None, cv_value=5, independent_features_selection=False, channels_to_select=20, tune_hypers=False, names=[0, 1], classify_test=False, use_all_pipelines=False): """ Global classification method of the library. (intra-subject) \n Reads the dataset, can apply independent features selection, can tune hyperparameters, fits, predicts, scores, and shows results. \n Parameters \n ---------- \n dataset : string; the path to the dataset \n nb_subj : int; number of subjects to consider \n test_dataset_size : int; the fraction of the dataset to be considered for testing \n pre_epoched : boolean; whether or not the dataset is already epoched \n tmin, tmax : time limits for the epochs \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n cv_value : int; the number of folds for cross validation. If None, the number of subjects will be used (Leave one out) \n independent_features_selection : boolean; whether or not to apply independent features selection \n channels_to_select : int or None; the number of channels to be selected or None if automatic number selection \n tune_hypers : boolean; whether or not to tune hyperparameters \n names : list; names of the categories \n classify_test : boolean; whether or not there should be a separate test dataset \n use_all_pipelines : boolean; whether or not all the pipelines should be used for classification or only a subset \n """ if not use_all_pipelines: self.delete_pipelines(['XdawnCov', 'Xdawn', 'CSP', 'Cosp', 'HankelCov', 'CSSP', 'PSD', 'FgMDM']) if nb_subj is None: nb_subj = self.count_subjects(dataset) if test_dataset_size != 0: limit_dataset = nb_subj // test_dataset_size self.read_all_files(dataset, nb_subj, divided_dataset=divided_dataset, tmin=tmin, tmax=tmax, pre_epoched=pre_epoched, bads=bads, picks=picks, filtering=filtering, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_ids, reference=reference) if classify_test: if test_dataset_size != 0: self.test_dataset = self.dataset[-limit_dataset:] self.dataset = self.dataset[:-limit_dataset] self.test_labels = self.labels[-limit_dataset:] self.labels = self.labels[:-limit_dataset] idx = nb_subj - limit_dataset for i in range(idx, nb_subj): lst_idx = np.where(self.groups == i) self.test_groups = np.append(self.test_groups, self.groups[lst_idx]) self.groups = np.delete(self.groups, lst_idx) loo = LeaveOneOut() cv = KFold(cv_value) if independent_features_selection: self.independent_features_selection(False, cv=cv_value, channels_to_select=channels_to_select, use_groups=True) if tune_hypers: self.tune_hyperparameters(cv_value, use_sources=False, use_groups=True) for train_index, test_index in loo.split(self.dataset): x_train = np.asarray(self.dataset[test_index[0]]) y_train = np.asarray(self.labels[test_index[0]]) for train, test in cv.split(x_train, y_train): self.fit(x_train[train], y_train[train]) self.predict_proba(x_train[test]) 
self.expected_answers.extend(y_train[test]) self.final_score() print(self.scores) pipelines = list(self.catalogue.keys()) pipelines.sort() print(pipelines) self.show_results(4, names) print(self.classifier_log) self.classifier_log.append(('Predictions', "see 'predictions.npy' in dataset folder")) np.save(os.path.join(dataset_path, "predictions.npy"), self.predictions) self.save_program_log(dataset_path) if classify_test: if test_dataset_size == 0: text = "Test dataset size is 0. Test dataset estimation can not be applied." raise Exception(text) self.predict_test_dataset()
[docs] def classify_with_CNN(self, dataset, nb_subj=None, divided_dataset=True, pre_epoched=True, tmin=0, tmax=0.5, bads=None, picks=None, filtering=[None, None], ICA=False, resample=False, baseline=None, event_ids=[None, None], reference=None, test_size=5): """ Global classification method of the library. (inter-subjects) \n Reads the dataset, can apply independent features selection, can tune hyperparameters, fits, predicts, scores, and shows results. \n Parameters \n ---------- \n dataset : string; the path to the dataset \n nb_subj : int; number of subjects to consider \n test_dataset_size : int; the fraction of the dataset to be considered for testing \n pre_epoched : boolean; whether or not the dataset is already epoched \n tmin, tmax : time limits for the epochs \n bads : list of electrodes to be rejected \n picks : list of electrodes to be worked on \n filtering : tuple containing the higher and lower frequencies to filter the data \n tmin, tmax : tmin, tmax for delimiting the epochs in time \n ICA : Boolean, whether or not to apply Independent Component Analysis \n resample : boolean, whether or not to resample the data at 512 Hz \n baseline : the baseline to be applied to the data \n cv_value : int; the number of folds for cross validation. If None, the number of subjects will be used (Leave one out) \n independent_features_selection : boolean; whether or not to apply independent features selection \n channels_to_select : int or None; the number of channels to be selected or None if automatic number selection \n use_groups : boolean; whether or not to use groups for cross validation \n tune_hypers : boolean; whether or not to tune hyperparameters \n names : list; names of the categories \n classify_test : boolean; whether or not there should be a separate test dataset \n use_all_pipelines : boolean; whether or not all the pipelines should be used for classification or only a subset \n """ # dataset_path = dataset if nb_subj is None: self.nb_subj = self.count_subjects(dataset) nb_subj = self.nb_subj else: self.nb_subj = nb_subj self.read_all_files(dataset, nb_subj, divided_dataset=divided_dataset, tmin=tmin, tmax=tmax, pre_epoched=pre_epoched, bads=bads, picks=picks, filtering=filtering, ICA=ICA, resample=resample, baseline=baseline, event_ids=event_ids, reference=reference) """ if self.nb_subj == 1: self.dataset = self.dataset[0] self.labels = self.labels[0] use_groups=False """ limit_dataset = nb_subj // test_size x_test = np.concatenate(self.dataset[0:limit_dataset]) x_train = np.concatenate(self.dataset[limit_dataset:]) y_test = np.concatenate(self.labels[0:limit_dataset]) y_train = np.concatenate(self.labels[limit_dataset:]) x_train = x_train.reshape(x_train.shape[0], x_train.shape[1], x_train.shape[2], 1) x_test = x_test.reshape(x_test.shape[0], x_test.shape[1], x_test.shape[2], 1) y_test_biss = [] y_train_biss = [] for elem in range(y_test.shape[0]): if y_test[elem] == 0: y_test_biss.append([1, 0]) elif y_test[elem] == 1: y_test_biss.append([0, 1]) else: print("error here test") for elem in range(y_train.shape[0]): if y_train[elem] == 0: y_train_biss.append([1, 0]) elif y_train[elem] == 1: y_train_biss.append([0, 1]) else: print("error here train") y_train = np.asarray(y_train_biss) y_test = np.asarray(y_test_biss) cnn = CNN() cnn.compile() cnn.train(x_train, y_train, x_test, y_test) print(cnn.evaluate(x_test, y_test)) cnn.show_results()
""" Others """
    def count_subjects(self, directory):
        """ Counts the number of subjects in the dataset.

        Parameters
        ----------
        directory : the path to the dataset
        """
        subfolders = os.listdir(directory)
        if subfolders[0] != "epoched":
            folder = subfolders[0]
        else:
            folder = subfolders[1]
        list_of_files = []
        path = os.path.join(directory, folder)
        for raw_eeg_file in os.listdir(path):
            if (".set" in raw_eeg_file) or ("directory" in raw_eeg_file) or (".txt" in raw_eeg_file):
                list_of_files.append(raw_eeg_file)
        self.nb_subj = len(list_of_files)
        return len(list_of_files)
    def save_program_log(self, path):
        """ Saves the program log in a file.

        Parameters
        ----------
        path : string; path to the place where the file will be saved
        """
        path = os.path.join(path, "program_log.txt")
        # Use a context manager so the log file is properly flushed and closed.
        with open(path, "w") as file:
            for element in self.classifier_log:
                text = str(element[0]) + " : " + str(element[1]) + "\n"
                file.write(text)
    def create_folder(self, k):
        """ Creates the k-fold cross-validation splitter for the data.

        Parameters
        ----------
        k : int
            number of folds
        """
        folder = KFold(k, shuffle=True, random_state=42)
        return folder
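# --- End-to-end usage sketch (hypothetical, not executed by this module) ------
# An intra-subject run on a pre-epoched dataset; the path, subject count, event
# ids and category names are all illustrative.
#
#   if __name__ == "__main__":
#       clf = ApplePyClassifier()
#       clf.classify_intraSubject("/path/to/dataset", nb_subj=12,
#                                 pre_epoched=True, tmin=-0.2, tmax=0.5,
#                                 event_ids=[{"standard": 1}, {"deviant": 2}],
#                                 cv_value=5, names=["standard", "deviant"])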