import numpy as np
from . import Utility_Functions as ut
import matplotlib.pyplot as plt
[docs]class Graph:
"""Graph instance must be initialised with the weighted adjacency matrix in 2D numpy array format.
On initialization it computes in-degrees, out-degrees, reciprocated degrees, out-strengths, in-strengths, reciprocated strengths and
triadic statistics such as occurrences, intensities and fluxes.
:param adjacency: Weighted adjacency matrix in 2D numpy array format.
:type adjacency: np.ndarray
"""
def __init__(
self,
adjacency = None
):
self.adjacency = adjacency
self.binary_adjacency = ut.binarize(adjacency)
self.dseq_right = ut.deg_right(self.binary_adjacency)
self.dseq_left = ut.deg_left(self.binary_adjacency)
self.dseq_rec = ut.deg_rec(self.binary_adjacency)
self.dseq_out = ut.deg_out(self.binary_adjacency)
self.dseq_in = ut.deg_in(self.binary_adjacency)
self.stseq_right = ut.st_right(self.binary_adjacency,self.adjacency)
self.stseq_left = ut.st_left(self.binary_adjacency,self.adjacency)
self.stseq_rec_out = ut.st_rec_out(self.binary_adjacency,self.adjacency)
self.stseq_rec_in = ut.st_rec_in(self.binary_adjacency,self.adjacency)
self.stseq_out = ut.deg_out(self.adjacency)
self.stseq_in = ut.deg_in(self.adjacency)
self.adj_right_emp,self.adj_left_emp,self.adj_rec_emp,self.adj_unrec_emp = ut.gen_binary_adjacencies(self.binary_adjacency)
self.w_adj_right_emp,self.w_adj_left_emp,self.w_adj_rec_emp,self.w_adj_unrec_emp = ut.gen_weighted_adjacencies(self.binary_adjacency,self.adjacency)
self.w_adj_rec_out_emp, self.w_adj_rec_in_emp = ut.gen_rec_weighted_adjacencies(self.binary_adjacency,self.adjacency)
self.Nm_emp =ut.triadic_occurrences(self.adj_right_emp,self.adj_left_emp,self.adj_rec_emp,self.adj_unrec_emp)
self.Im_emp =ut.triadic_intensities(self.w_adj_right_emp,self.w_adj_left_emp,self.w_adj_rec_emp,self.w_adj_unrec_emp)
self.Fm_emp = ut.triadic_fluxes(self.adj_right_emp,self.adj_left_emp,self.adj_rec_emp,self.adj_unrec_emp,self.w_adj_right_emp,self.w_adj_left_emp,self.w_adj_rec_out_emp,self.w_adj_rec_in_emp)
self.n_edges = ut.n_edges_func(self.binary_adjacency)
self.n_suppliers = len(self.dseq_out)
self.n_nodes = len(self.dseq_out)
self.n_users = len(self.dseq_in)
self.L_rec = ut.L_rec(self.binary_adjacency)
self.L_max = self.n_suppliers*(self.n_suppliers-1)
self.n_observations = self.n_suppliers*self.n_users
self.implemented_models = ['DBCM','RBCM','DBCM+CReMa','RBCM+CRWCM']
self.model_binary_adjacency = None
self.model_weighted_adjacency = None
self.ll = None
self.ll_binary = None
self.jacobian = None
self.norm = None
self.aic = None
self.aic_binary = None
self.args = None
self.norm_rel = None
self.avg_motif_occurrence_array = None
self.std_motif_occurrence_array = None
self.percentiles_inf_motif_occurrence_array = None
self.percentiles_sup_motif_occurrence_array = None
self.avg_motif_intensity_array = None
self.std_motif_intensity_array = None
self.percentiles_inf_motif_intensity_array = None
self.percentiles_sup_motif_intensity_array = None
self.fij_matrix = None
[docs] def solver(
self,
model,
imported_params = np.array([]),
imported_top_params = np.array([]),
use_guess = np.array([]),
maxiter = 30,
verbose=0.,
tol = 1e-06
):
"""Optimize chosen model for Graph instance and compute log-likelihoods, jacobian, infinite norms and relative norms.
The available models are DBCM and RBCM for the binary optimisation and DBCM+CReMa and RBCM+CRWCM for the mixture models.
:param model: Chosen model
:type model: string
:param imported_params: If used, uses wanted parameters as solution, default is empty array.
:type imported_params: np.ndarray
:param imported_top_params: If used for mixture models, uses wanted parameters as solution for the binary problem, default is empty array.
:type imported_params: np.ndarray
:param use_guess: If used, uses wanted parameters as starters in the optimization, default is empty array.
:type use_guess: np.ndarray
:param maxiter: Maximum Iterations of solver function, default is 30.
:type maxiter: int
:param verbose: True if you want to see every n*print_steps iterations, default is 0.
:type verbose: boolean
:param tol: tolerance for infinite norm in the optimization process, default is 1e-06.
:type tol: float
"""
self.model = model
if model == 'DBCM':
self.args = (self.dseq_out,self.dseq_in)
elif model == 'RBCM':
self.args = (self.dseq_right,self.dseq_left,self.dseq_rec)
elif model == 'DBCM+CReMa':
self.args_top = (self.dseq_out,self.dseq_in)
self.args_weighted = (self.stseq_out,self.stseq_in)
elif model == 'RBCM+CRWCM':
self.args_top = (self.dseq_right,self.dseq_left,self.dseq_rec)
self.args_weighted =(self.stseq_right,self.stseq_left,self.stseq_rec_out,self.stseq_rec_in)
else:
raise TypeError('Only implemented models are "DBCM", "RBCM", "DBCM+CReMa" and "RBCM+CRWCM"!')
if model == "DBCM":
if len(imported_params) == 0:
self.params = ut.solve_DBCM(*self.args,use_guess=use_guess,maxiter=maxiter,tol=tol)
else:
self.params = imported_params
self.jacobian = ut.jac_DBCM(self.params,*self.args)
self.norm = np.linalg.norm(self.jacobian,ord=np.inf)
self.ll = -ut.ll_DBCM(self.params,*self.args)
if model == "RBCM":
if len(imported_params) == 0:
self.params = ut.solve_RBCM(*self.args,use_guess=use_guess,maxiter=maxiter,tol=tol)
else:
self.params = imported_params
self.jacobian = ut.jac_RBCM(self.params,*self.args)
self.norm = np.linalg.norm(self.jacobian,ord=np.inf)
self.ll = -ut.ll_RBCM(self.params,*self.args)
if model == 'DBCM+CReMa':
if len(imported_params) == 0 and len(imported_top_params)==0:
self.topological_params = ut.solve_DBCM(*self.args_top,maxiter=maxiter)
if len(self.topological_params)!=2*self.n_nodes:
raise TypeError('uncorrect dimension for topological params')
self.weighted_params = ut.solve_CReMa_after_DBCM(*self.args_weighted,self.topological_params,use_guess=use_guess,maxiter=maxiter,tol=tol)
self.params = np.concatenate((self.topological_params,self.weighted_params))
elif len(imported_params) == 0 and len(imported_top_params)!=0:
self.topological_params = imported_top_params
if len(self.topological_params)!=2*self.n_nodes:
raise TypeError('uncorrect dimension for topological params')
self.weighted_params = ut.solve_CReMa_after_DBCM(*self.args_weighted,self.topological_params,use_guess=use_guess,maxiter=maxiter,tol=tol)
self.params = np.concatenate((self.topological_params,self.weighted_params))
else:
self.params = imported_params
self.weighted_params = self.params[2*self.n_nodes:]
self.topological_params = self.params[:2*self.n_nodes]
ll_weighted = -ut.ll_CReMa_after_DBCM(self.weighted_params,*self.args_weighted,self.topological_params)
ll_topological = -ut.ll_DBCM(self.topological_params,*self.args_top)
self.ll = ll_topological + ll_weighted
jac_weighted = -ut.jac_CReMa_after_DBCM(self.weighted_params,*self.args_weighted,self.topological_params)
jac_topological = -ut.jac_DBCM(self.topological_params,*self.args_top)
relative_error_weighted = -ut.relative_error_CReMa_after_DBCM(self.weighted_params,*self.args_weighted,self.topological_params)
relative_error_topological = -ut.relative_error_DBCM(self.topological_params,*self.args_top)
self.jacobian = np.concatenate((jac_topological,jac_weighted))
self.relative_error = np.concatenate((relative_error_topological,relative_error_weighted))
self.norm = np.linalg.norm(self.jacobian,ord=np.inf)
self.norm_top = np.linalg.norm(jac_topological,ord=np.inf)
self.norm_w = np.linalg.norm(jac_weighted,ord=np.inf)
self.norm_rel = np.linalg.norm(self.relative_error,ord=np.inf)
# print('norm rel:',self.norm_rel)
if model == 'RBCM+CRWCM':
if len(imported_params) == 0 and len(imported_top_params)==0:
self.topological_params = ut.solve_RBCM(*self.args_top,maxiter=maxiter,tol=tol)
if len(self.topological_params)!=3*self.n_nodes:
raise TypeError('uncorrect dimension for topological params')
self.weighted_params = ut.solve_CRWCM_after_RBCM(*self.args_weighted,self.topological_params,use_guess=use_guess,maxiter=maxiter,tol=tol)
self.params = np.concatenate((self.topological_params,self.weighted_params))
elif len(imported_params) == 0 and len(imported_top_params)!=0:
self.topological_params = imported_top_params
if len(self.topological_params)!=3*self.n_nodes:
raise TypeError('uncorrect dimension for topological params')
self.weighted_params = ut.solve_CRWCM_after_RBCM(*self.args_weighted,self.topological_params,use_guess=use_guess,maxiter=maxiter,tol=tol)
self.params = np.concatenate((self.topological_params,self.weighted_params))
else:
self.params = imported_params
self.weighted_params = self.params[3*self.n_nodes:]
self.topological_params = self.params[:3*self.n_nodes]
ll_weighted = -ut.ll_CRWCM_after_RBCM(self.weighted_params,*self.args_weighted,self.topological_params)
ll_topological = -ut.ll_RBCM(self.topological_params,*self.args_top)
self.ll = ll_topological + ll_weighted
jac_weighted = -ut.jac_CRWCM_after_RBCM(self.weighted_params,*self.args_weighted,self.topological_params)
jac_topological = -ut.jac_RBCM(self.topological_params,*self.args_top)
relative_error_weighted = -ut.relative_error_CRWCM_after_RBCM(self.weighted_params,*self.args_weighted,self.topological_params)
relative_error_topological = -ut.relative_error_RBCM(self.topological_params,*self.args_top)
self.jacobian = np.concatenate((jac_topological,jac_weighted))
self.relative_error = np.concatenate((relative_error_topological,relative_error_weighted))
self.norm = np.linalg.norm(self.jacobian,ord=np.inf)
self.norm_top = np.linalg.norm(jac_topological,ord=np.inf)
self.norm_w = np.linalg.norm(jac_weighted,ord=np.inf)
self.norm_rel = np.linalg.norm(self.relative_error,ord=np.inf)
# print('norm rel:',self.norm_rel)
[docs] def numerical_triadic_zscores(self,n_ensemble= 1000,percentiles=(2.5,97.5)):
"""Compute triadic network structures on a statistical ensemble of networks. The computed statistics are
triadic occurrences for DBCM and RBCM, and occurrences, intensities and fluxes for DBCM+CReMa and RBCM+CRWCM.
After computing triadic statistics, also z-scores and significance scores are computed.
:param n_ensemble: Chosen model, default is 1000.
:type n_ensemble: float
:param percentiles: Computed percentiles for estimated confidence interval in tuple format, default is (2.5,97.5) correspondent to a 95% CI.
:type percentiles: tuple
"""
if self.model== 'DBCM':
res_Nm = ut.occurrence_ensembler_DBCM(self.params, n_ensemble ,percentiles)
self.avg_Nm = res_Nm[0]
self.std_Nm = res_Nm[1]
self.pdown_Nm = res_Nm[2]
self.pup_Nm = res_Nm[3]
self.zscores_Nm, self.zscores_down_Nm, self.zscores_up_Nm = ut.compute_zscores(self.Nm_emp,self.avg_Nm,self.pdown_Nm,self.pup_Nm,self.std_Nm)
norm_zscores_Nm = np.linalg.norm(self.zscores_Nm)
self.normalized_z_Nm = self.zscores_Nm/norm_zscores_Nm
elif self.model== 'RBCM':
res_Nm = ut.occurrence_ensembler_RBCM(self.params, n_ensemble ,percentiles)
self.avg_Nm = res_Nm[0]
self.std_Nm = res_Nm[1]
self.pdown_Nm = res_Nm[2]
self.pup_Nm = res_Nm[3]
self.zscores_Nm, self.zscores_down_Nm, self.zscores_up_Nm = ut.compute_zscores(self.Nm_emp,self.avg_Nm,self.pdown_Nm,self.pup_Nm,self.std_Nm)
norm_zscores_Nm = np.linalg.norm(self.zscores_Nm)
self.normalized_z_Nm = self.zscores_Nm/norm_zscores_Nm
elif self.model == 'DBCM+CReMa':
res_Nm, res_Im, res_Fm = ut.occurrence_intensity_fluxes_ensembler_DBCM_CReMa(self.params, n_ensemble ,percentiles)
self.avg_Nm = res_Nm[0]
self.std_Nm = res_Nm[1]
self.pdown_Nm = res_Nm[2]
self.pup_Nm = res_Nm[3]
self.avg_Im = res_Im[0]
self.std_Im = res_Im[1]
self.pdown_Im = res_Im[2]
self.pup_Im = res_Im[3]
self.avg_Fm = res_Fm[0]
self.std_Fm = res_Fm[1]
self.pdown_Fm = res_Fm[2]
self.pup_Fm = res_Fm[3]
self.zscores_Nm, self.zscores_down_Nm, self.zscores_up_Nm = ut.compute_zscores(self.Nm_emp,self.avg_Nm,self.pdown_Nm,self.pup_Nm,self.std_Nm)
self.zscores_Im, self.zscores_down_Im, self.zscores_up_Im = ut.compute_zscores(self.Im_emp,self.avg_Im,self.pdown_Im,self.pup_Im,self.std_Im)
self.zscores_Fm, self.zscores_down_Fm, self.zscores_up_Fm = ut.compute_zscores(self.Fm_emp,self.avg_Fm,self.pdown_Fm,self.pup_Fm,self.std_Fm)
norm_zscores_Nm = np.linalg.norm(self.zscores_Nm)
norm_zscores_Im = np.linalg.norm(self.zscores_Im)
norm_zscores_Fm = np.linalg.norm(self.zscores_Fm)
self.normalized_z_Nm = self.zscores_Nm/norm_zscores_Nm
self.normalized_z_Im = self.zscores_Im/norm_zscores_Im
self.normalized_z_Fm = self.zscores_Fm/norm_zscores_Fm
elif self.model == 'RBCM+CRWCM':
res_Nm, res_Im, res_Fm = ut.occurrence_intensity_fluxes_ensembler_RBCM_CRWCM(self.params, n_ensemble ,percentiles)
self.avg_Nm = res_Nm[0]
self.std_Nm = res_Nm[1]
self.pdown_Nm = res_Nm[2]
self.pup_Nm = res_Nm[3]
self.avg_Im = res_Im[0]
self.std_Im = res_Im[1]
self.pdown_Im = res_Im[2]
self.pup_Im = res_Im[3]
self.avg_Fm = res_Fm[0]
self.std_Fm = res_Fm[1]
self.pdown_Fm = res_Fm[2]
self.pup_Fm = res_Fm[3]
self.zscores_Nm, self.zscores_down_Nm, self.zscores_up_Nm = ut.compute_zscores(self.Nm_emp,self.avg_Nm,self.pdown_Nm,self.pup_Nm,self.std_Nm)
self.zscores_Im, self.zscores_down_Im, self.zscores_up_Im = ut.compute_zscores(self.Im_emp,self.avg_Im,self.pdown_Im,self.pup_Im,self.std_Im)
self.zscores_Fm, self.zscores_down_Fm, self.zscores_up_Fm = ut.compute_zscores(self.Fm_emp,self.avg_Fm,self.pdown_Fm,self.pup_Fm,self.std_Fm)
norm_zscores_Nm = np.linalg.norm(self.zscores_Nm)
norm_zscores_Im = np.linalg.norm(self.zscores_Im)
norm_zscores_Fm = np.linalg.norm(self.zscores_Fm)
self.normalized_z_Nm = self.zscores_Nm/norm_zscores_Nm
self.normalized_z_Im = self.zscores_Im/norm_zscores_Im
self.normalized_z_Fm = self.zscores_Fm/norm_zscores_Fm
[docs] def plot_zscores(self,color = 'blue', linestyle='-',marker='o',alpha = 0.3,export_path_zscores='',
export_path_significance='',type='z-scores',label=None,show = False):
"""Plot function for z-scores. If the model is DBCM or RBCM it plots z-scores for triadic occurrences.
If it is DBCM+CReMa and RBCM+CRWCM, it plots z-scores for triadic occurrences, intensities and fluxes.
:param color: Color of profiles and CIs, default is 'blue'.
:type color: string
:param linestyle: Linestyle for profiles.
:type linestyle: string
:param marker: Marker for profiles
:type marker: string
:param alpha: Degree of transparency for CI.
:type alpha: float
:param export_path_zscores: Export path for z-scores plot.
:type alpha: string
:param export_path_zscores: Export path for significance plot.
:type alpha: string
:param type: Type of plot, 'z-scores', 'significance' or 'both'.
:type alpha: string
:param label: Label for the legend in the plot.
:type label: string
:param show: Arg for showing the plot in display.
:type show: boolean
"""
if type not in ['z-scores', 'significance', 'both']:
raise ValueError('only possible type are "z-scores" and "significance" or "both')
if show not in [False, True]:
raise ValueError('only possible type are "False" or "True"')
model = self.model
if label == None:
label = model
m = np.arange(13)+1
if model in ['DBCM','RBCM']:
if type in ['z-scores','both']:
fig,ax = plt.subplots()
plt.suptitle('z-score profile')
plt.plot(m,self.zscores_Nm,color =color,label=label,linestyle=linestyle,marker=marker)
plt.fill_between(m,self.zscores_down_Nm,self.zscores_up_Nm,color=color,alpha=alpha)
#ax.legend()
plt.ylabel('$z_{Nm}$')
plt.xlabel('m')
plt.tight_layout()
if export_path_zscores != '':
plt.savefig(export_path_zscores)
if show == True:
plt.show()
plt.close()
if type in ['significance','both']:
fig,ax = plt.subplots()
plt.suptitle('significance profile')
plt.plot(m,self.normalized_z_Nm,color =color,label=label,linestyle=linestyle,marker=marker)
#ax.legend()
plt.ylabel('$rz_{Nm}$')
plt.xlabel('m')
plt.tight_layout()
if export_path_significance != '':
plt.savefig(export_path_significance)
if show == True:
plt.show()
plt.close()
elif model in ['DBCM+CReMa', 'RBCM+CRWCM']:
if type in ['z-scores','both']:
fig,ax = plt.subplots(3,1,sharex=True)
plt.suptitle('z-score profile')
ax[0].plot(m,self.zscores_Nm,color =color,label=label,linestyle=linestyle,marker=marker)
ax[0].fill_between(m,self.zscores_down_Nm,self.zscores_up_Nm,color=color,alpha=alpha)
ax[1].plot(m,self.zscores_Im,color =color,label=label,linestyle=linestyle,marker=marker)
ax[1].fill_between(m,self.zscores_down_Im,self.zscores_up_Im,color=color,alpha=alpha)
ax[2].plot(m,self.zscores_Fm,color =color,label=label,linestyle=linestyle,marker=marker)
ax[2].fill_between(m,self.zscores_down_Fm,self.zscores_up_Fm,color=color,alpha=alpha)
#ax.legend()
ax[0].axhline(y=1.96,linestyle='--',color='black')
ax[0].axhline(y=-1.96,linestyle='--',color='black')
ax[1].axhline(y=1.96,linestyle='--',color='black')
ax[1].axhline(y=-1.96,linestyle='--',color='black')
ax[2].axhline(y=1.96,linestyle='--',color='black')
ax[2].axhline(y=-1.96,linestyle='--',color='black')
ax[0].set_ylabel('$z_{Nm}$',fontsize=16)
ax[2].set_ylabel('$z_{Fm}$',fontsize=16)
ax[1].set_ylabel('$z_{Im}$',fontsize=16)
ax[2].set_xlabel('m',fontsize=16)
plt.tight_layout()
if export_path_zscores != '':
plt.savefig(export_path_zscores)
if show == True:
plt.show()
plt.close()
if type in ['significance','both']:
fig,ax = plt.subplots(3,1,sharex=True)
plt.suptitle('significance profile')
ax[0].plot(m,self.normalized_z_Nm,color =color,label=label,linestyle=linestyle,marker=marker)
ax[1].plot(m,self.normalized_z_Im,color =color,label=label,linestyle=linestyle,marker=marker)
ax[2].plot(m,self.normalized_z_Fm,color =color,label=label,linestyle=linestyle,marker=marker)
#ax.legend()
ax[0].set_ylabel('$rz_{Nm}$',fontsize=16)
ax[2].set_ylabel('$rz_{Fm}$',fontsize=16)
ax[1].set_ylabel('$rz_{Im}$',fontsize=16)
ax[2].set_xlabel('m',fontsize=16)
plt.tight_layout()
if export_path_significance != '':
plt.savefig(export_path_significance)
if show == True:
plt.show()
plt.close()