I extract the data from the GDF files using MATLAB/Biosig, band-pass filtering at [4, 40] Hz and downsampling to 125 Hz.
% Load the BCI Competition IV dataset 2a recordings for all 9 subjects via
% Biosig and return per-subject trial tensors and class labels.
%   X1, y1 -- training-session trials and labels (A0xT files)
%   X2, y2 -- evaluation-session trials and labels (A0xE files)
% NOTE(review): file paths are hard-coded to one machine — consider
% parameterizing the dataset root.
function [X1,X2,y1,y2] = BCICIV_2a()
clc
clear
biosig_installer;
%import data: sload reads each subject's GDF file; OVERFLOWDETECTION:OFF
%keeps saturated samples instead of replacing them with NaN.
for i=1:9
[s{i}, h{i}] = sload(strcat('/home/mouad/Project/EEGNet/Dataset/BCICIV_2a/X/A0',num2str(i),'T.gdf'),0 ,'OVERFLOWDETECTION:OFF');
[se{i}, he{i}] = sload(strcat('/home/mouad/Project/EEGNet/Dataset/BCICIV_2a/X/A0',num2str(i),'E.gdf'),0 ,'OVERFLOWDETECTION:OFF');
end
% Cut both sessions into filtered, downsampled per-trial windows.
X1=extraction(s,h);
X2=extraction(se,he);
% True labels are distributed separately as .mat files (one per session);
% each load defines the variable `classlabel`.
for i=1:9
load(strcat('/home/mouad/Project/EEGNet/Dataset/BCICIV_2a/y/A0',num2str(i),'T.mat'));
y1{i}=classlabel;
load(strcat('/home/mouad/Project/EEGNet/Dataset/BCICIV_2a/y/A0',num2str(i),'E.mat'));
y2{i}=classlabel;
end
end
function X=extraction(s,h)
% Cut each subject's recording into per-trial windows.
%   s, h -- cell arrays of signals and headers as returned by sload.
%   X    -- 1x9 cell; X{i} is (#trials, 1, #timepoints, #channels):
%          2-second windows taken 0.5 s after each cue, band-pass
%          filtered 4-40 Hz and downsampled from 250 Hz to 125 Hz.
fs=250;                                        % native 2a sampling rate
% 4th-order Butterworth band-pass, 4-40 Hz (normalized by fs/2 = 125).
[P1,Q1]=butter(4,(2*[4 40])/250,'bandpass');
X=cell(1,9);                                   % semicolon: don't echo to console
for i=1:9
    % Cue onsets for the four motor-imagery classes (769-772) and the
    % unknown-class cue (783, used in the evaluation files).
    pos=h{i}.EVENT.POS(h{i}.EVENT.TYP==769 | h{i}.EVENT.TYP==770 | h{i}.EVENT.TYP==771 | h{i}.EVENT.TYP==772 | h{i}.EVENT.TYP==783);
    pb=pos+0.5*fs;                             % window start: 0.5 s after cue
    pe=pos+2.5*fs;                             % window end:   2.5 s after cue
    % length() instead of size(): size(pb) returns a [rows cols] vector,
    % which only worked by accident in the 1:size(pb) colon expression.
    for j=1:length(pb)
        t=s{i}(pb(j):pe(j),:);
        for k=1:25
            t(:,k)=filter(P1,Q1,t(:,k));       % causal band-pass, per channel
        end
        % Keep every 2nd sample (250 Hz -> 125 Hz): 126 timepoints/trial.
        X{i}(j,1,:,:)=t(1:2:251,:);
    end
end
end
Below is the modified Python file.
import numpy as np
from sklearn.metrics import roc_auc_score, precision_score, recall_score, accuracy_score
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
import scipy.io
# Map from dataset name to the path of the preprocessed .mat file produced
# by the MATLAB extraction script above.
db_address={'BCI_IV_2a':'data_transform/BCICIV_2a_3.mat'}
def generate_db_data(nb_folds):
    """Load the transformed BCI IV 2a data and split it for one fold.

    The .mat file holds one cell per subject (9 subjects): X1/y1 are the
    training sessions, X2/y2 the evaluation sessions.  Subject `nb_folds`
    (1-based) is held out for validation/test; the other eight subjects
    form the training set.

    Returns: X_train, y_train, X_val, y_val, X_test, y_test
    """
    # Use the shared path table instead of re-hard-coding the file name.
    out = scipy.io.loadmat(db_address['BCI_IV_2a'])
    # Training-session trials (X1) and evaluation-session trials (X2).
    X_train_set = out['X1']
    X_test_set = out['X2']
    # Class labels for each session.
    y_train_set = out['y1']
    y_test_set = out['y2']
    # Indices of the 8 training subjects (all but the held-out one).
    # list(range(...)): plain range has no .remove() on Python 3.
    L = list(range(0, 9))
    L.remove(nb_folds - 1)
    X_train = np.concatenate(X_train_set[0][L], axis=0).astype('float32')
    X_val = X_train_set[0][nb_folds - 1].astype('float32')
    X_test = X_test_set[0][nb_folds - 1].astype('float32')
    # Labels are 1..4 in the .mat files; shift to 0..3 for CrossEntropyLoss.
    y_train = np.concatenate(y_train_set[0][L], axis=0).astype('int16') - 1
    y_val = y_train_set[0][nb_folds - 1].astype('int16') - 1
    y_test = y_test_set[0][nb_folds - 1].astype('int16') - 1
    # Flatten (n, 1) label arrays to 1-D; -1 avoids hard-coding 2304/288
    # (which assumed exactly 288 trials per subject).
    y_train = y_train.reshape((-1))
    y_val = y_val.reshape((-1))
    y_test = y_test.reshape((-1))
    return X_train, y_train, X_val, y_val, X_test, y_test
def generate_ran_data():
    """Generate random placeholder data shaped like the real dataset.

    Returns: X_train, y_train, X_val, y_val, X_test, y_test with X arrays
    of shape (100, 1, 120, 64) and binary {0, 1} labels of length 100.
    """
    # np.random.rand draws from [0, 1).
    X_train = np.random.rand(100, 1, 120, 64).astype('float32')
    # Round BEFORE casting to int16: the original cast first, which
    # truncated every value in [0, 1) to 0 and produced all-zero labels.
    y_train = np.round(np.random.rand(100)).astype('int16')
    X_val = np.random.rand(100, 1, 120, 64).astype('float32')
    y_val = np.round(np.random.rand(100)).astype('int16')
    X_test = np.random.rand(100, 1, 120, 64).astype('float32')
    y_test = np.round(np.random.rand(100)).astype('int16')
    return X_train, y_train, X_val, y_val, X_test, y_test
class EEGNet(nn.Module):
def __init__(self):
super(EEGNet, self).__init__()
self.T = 251
# Layer 1
self.conv1 = nn.Conv2d(1, 16, (1, 25), padding = 0)#dim ch
self.batchnorm1 = nn.BatchNorm2d(16, False)
# Layer 2
self.padding1 = nn.ZeroPad2d((16, 17, 0, 1))
self.conv2 = nn.Conv2d(1, 4, (2, 32))
self.batchnorm2 = nn.BatchNorm2d(4, False)
self.pooling2 = nn.MaxPool2d(2, 4)
# Layer 3
self.padding2 = nn.ZeroPad2d((2, 1, 4, 3))
self.conv3 = nn.Conv2d(4, 4, (8, 4))
self.batchnorm3 = nn.BatchNorm2d(4, False)
self.pooling3 = nn.MaxPool2d((2, 4))
# FC Layer
# NOTE: This dimension will depend on the number of timestamps per sample in your data.
# I have 120 timepoints.
self.fc1 = nn.Linear(4*2*15, 4)
def forward(self, x):
# Layer 1
x = F.elu(self.conv1(x))
x = self.batchnorm1(x)
x = F.dropout(x, 0.25)
x = x.permute(0, 3, 1, 2)
# Layer 2
x = self.padding1(x)
x = F.elu(self.conv2(x))
x = self.batchnorm2(x)
x = F.dropout(x, 0.25)
x = self.pooling2(x)
# Layer 3
x = self.padding2(x)
x = F.elu(self.conv3(x))
x = self.batchnorm3(x)
x = F.dropout(x, 0.25)
x = self.pooling3(x)
# FC Layer
x = x.view(-1, 4*2*15)
x = F.softmax(self.fc1(x),dim=-1)
return x
def evaluate(model, X, Y, params=("acc",)):
    """Run `model` on `X` in batches and score predictions against `Y`.

    model  -- callable mapping a FloatTensor batch to per-class scores
    X      -- float32 array, shape (n_trials, 1, T, C)
    Y      -- integer class labels, shape (n_trials,)
    params -- metric names among "acc", "auc", "recall", "precision",
              "fmeasure" (immutable default avoids the mutable-default trap)

    Returns a list with one score per requested metric, in order.
    """
    batch_size = 288
    chunks = []
    # Batched inference keeps memory bounded.  The original loop read the
    # global X_train instead of X, used Python-2-only integer division,
    # discarded its predictions, and then re-ran the model on all of X;
    # this version predicts each slice of X once (remainder included).
    for start in range(0, len(X), batch_size):
        inputs = Variable(torch.from_numpy(X[start:start + batch_size]))
        pred = model(inputs)
        chunks.append(pred.data.cpu().numpy())
    predicted = np.concatenate(chunks, axis=0)
    # argmax on the raw scores: the original argmax(np.round(...)) mapped
    # every row whose probabilities were all < 0.5 to class 0.
    Y_p = np.argmax(predicted, axis=1)
    results = []
    for param in params:
        if param == 'acc':
            results.append(accuracy_score(Y, Y_p))
        if param == "auc":
            # NOTE(review): multi-class AUC needs sklearn's multi_class
            # argument — confirm before relying on this metric.
            results.append(roc_auc_score(Y, predicted))
        if param == "recall":
            results.append(recall_score(Y, np.round(predicted)))
        if param == "precision":
            results.append(precision_score(Y, np.round(predicted)))
        if param == "fmeasure":
            precision = precision_score(Y, np.round(predicted))
            recall = recall_score(Y, np.round(predicted))
            results.append(2 * precision * recall / (precision + recall))
    return results
def run_folds(nb_folds):
global X_train , y_train , X_val , y_val , X_test , y_test
X_train , y_train , X_val , y_val , X_test , y_test = generate_db_data(1)
net = EEGNet()
print net.forward(Variable(torch.from_numpy(np.expand_dims(X_train[0], 0))))
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters())
batch_size = 32
results_loss=[]
results_train=[]
results_val=[]
results_test=[]
for epoch in range(2000): # loop over the dataset multiple times
print "\nEpoch ", epoch
running_loss = 0.0
for i in range(len(X_train)/batch_size-1):
s = i*batch_size
e = i*batch_size+batch_size
inputs = torch.from_numpy(X_train[s:e])
labels = torch.LongTensor(np.array([y_train[s:e]]).T*1.0)
# wrap them in Variable
inputs, labels = Variable(inputs), Variable(labels)
labels=labels[:,0]
#labels=labels
# zero the parameter gradients
optimizer.zero_grad()
#(inputs)
# forward + backward + optimize
outputs = net(inputs)
loss = criterion(outputs, labels)
loss.backward()
optimizer.step()
running_loss += loss.data[0]
# Validation accuracy
params = ["acc"]
#evaluate(net, X_train, y_train, params)
results_loss.append(running_loss)
results_train.append(evaluate(net, X_train, y_train, params))
results_val.append(evaluate(net, X_val, y_val, params))
results_test.append(evaluate(net, X_test, y_test, params))
print params
print "Training Loss ", results_loss[epoch]
print "Train - ", results_train[epoch]
print "Validation - ", results_val[epoch]
print "Test - ", results_test[epoch]
print "\n"
return net , results_loss , results_train , results_val , results_test
# Script entry point: trains on fold 3 at import time (no __main__ guard).
net , results_loss , results_train , results_val , results_test=run_folds(3)
print "out"
I have been testing it for a week and have tried many different batch sizes.