import numpy as np
import dask.bag as db
import pandas as pd
from neuronunit import tests
from neuronunit.models.reduced import ReducedModel
from neuronunit.optimization.model_parameters import model_params, path_params
import numpy
from neuronunit.optimization import model_parameters as modelp
from itertools import repeat
import copy
import math
import quantities as pq
import numpy as np
from pyneuroml import pynml
from deap import base
from neuronunit.optimization.data_transport_container import DataTC
import os
import pickle
from itertools import repeat
import neuronunit
import multiprocessing
npartitions = multiprocessing.cpu_count()
from collections import Iterable
from numba import jit
from sklearn.model_selection import ParameterGrid
from itertools import repeat
from collections import OrderedDict
import logging
logger = logging.getLogger('__main__')
from neuronunit.tests.fi import RheobaseTestP# as discovery
from neuronunit.tests.fi import RheobaseTest# as discovery
import dask.bag as db
# The rheobase has been obtained seperately and cannot be db mapped.
# Nested DB mappings dont work.
from itertools import repeat
# DEAP mutation strategies:
# https://deap.readthedocs.io/en/master/api/tools.html#deap.tools.mutESLogNormal
[docs]class WSListIndividual(list):
"""Individual consisting of list with weighted sum field"""
def __init__(self, *args, **kwargs):
"""Constructor"""
self.rheobase = None
super(WSListIndividual, self).__init__(*args, **kwargs)
[docs]class WSFloatIndividual(float):
"""Individual consisting of list with weighted sum field"""
def __init__(self, *args, **kwargs):
"""Constructor"""
self.rheobase = None
super(WSFloatIndividual, self).__init__()
[docs]def mint_generic_model(backend):
LEMS_MODEL_PATH = path_params['model_path']
return ReducedModel(LEMS_MODEL_PATH,name = str('vanilla'),backend = str(backend))
[docs]@jit
def write_opt_to_nml(path,param_dict):
'''
Write optimimal simulation parameters back to NeuroML.
'''
orig_lems_file_path = path_params['model_path']
more_attributes = pynml.read_lems_file(orig_lems_file_path,
include_includes=True,
debug=False)
for i in more_attributes.components:
new = {}
if str('izhikevich2007Cell') in i.type:
for k,v in i.parameters.items():
units = v.split()
if len(units) == 2:
units = units[1]
else:
units = 'mV'
new[k] = str(param_dict[k]) + str(' ') + str(units)
i.parameters = new
fopen = open(path+'.nml','w')
more_attributes.export_to_file(fopen)
fopen.close()
return
[docs]def bridge_judge(test_and_models):
# Temporarily patch sciunit judge code, which seems to be broken.
#
#
(test, dtc) = test_and_models
obs = test.observation
backend_ = dtc.backend
model = mint_generic_model(backend_)
model.set_attrs(**dtc.attrs)
pred = test.generate_prediction(model)
if pred is not None:
if hasattr(dtc,'prediction'):# is not None:
dtc.prediction[test] = pred
dtc.observation[test] = test.observation['mean']
else:
dtc.prediction = None
dtc.observation = None
dtc.prediction = {}
dtc.prediction[test] = pred
dtc.observation = {}
dtc.observation[test] = test.observation['mean']
#dtc.prediction = pred
score = test.compute_score(obs,pred)
if not hasattr(dtc,'agreement'):
dtc.agreement = None
dtc.agreement = {}
try:
dtc.agreement[str(test)] = np.abs(test.observation['mean'] - pred['mean'])
except:
try:
dtc.agreement[str(test)] = np.abs(test.observation['value'] - pred['value'])
except:
try:
dtc.agreement[str(test)] = np.abs(test.observation['mean'] - pred['value'])
except:
pass
#print(score.norm_score)
else:
score = None
return score, dtc
[docs]def get_rh(dtc,rtest):
place_holder = {}
place_holder['n'] = 86
place_holder['mean'] = 10*pq.pA
place_holder['std'] = 10*pq.pA
place_holder['value'] = 10*pq.pA
rtest = RheobaseTestP(observation=place_holder,name='a Rheobase test')
dtc.rheobase = None
backend_ = dtc.backend
model = mint_generic_model(backend_)
model.set_attrs(**dtc.attrs)
#model = mint_generic_model()
dtc.rheobase = rtest.generate_prediction(model)#['value']
if dtc.rheobase is None:
dtc.rheobase = - 1.0
return dtc
[docs]def dtc_to_rheo(dtc):
# If test taking data, and objects are present (observations etc).
# Take the rheobase test and store it in the data transport container.
dtc.scores = {}
dtc.score = {}
backend_ = dtc.backend
model = mint_generic_model(backend_)
model.set_attrs(**dtc.attrs)
rtest = [ t for t in dtc.tests if str('RheobaseTestP') == t.name ]
if len(rtest):
rtest = rtest[0]
dtc.rheobase = rtest.generate_prediction(model)
#print(dtc.rheobase)
if dtc.rheobase is not None and dtc.rheobase !=-1.0:
dtc.rheobase = dtc.rheobase['value']
obs = rtest.observation
score = rtest.compute_score(obs,dtc.rheobase)
dtc.scores[str('RheobaseTestP')] = 1.0 - score.norm_score
if dtc.score is not None:
dtc = score_proc(dtc,rtest,copy.copy(score))
rtest.params['injected_square_current']['amplitude'] = dtc.rheobase
else:
dtc.rheobase = - 1.0
dtc.scores[str('RheobaseTestP')] = 1.0
else:
# otherwise, if no observation is available, or if rheobase test score is not desired.
# Just generate rheobase predictions, giving the models the freedom of rheobase
# discovery without test taking.
dtc = get_rh(dtc,rtest)
return dtc
[docs]def score_proc(dtc,t,score):
dtc.score[str(t)] = {}
#print(score.keys())
if hasattr(score,'norm_score'):
dtc.score[str(t)]['value'] = copy.copy(score.norm_score)
if hasattr(score,'prediction'):
if type(score.prediction) is not type(None):
dtc.score[str(t)][str('prediction')] = score.prediction
dtc.score[str(t)][str('observation')] = score.observation
boolean_means = bool('mean' in score.observation.keys() and 'mean' in score.prediction.keys())
boolean_value = bool('value' in score.observation.keys() and 'value' in score.prediction.keys())
if boolean_means:
dtc.score[str(t)][str('agreement')] = np.abs(score.observation['mean'] - score.prediction['mean'])
if boolean_value:
dtc.score[str(t)][str('agreement')] = np.abs(score.observation['value'] - score.prediction['value'])
dtc.agreement = dtc.score
return dtc
[docs]def switch_logic(tests):
# move this logic into sciunit tests
for t in tests:
t.passive = None
t.active = None
active = False
passive = False
if str('RheobaseTest') == t.name:
active = True
passive = False
elif str('RheobaseTestP') == t.name:
active = True
passive = False
elif str('InjectedCurrentAPWidthTest') == t.name:
active = True
passive = False
elif str('InjectedCurrentAPAmplitudeTest') == t.name:
active = True
passive = False
elif str('InjectedCurrentAPThresholdTest') == t.name:
active = True
passive = False
elif str('RestingPotentialTest') == t.name:
passive = True
active = False
elif str('InputResistanceTest') == t.name:
passive = True
active = False
elif str('TimeConstantTest') == t.name:
passive = True
active = False
elif str('CapacitanceTest') == t.name:
passive = True
active = False
t.passive = passive
t.active = active
return tests
[docs]def active_values(keyed,rheobase):
DURATION = 1000.0*pq.ms
DELAY = 100.0*pq.ms
keyed['injected_square_current'] = {}
keyed['injected_square_current']['delay']= DELAY
keyed['injected_square_current']['duration'] = DURATION
if type(rheobase) is type({str('k'):str('v')}):
keyed['injected_square_current']['amplitude'] = float(rheobase['value'])*pq.pA
else:
keyed['injected_square_current']['amplitude'] = rheobase
return keyed
[docs]def passive_values(keyed):
DURATION = 500.0*pq.ms
DELAY = 200.0*pq.ms
keyed['injected_square_current'] = {}
keyed['injected_square_current']['delay']= DELAY
keyed['injected_square_current']['duration'] = DURATION
keyed['injected_square_current']['amplitude'] = -10*pq.pA
return keyed
[docs]def allocate_worst(dtc,tests):
# If the model fails tests, and cannot produce model driven data
# Allocate the worst score available.
for t in tests:
dtc.scores[str(t)] = 1.0
dtc.score[str(t)] = 1.0
return dtc
[docs]def nunit_evaluation_df(dtc):
# Inputs single data transport container modules, and neuroelectro observations that
# inform test error error_criterion
# Outputs Neuron Unit evaluation scores over error criterion
# same method as below but with data frame.
tests = dtc.tests
dtc = copy.copy(dtc)
dtc.model_path = path_params['model_path']
LEMS_MODEL_PATH = path_params['model_path']
df = pd.DataFrame(index=list(tests),columns=['observation','prediction','disagreement'])#,columns=list(reduced_cells.keys()))
if dtc.rheobase == -1.0 or type(dtc.rheobase) is type(None):
dtc = allocate_worst(tests,dtc)
else:
for k,t in enumerate(tests):
if str('RheobaseTest') != t.name and str('RheobaseTestP') != t.name:
t.params = dtc.vtest[k]
score, dtc= bridge_judge((t,dtc))
if score is not None:
if score.norm_score is not None:
dtc.scores[str(t)] = 1.0 - score.norm_score
df.iloc[k]['observation'] = t.observation['mean']
try:
agreement = np.abs(t.observation['mean'] - pred['value'])
df.iloc[k]['prediction'] = pred['value']
df.iloc[k]['disagreement'] = agreement
except:
agreement = np.abs(t.observation['mean'] - pred['mean'])
df.iloc[k]['prediction'] = pred['mean']
df.iloc[k]['disagreement'] = agreement
else:
print('gets to None score type')
# compute the sum of sciunit score components.
dtc.summed = dtc.get_ss()
dtc.df = df
return dtc
[docs]def nunit_evaluation(dtc):
# Inputs single data transport container modules, and neuroelectro observations that
# inform test error error_criterion
# Outputs Neuron Unit evaluation scores over error criterion
tests = dtc.tests
dtc = copy.copy(dtc)
dtc.model_path = path_params['model_path']
LEMS_MODEL_PATH = path_params['model_path']
#df = pd.DataFrame(index=list(tests),columns=['observation','prediction','disagreement'])#,columns=list(reduced_cells.keys()))
if dtc.rheobase == -1.0 or type(dtc.rheobase) is type(None):
dtc = allocate_worst(tests,dtc)
else:
for k,t in enumerate(tests):
if str('RheobaseTest') != t.name and str('RheobaseTestP') != t.name:
t.params = dtc.vtest[k]
score, dtc= bridge_judge((t,dtc))
if score is not None:
if score.norm_score is not None:
dtc.scores[str(t)] = 1.0 - score.norm_score
else:
print('gets to None score type')
# compute the sum of sciunit score components.
dtc.summed = dtc.get_ss()
#dtc.df = df
return dtc
[docs]def evaluate(dtc):
error_length = len(dtc.scores.keys())
# assign worst case errors, and then over write them with situation informed errors as they become available.
fitness = [ 1.0 for i in range(0,error_length) ]
for k,t in enumerate(dtc.scores.keys()):
fitness[k] = dtc.scores[str(t)]
return tuple(fitness,)
[docs]def get_trans_list(param_dict):
trans_list = []
for i,k in enumerate(list(param_dict.keys())):
trans_list.append(k)
return trans_list
[docs]def add_constant(hold_constant, pop, td):
hold_constant = OrderedDict(hold_constant)
for k in hold_constant.keys():
td.append(k)
for p in pop:
for v in hold_constant.values():
p.append(v)
return pop,td
[docs]def update_dtc_pop(pop, td):
'''
inputs a population of genes/alleles, the population size MU, and an optional argument of a rheobase value guess
outputs a population of genes/alleles, a population of individual object shells, ie a pickleable container for gene attributes.
Rationale, not every gene value will result in a model for which rheobase is found, in which case that gene is discarded, however to
compensate for losses in gene population size, more gene samples must be tested for a successful return from a rheobase search.
If the tests return are successful these new sampled individuals are appended to the population, and then their attributes are mapped onto
corresponding virtual model objects.
'''
if pop[0].backend is not None:
_backend = pop[0].backend
if isinstance(pop, Iterable):# and type(pop[0]) is not type(str('')):
xargs = zip(pop,repeat(td),repeat(_backend))
npart = np.min([multiprocessing.cpu_count(),len(pop)])
bag = db.from_sequence(xargs, npartitions = npart)
dtcpop = list(bag.map(transform).compute())
assert len(dtcpop) == len(pop)
else:
for p in pop:
p.td = td
p.backend = str(_backend)
# above replaces need for this line:
xargs = (pop,td,repeat(backend))
# In this case pop is not really a population but an individual
# but parsimony of naming variables
# suggests not to change the variable name to reflect this.
dtcpop = [ transform(xargs) ]
assert exec('dtcpop[0].backend is '+str(_backend)+')')
return dtcpop
[docs]def run_ga(explore_edges, max_ngen, test, free_params = None, hc = None, NSGA = None, MU = None, seed_pop = None, model_type = str('RAW')):
from bluepyopt.deapext.optimisations import SciUnitOptimization
# Inputs:
# - MODEL_PARAMS, is a dictionary of model parameter ranges (the boundaries that define regions where parameters are free to vary).
# You may not want all the parameters to be allowed to vary, so the optinal key word argument: free_params
# specifies the free_parameters, and every parameter not in that list is held constant.
# - free_params is type dictionary it takes a list of the dictionary keys for the parameters that are free to vary.
# - MU type int is the population size and
# - NGEN type int, is the number of generations to evaluate.
# MU*NGEN = total maximum number of models that could be evaluated, although DEAP is smart and
# doesn't necessarily evaluate every model in MU*NGEN
# - NSGA, type Boolean tells the optimizer to use the Pareto Front based approach (alternative is select Best).
# - string Model_type tells the Optimizer what simulator backend model combinartion to use.
# - DEAP population type list seed_pop pertains to starting the GA, with an informed guess of where to look.
# For example if you did a coarse grained grid search first, and want the first genes to look in the location
# of the coarse grained optima first.
# - use_test is a list of NU tests (a NeuronUnit tests suite), or a singular test.
# Outputs:
# - ga_out a dictionary of GA optimization results
# - stats,
# - and the pareto-front the paretofront (key 'pf')
ss = {}
for k in free_params:
ss[k] = explore_edges[k]
if type(MU) == type(None):
MU = 2**len(list(free_params))
# make sure that the gene population size is divisible by 4.
if NSGA == True:
selection = str('selNSGA')
else:
selection = str('selIBEA')
max_ngen = int(np.floor(max_ngen))
DO = SciUnitOptimization(offspring_size = MU, error_criterion = test, boundary_dict = ss, backend = model_type, hc = hc,selection = selection, seed_pop= seed_pop)#, selection = selection, boundary_dict = ss, elite_size = 2, hc=hc)
if seed_pop is not None:
# This is a re-run condition.
DO.setnparams(nparams = len(free_params), boundary_dict = ss)
DO.seed_pop = seed_pop
DO.setup_deap()
# This run condition should not need same arguments as above.
ga_out = DO.run(max_ngen = max_ngen)#offspring_size = MU, )
return ga_out, DO
[docs]def init_pop(pop, td, tests):
from neuronunit.optimization.exhaustive_search import update_dtc_grid
dtcpop = list(update_dtc_pop(pop, td))
for d in dtcpop:
d.tests = tests
if hasattr(pop[0],'backend'):
d.backend = pop[0].backend
if hasattr(pop[0],'hc'):
constant = pop[0].hc
for d in dtcpop:
if constant is not None:
if len(constant):
d.constants = constant
d.add_constant()
return pop, dtcpop
[docs]def obtain_rheobase(pop, td, tests):
'''
Calculate rheobase for a given population pop
Ordered parameter dictionary td
and rheobase test rt
'''
pop, dtcpop = init_pop(pop, td, tests)
dtcpop = list(map(dtc_to_rheo,dtcpop))
for ind,d in zip(pop,dtcpop):
if type(d.rheobase) is not type(1.0):
ind.rheobase = d.rheobase
d.rheobase = d.rheobase
else:
ind.rheobase = -1.0
d.rheobase = -1.0
return pop, dtcpop
[docs]def new_genes(pop,dtcpop,td):
'''
some times genes explored will not return
un-usable simulation parameters
genes who have no rheobase score
will be discarded.
BluePyOpt needs a stable
gene number however
This method finds how many genes have
been discarded, and tries to build new genes
from the existing distribution of gene values, by mimicing a normal random distribution
of genes that are not deleted.
if a rheobase value cannot be found for a given set of dtc model more_attributes
delete that model, or rather, filter it out above, and make new genes based on
the statistics of remaining genes.
it's possible that they wont be good models either, so keep trying in that event.
a new model from the mean of the pre-existing model attributes.
'''
impute_gene = [] # impute individual, not impute index
ind = WSListIndividual()
for t in td:
mean = np.mean([ d.attrs[t] for d in dtcpop ])
std = np.std([ d.attrs[t] for d in dtcpop ])
sample = numpy.random.normal(loc=mean, scale=2*std, size=1)[0]
ind.append(sample)
dtc = DataTC()
LEMS_MODEL_PATH = str(neuronunit.__path__[0])+str('/models/NeuroML2/LEMS_2007One.xml')
dtc.attrs = {}
for i,j in enumerate(ind):
dtc.attrs[str(td[i])] = j
dtc.backend = dtcpop[0].backend
dtc.tests = dtcpop[0].tests
dtc = dtc_to_rheo(dtc)
ind.rheobase = dtc.rheobase
return ind,dtc
[docs]def serial_route(pop,td,tests):
'''
parallel list mapping only works with an iterable collection.
Serial route is intended for single items.
'''
if type(dtc.rheobase) is type(None):
print('Error Score bad model')
for t in tests:
dtc.scores = {}
dtc.get_ss()
else:
dtc = format_test((dtc,tests))
dtc = nunit_evaluation((dtc,tests))
return pop, dtc
[docs]def filtered(pop,dtcpop):
dtcpop = [ dtc for dtc in dtcpop if dtc.rheobase!=-1.0 ]
pop = [ p for p in pop if p.rheobase!=-1.0 ]
dtcpop = [ dtc for dtc in dtcpop if dtc.rheobase is not None ]
pop = [ p for p in pop if p.rheobase is not None ]
assert len(pop) == len(dtcpop)
return (pop,dtcpop)
[docs]def parallel_route(pop,dtcpop,tests,td):
for d in dtcpop:
d.tests = copy.copy(tests)
dtcpop = list(map(format_test,dtcpop))
#import pdb; pdb.set_trace()
npart = np.min([multiprocessing.cpu_count(),len(dtcpop)])
dtcbag = db.from_sequence(dtcpop, npartitions = npart)
dtcpop = list(dtcbag.map(nunit_evaluation).compute())
for i,d in enumerate(dtcpop):
if not hasattr(pop[i],'dtc'):
pop[i] = WSListIndividual(pop[i])
pop[i].dtc = None
d.get_ss()
pop[i].dtc = copy.copy(d)
invalid_dtc_not = [ i for i in pop if not hasattr(i,'dtc') ]
return pop, dtcpop
[docs]def make_up_lost(pop,dtcpop,td):
before = len(pop)
(pop,dtcpop) = filtered(pop,dtcpop)
after = len(pop)
assert after>0
delta = before-after
if delta:
cnt = 0
while cnt < delta:
ind,dtc = new_genes(pop,dtcpop,td)
if dtc.rheobase != -1.0:
pop.append(ind)
dtcpop.append(dtc)
cnt += 1
return pop, dtcpop
import dask.bag as db
[docs]def test_runner(pop,td,tests,single_spike=True):
if single_spike:
pop, dtcpop = obtain_rheobase(pop, td, tests)
pop, dtcpop = make_up_lost(pop,dtcpop,td)
# there are many models, which have no actual rheobase current injection value.
# filter, filters out such models,
# gew genes, add genes to make up for missing values.
# delta is the number of genes to replace.
else:
pop, dtcpop = init_pop(pop, td, tests)
pop,dtcpop = parallel_route(pop,dtcpop,tests,td)
for ind,d in zip(pop,dtcpop):
ind.dtc = d
if not hasattr(ind,'fitness'):
ind.fitness = copy.copy(pop[0].fitness)
return pop,dtcpop
[docs]def update_deap_pop(pop, tests, td, backend = None,hc = None):
'''
Inputs a population of genes (pop).
Returned neuronunit scored DataTransportContainers (dtcpop).
This method converts a population of genes to a population of Data Transport Containers,
Which act as communicatable data types for storing model attributes.
Rheobase values are found on the DTCs
DTCs for which a rheobase value of x (pA)<=0 are filtered out
DTCs are then scored by neuronunit, using neuronunit models that act in place.
'''
#pop = copy.copy(pop)
if hc is not None:
pop[0].hc = None
pop[0].hc = hc
if backend is not None:
pop[0].backend = None
pop[0].backend = backend
pop, dtcpop = test_runner(pop,td,tests)
for p,d in zip(pop,dtcpop):
p.dtc = d
return pop
[docs]def create_subset(nparams = 10, boundary_dict = None):
# used by GA to find subsets in parameter space.
if type(boundary_dict) is type(None):
raise ValueError('A parameter range dictionary was not supplied \
and the program doesnt know what value to explore.')
mp = modelp.model_params
key_list = list(mp.keys())
reduced_key_list = key_list[0:nparams]
else:
key_list = list(boundary_dict.keys())
reduced_key_list = key_list[0:nparams]
subset = { k:boundary_dict[k] for k in reduced_key_list }
return subset