Source code for lib.resultGraph.graphLib

from logs import logDecorator as lD
import json, os
import matplotlib.pyplot as plt # This comes before networkx
import networkx as nx
from networkx.drawing.nx_pydot import graphviz_layout
from datetime import datetime as dt
from lib.databaseIO import pgIO

# Load the project configuration once, at import time. The path is relative,
# so it is resolved against the current working directory of the process.
# A context manager is used so the file handle is closed deterministically
# instead of being left to the garbage collector.
with open('../config/config.json') as _configFile:
    config = json.load(_configFile)

# Base name handed to the logging decorator for every function in this module.
logBase = config['logging']['logBase'] + '.lib.resultGraph.graphLib'

@lD.log(logBase + '.generateGraph')
def generateGraph(logger):
    '''generate a directed graph from the modules config

    Build a ``networkx.DiGraph`` by reading every ``.json`` file found in the
    ``../config/modules`` folder. For each file:

    - a node named after the file (with ``.json`` removed) is added with
      ``type='module'`` and an empty ``summary``;
    - every key under ``inputs`` becomes a node with an edge pointing *into*
      the module node;
    - every key under ``outputs`` becomes a node with an edge pointing *out
      of* the module node.

    Input/output nodes take their ``type`` from the ``type`` field of their
    specification and their ``summary`` from the JSON dump of that
    specification. A node that is already in the graph keeps the attributes
    it was first given.

    Parameters
    ----------
    logger : {logging.logger}
        logging element

    Returns
    -------
    networkX.DiGraph object
        The graph built from the module configuration files. Errors are
        logged rather than raised, so if something fails part-way through the
        graph returned contains only what was added before the failure
        (possibly nothing).
    '''

    # Created outside the ``try`` so that the final ``return`` always has a
    # graph to hand back, even if reading the configuration fails immediately.
    graph = nx.DiGraph()

    try:
        folder = '../config/modules'
        files = [f for f in os.listdir(folder) if f.endswith('.json')]

        for f in files:
            # Close each config file as soon as it has been parsed.
            with open(os.path.join(folder, f)) as moduleFile:
                data = json.load(moduleFile)

            # Read both sections before touching the graph, so a malformed
            # file fails before its module node is added.
            inp = list(data['inputs'].items())
            out = list(data['outputs'].items())

            f = f.replace('.json', '')
            graph.add_node(f, type='module', summary='')

            # Add the incoming edges
            for n, spec in inp:
                if n not in graph.nodes:
                    summary = json.dumps(spec)
                    graph.add_node(n, type=spec['type'], summary=summary)
                graph.add_edge(n, f)

            # Add the outgoing edges
            for n, spec in out:
                if n not in graph.nodes:
                    summary = json.dumps(spec)
                    graph.add_node(n, type=spec['type'], summary=summary)
                graph.add_edge(f, n)

    except Exception as e:
        logger.error('Unable to generate the graph: {}'.format(e))

    return graph
@lD.log(logBase + '.plotGraph')
def plotGraph(logger, graph, fileName=None):
    '''draw the graph, optionally saving it to a file

    Nodes whose ``type`` attribute is ``'module'`` are drawn in orange and
    all other nodes in cyan. Positions come from the graphviz ``dot`` layout,
    and every node is labelled with its own name.

    Parameters
    ----------
    logger : {logging.logger}
        logging element
    graph : {networkX.Graph object}
        The graph that needs to be plotted. Every node is expected to carry a
        ``type`` attribute.
    fileName : {str}, optional
        name of the file in which to save the figure (the default is
        ``None``, in which case the figure is drawn but neither saved nor
        closed)
    '''

    try:
        plt.figure()

        # Split the nodes into the two colour groups in a single pass.
        moduleNodes, otherNodes = [], []
        for name, attrs in graph.nodes(data=True):
            if attrs['type'] == 'module':
                moduleNodes.append(name)
            else:
                otherNodes.append(name)

        labels = {name: name for name in graph.nodes}
        pos = graphviz_layout(graph, prog='dot')

        nx.draw_networkx_nodes(
            graph, pos,
            nodelist=moduleNodes, node_color='orange', node_size=500)
        nx.draw_networkx_nodes(
            graph, pos,
            nodelist=otherNodes, node_color='cyan', node_size=500)
        nx.draw_networkx_edges(graph, pos, arrows=True)
        nx.draw_networkx_labels(graph, pos, labels, font_size=10)

        if fileName is not None:
            plt.savefig(fileName)
            plt.close()
            print('Graph saved ...')

    except Exception as e:
        logger.error('Unable to plot the graph: {}'.format(e))

    return
@lD.log(logBase + '.generateSubGraph')
def generateSubGraph(logger, graph, keyNode):
    '''generate a subgraph that contains a node and all of its ancestors

    A new ``networkx.DiGraph`` is built holding ``keyNode``, every ancestor
    of ``keyNode`` in ``graph``, the attributes of those nodes, and every
    edge of ``graph`` whose two endpoints are both retained. Edge attributes
    are not copied.

    Parameters
    ----------
    logger : {logging.logger}
        logging element
    graph : {networkX.Graph object}
        The directed graph from which the subgraph is extracted
    keyNode : {str}
        Name of the node whose ancestors need to be generated.

    Returns
    -------
    networkX.DiGraph object
        graph containing the particular node and its ancestors. Errors are
        logged rather than raised; if one occurs (for example when looking up
        the ancestors fails) the graph returned holds only what had been
        added up to that point, possibly nothing.
    '''

    # Created outside the ``try`` so that the final ``return`` always has a
    # graph to hand back.
    newGraph = nx.DiGraph()

    try:
        # ``nx.ancestors`` returns a set; keeping it a set makes every
        # membership test below O(1) instead of a scan over a list.
        nodes = nx.ancestors(graph, keyNode) | {keyNode}

        for n, d in graph.nodes(data=True):
            if n in nodes:
                newGraph.add_node(n, **d)

        for n1, n2 in graph.edges:
            if (n1 in nodes) and (n2 in nodes):
                newGraph.add_edge(n1, n2)

    except Exception as e:
        logger.error('Unable to generate the right subgraph: {}'.format(e))

    return newGraph
@lD.log(logBase + '.graphToSerialized')
def graphToSerialized(logger, graph):
    '''flatten a graph into plain lists of node rows and edge rows

    Every row starts with the program name (``config['logging']['logBase']``)
    and a timestamp. The timestamp is taken once, so all rows produced by a
    single call share the same value.

    Parameters
    ----------
    logger : {logging.logger}
        logging element
    graph : {networkX.Graph object}
        A networkX graph object that is to be serialized. Every node must
        carry ``type`` and ``summary`` attributes.

    Returns
    -------
    tuple of serialized lists
        ``(nodes, edges)`` where each node row is
        ``[progName, now, nodeName, type, summary]`` and each edge row is
        ``[progName, now, source, target]``.
    '''

    progName = config['logging']['logBase']
    now = dt.now()

    # One row per node: program name, datetime, node name, type, summary
    nodeRows = [
        [progName, now, name, attrs['type'], attrs['summary']]
        for name, attrs in graph.nodes(data=True)
    ]

    # One row per edge: program name, datetime, source node, target node
    edgeRows = [
        [progName, now, source, target]
        for source, target in graph.edges
    ]

    return nodeRows, edgeRows
@lD.log(logBase + '.serializedToGraph')
def serializedToGraph(logger, nodes, edges):
    '''rebuild a directed graph from serialized node and edge rows

    This is the inverse of ``graphToSerialized``: the first two fields of
    every row (program name and timestamp) are ignored, and the remaining
    fields are used to populate a new ``networkx.DiGraph``.

    Parameters
    ----------
    logger : {logging.logger}
        logging element
    nodes : {list}
        serialized nodes; each row must have exactly five fields, the last
        three being the node name, its ``type`` and its ``summary``
    edges : {list}
        serialized edges; each row must have exactly four fields, the last
        two being the source and target node names

    Returns
    -------
    networkX.DiGraph object
        The graph reconstructed from the supplied rows
    '''

    graph = nx.DiGraph()

    # Each node is supplied as a (name, attribute-dict) pair.
    graph.add_nodes_from(
        (name, {'type': nodeType, 'summary': summary})
        for _, _, name, nodeType, summary in nodes)

    graph.add_edges_from(
        (source, target)
        for _, _, source, target in edges)

    return graph
@lD.log(logBase + '.uploadGraph')
def uploadGraph(logger, graph, dbName=None):
    '''upload the supplied graph to a database

    The graph is serialized with ``graphToSerialized`` and the resulting
    rows are passed to ``pgIO.commitDataList``, with the nodes going to
    ``graphs.nodes`` and the edges to ``graphs.edges``. In case a database
    is not specified, ``dbName=None`` is forwarded unchanged.

    Any failure is logged and swallowed; nothing is raised to the caller.

    Parameters
    ----------
    logger : {logging.logger}
        logging element
    graph : {networkX graph}
        The graph that needs to be uploaded into the database
    dbName : {str}, optional
        The name of the database in which to upload the data into. This is
        the identifier within the ``db.json`` configuration file. (the
        default is ``None``, which would use the default database specified
        within the same file)
    '''

    try:
        # NOTE(review): no logger is passed here; the ``lD.log`` decorator
        # presumably injects it as the first argument -- confirm.
        nodes, edges = graphToSerialized(graph)

        queryNodes = '''insert into graphs.nodes values %s'''
        queryEdges = '''insert into graphs.edges values %s'''

        pgIO.commitDataList(queryNodes, nodes, dbName=dbName)
        pgIO.commitDataList(queryEdges, edges, dbName=dbName)

    except Exception as e:
        # ``Logger.error`` requires a message. Calling it with no arguments
        # raises a ``TypeError`` from inside this handler and hides the
        # actual failure.
        logger.error('Unable to upload the graph: {}'.format(e))

    return