Commit 81501062 authored by Bertrand Néron's avatar Bertrand Néron
Browse files

add support to direted graph and weighted graph

new data structure to support weigthed graph
add 2 concrete implementaion of graph directed and undirected
parent 6008dde4
from typing import Iterator, Dict
import itertools import itertools
from abc import ABCMeta from abc import ABCMeta, abstractmethod
from typing import Iterator, Dict, Set, Sequence, Union, Tuple
class Graph(metaclass=ABCMeta): class Node:
"""
Graph node representation
"""
def __init__(self): _id = itertools.count(0)
self.nodes: set[Node] = set()
self.vertices: Dict[Node: set[Node]]
def add_node(self): def __init__(self, **kwargs) -> None:
pass self.id: int = next(self._id)
for k, v in kwargs:
setattr(self, k, v)
def del_node(self):
pass def __hash__(self) -> int:
# to be usable in set an object must be hashable
# so we need to implement __hash__
# which must return an int unique per object
# here we have the Node identifier
return self.id
def __str__(self) -> str:
return f"node_{self.id}"
def __repr__(self) -> str:
return str(self)
class NDGraph: class Edge:
""" """
To handle Non Directed Graph modelize edge between two nodes, each edge can support properties
A graph is a collection of Nodes linked by edges
there are basically to way to implement a graph
- The graph handles Nodes, each nodes know it's neighbors
- The graph handles nodes and the edges between nodes
below I implemented the 2nd version.
""" """
def __init__(self, nodes: Sequence['Node']) -> None: def __init__(self, src: Node, target: Node, directed=True, **kwargs) -> None:
self.nodes: Set['Node'] = {n for n in nodes} self.src = src
self.vertices: Dict['Node': Set['Node']] = {n: set() for n in self.nodes} self.target = target
self.directed = directed
for k, v in kwargs.items():
setattr(self, k, v)
def __str__(self):
"""string representation of this edge"""
return f"{self.src} -{'>' if self.directed else ''} {self.target}"
def add_node(self, node: 'Node') -> None:
def properties(self) -> Dict:
"""the edge properties"""
return {k: v for k, v in self.__dict__.items()}
class Graph(metaclass=ABCMeta):
"""
Base class for graphs.
A graph is a collection of Nodes linked by edges
"""
def __init__(self) -> None:
self._nodes: {}
def add_node(self, node) -> None:
""" """
Add a node to the graph Add a node to the graph
...@@ -39,12 +69,11 @@ class NDGraph: ...@@ -39,12 +69,11 @@ class NDGraph:
:type node: :class:`Node` :type node: :class:`Node`
:return: None :return: None
""" """
self.nodes.add(node) if node not in self._nodes:
if node not in self.vertices: self._nodes[node] = {}
self.vertices[node] = set()
def del_node(self, node: 'Node') -> None: def del_node(self, node: Node) -> None:
""" """
Remove Node from Graph Remove Node from Graph
...@@ -52,33 +81,12 @@ class NDGraph: ...@@ -52,33 +81,12 @@ class NDGraph:
:type node: :class:`Node` :type node: :class:`Node`
:return: None :return: None
""" """
self.nodes.remove(node) for nbh in self.neighbors(node):
neighbors = self.vertices[node] del self._nodes[nbh][node]
for nbh in neighbors: del self._nodes[node]
self.vertices[nbh].remove(node)
del self.vertices[node]
def add_edge(self, node_1: 'Node', nodes: Sequence['Node']): def neighbors(self, node: Node) -> Iterator[Node]:
"""
Add vertice between node1 and all nodes in nodes
:param node_1: the reference node
:type node_1: :class:`Node`
:param nodes: the nodes connected to node_1
:type nodes: Sequence of :class:`Node`
:return: Node
"""
if node_1 not in self.nodes:
raise ValueError("the node_1 must be in Graph")
for n in nodes:
if n not in self.nodes:
raise ValueError("node must be add to Graph before creating edge")
self.vertices[node_1].add(n)
self.vertices[n].add(node_1)
def neighbors(self, node: 'Node') -> Set['Node']:
""" """
return the nodes connected to node return the nodes connected to node
...@@ -86,174 +94,164 @@ class NDGraph: ...@@ -86,174 +94,164 @@ class NDGraph:
:type node: :class:`Node` :type node: :class:`Node`
:return: a set of :class:`Node` :return: a set of :class:`Node`
""" """
return {n for n in self.vertices[node]} for n in self._nodes[node]:
yield n
def get_weight(self, node_1, node_2): def nodes(self) -> Iterator[Node]:
""" """
Iterates over all nodes belonging to the graph
:param node_1: :return: iterator
:param node_2:
:return:
""" """
return 1 for n in self._nodes:
yield n
def edges(self, node: Node) -> Iterator[Edge]:
"""
Iterates over edge connecting to/from this node
:param node:
:return:
"""
for e in self._nodes[node].values():
yield e
class Node: def has_edge(self, edge) -> bool:
"""
:param edge:
:return: True if it exists an edge connecting edge nodes src and target
"""
return edge.src in self._nodes and edge.target in self._nodes[edge.src]
_id = itertools.count(0) @abstractmethod
def add_edge(self, src: Node, target: Node, **edge_prop) -> None:
"""
def __init__(self) -> None: :param edge:
self.id: int = next(self._id) :return:
"""
pass
def __hash__(self) -> int: @abstractmethod
# to be usable in set an object must be hashable def del_edge(self, src: Node, target: Node) -> None:
# so we need to implement __hash__ """
# which must return an int unique per object remove the edge between nodes src and target
# here we ad the identifier of the Node :param src:
return self.id :param target:
:return:
"""
pass
def __str__(self):
return f"node_{self.id}"
def __repr__(self): @staticmethod
return str(self) @abstractmethod
def is_directed() -> bool:
pass
class Edge: def order(self):
"""
The Order of a graph is the number of Nodes in the graph
:return:
"""
return len(self._nodes)
def __init__(self, src, target, weight): @abstractmethod
self.src = src def size(self):
self.target = target """
self.weight = weight The size of a graph is the number of edges in the graph.
:return:
"""
size = 0
for n in self.nodes():
size += len(self._nodes[n])
return size
class NDWGraph: class UnDirectedGraph(Graph):
""" """
To handle Non Directed Graph To handle UnDirected Graph
A graph is a collection of Nodes linked by edges A graph is a collection of Nodes linked by edges
there are basically to way to implement a graph
- The graph handles Nodes, each nodes know it's neighbors
- The graph handles nodes and the edges between nodes
below I implemented the 2nd version.
""" """
def __init__(self, node) -> None: def __init__(self) -> None:
self.nodes: Set[Node] = {} self._nodes = {}
@staticmethod
def is_directed() -> bool:
return False
def add_node(self, node: Node) -> None: def size(self) -> int:
""" """
Add a node to the graph :return: The number of edges of this graph
:param node: the node to add
:type node: :class:`Node`
:return: None
""" """
if node not in self.nodes: return super().size() // 2
self.nodes[node] = {}
def del_node(self, node: Node) -> None: def add_edge(self, node_1, node_2, **prop):
""" """
Remove Node from Graph Add vertex between node1 and all nodes in nodes
:param node: the node to add :return: Node
:type node: :class:`Node`
:return: None
""" """
neighbors = self.nodes[node] self.add_node(node_1)
for nbh in neighbors: self.add_node(node_2)
del self.vertices[nbh][node] self._nodes[node_1][node_2] = Edge(node_1, node_2, directed=False, **prop)
del self.nodes[node] self._nodes[node_2][node_1] = Edge(node_2, node_1, directed=False, **prop)
def add_edge(self, node_1: Node, node_2: Node, weight: Union[int, float]): def del_edge(self, src: Node, target: Node) -> None:
""" """
Add vertex between node1 and all nodes in nodes remove edge between nodes src and target
:param src:
:param node_1: the reference node :param target:
:param node_2: the nodes connected to node_1 :return:
:param weight: the weight of the edge between node_1 and node_2
:return: Node
""" """
for n in node_1, node_2: del self._nodes[src][target]
if n not in self.nodes: del self._nodes[target][src]
raise ValueError(f"node {n.id} not found in Graph. The node must be in Graph.")
self.nodes[node_1][node_2] = weight
self.nodes[node_2][node_1] = weight
def neighbors(self, node): class DirectedGraph(Graph):
""" """
return the nodes connected to node To handle Directed Graph
A graph is a collection of Nodes linked by edges
"""
:param node: the reference node def __init__(self) -> None:
:type node: :class:`Node` self._nodes = {}
:return: a set of :class:`Node`
"""
return {n for n, w in self.nodes[node].items()}
def get_weight(self, node_1, node_2): @staticmethod
""" def is_directed() -> bool:
return True
:param node_1:
:param node_2: def size(self):
:return:
""" """
return self.nodes[node_1][node_2] :return: The number of edges of this graph
"""
return super().size()
def DFS(graph: NDGraph, node: Node) -> Iterator[Node]: def add_edge(self, src: Node, target: Node, **prop) -> None:
""" """
**D**epth **F**irst **S**earch. Add edge from node *src* to node *target*
We start the path from the node given as argument,
This node is labeled as 'visited' :param src: the src node
The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit' :param target: the target node
We pick the last element to visit and visit it :param prop: the edge properties
(The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit'). :return: None
on so on until there no nodes to visit anymore. """
self.add_node(src)
:param graph: self.add_node(target)
:param node: self._nodes[src][target] = Edge(src, target, directed=True, **prop)
:return:
"""
to_visit = [node]
visited = set()
while to_visit:
n = to_visit.pop(-1)
visited.add(n)
new_to_visit = graph.neighbors(n) - visited - set(to_visit)
to_visit.extend(new_to_visit)
yield n
def BFS(graph: NDGraph, node: Node) -> Iterator[Node]: def del_edge(self, src: Node, target: Node) -> Node:
""" """
**B**readth **F**irst **s**earch Remove edge between nodes src and target
We start the path from the node given as argument, :param src:
This node is labeled as 'visited' :param target:
The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit' :return:
we pick the **first** element of the nodes to visit and visit it. """
(The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit') del self._nodes[src][target]
on so on until there no nodes to visit anymore.
:param graph:
:param node:
:return:
"""
to_visit = [node]
visited = set()
parent = None
while to_visit:
n = to_visit.pop(0)
visited.add(n)
new_to_visit = graph.neighbors(n) - visited - set(to_visit)
to_visit.extend(new_to_visit)
if not parent:
weight = 0
else:
weight = graph.get_weight(parent, n)
parent = n
yield n, weight
from typing import Iterator, List, Union, Tuple
from .helpers import LIFO, FIFO
from .graph_3 import Node, Edge, Graph
def _traversing(to_visit: Union[FIFO, LIFO], graph: Graph, node: Node) -> Iterator[Tuple[Node, List[Edge]]]:
"""
function that traverse the graph starting from node
:param to_visit:
:param graph:
:param node:
:return: an iterator at each step return the visiting node and the path
"""
to_visit.add(node)
visited = set()
parent = {}
path = []
while to_visit:
node = to_visit.pop()
# it is important to add node in visited right now
# and not a the end of the block
# otherwise node is not anymore in to_visit and not yet in visited
# so when we explore neighbors we add it again in to_visit
visited.add(node)
for edge in graph.edges(node):
if edge.target not in visited and edge.target not in to_visit:
parent[edge.target] = edge
to_visit.add(edge.target)
if node in parent:
path.append(parent[node])
yield node, path
def DFS(graph: Graph, node: Node) -> Iterator[Tuple[Node, List[Edge]]]:
"""
**D**\ epth **F**\ irst **S**\ earch.
We start the path from the node given as argument,
This node is labeled as 'visited'
The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit'
We pick the last element to visit and visit it
(The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit').
on so on until there no nodes to visit anymore.
:param graph: The graph to traverse
:param node: The starting node
:return: iterator on nodes
"""
return _traversing(FIFO(), graph, node)
def BFS(graph: Graph, node: Node) -> Iterator[Tuple[Node, List[Edge]]]:
"""
**B**\ readth **F**\ irst **s**\ earch
We start the path from the node given as argument,
This node is labeled as 'visited'
The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit'
we pick the **first** element of the nodes to visit and visit it.
(The neighbors of this node which have not been already 'visited' nor 'to visit' are labelled as 'to visit')
on so on until there no nodes to visit anymore.
:param graph:
:param node:
:return:
"""
return _traversing(LIFO(), graph, node)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment