AbstractLcpFactory.py 2.33 KB
Newer Older
Yoann Dufresne's avatar
Yoann Dufresne committed
1
import networkx as nx
2
import sys
Yoann Dufresne's avatar
Yoann Dufresne committed
3
from abc import abstractmethod
4
from multiprocessing import Pool, Value
Yoann Dufresne's avatar
Yoann Dufresne committed
5

Yoann Dufresne's avatar
Yoann Dufresne committed
6
from deconvolution.lcp.FixedDGIndex import FixedDGIndex, AbstractDGIndex
Yoann Dufresne's avatar
Yoann Dufresne committed
7

8
counter = None
Yoann Dufresne's avatar
Yoann Dufresne committed
9

10
def process_node(factory, node):
11 12 13 14 15 16
    global counter
    my_value = counter.value
    counter.value += 1

    if factory.verbose:
        print(f"{my_value}: Generating d-graphs")
17
        sys.stdout.flush()
18 19

    # udg generation
20
    neighbors = [x for x in factory.graph[node]]
21 22 23 24
    subgraph = nx.Graph(factory.graph.subgraph(neighbors))
    dgs = factory.generate_by_node(node, subgraph)

    if factory.verbose:
25
        print(f"{my_value}: d-graphs generated, starting filtering")
26
        print(f"{my_value}: {len(dgs)} udg to filter")
27
        sys.stdout.flush()
28 29 30 31 32

    # udg domination filtering
    dgs = AbstractDGIndex.filter_entry(dgs)

    if factory.verbose:
33
        print(f"{my_value}: {len(dgs)} udg remaining after filtering")
34
        print(f"{my_value}({factory.nb_nodes}) terminated")
35
        sys.stdout.flush()
36 37 38

    return node, dgs

Yoann Dufresne's avatar
Yoann Dufresne committed
39
class AbstractDGFactory:
40 41
    def __init__(self, graph, debug=False):
        self.debug = debug
Yoann Dufresne's avatar
Yoann Dufresne committed
42
        self.graph = graph
43 44
        self.nb_nodes = len(self.graph.nodes())
        self.verbose = False
45 46
        global counter
        counter = Value('i', 0)
Yoann Dufresne's avatar
Yoann Dufresne committed
47

Yoann Dufresne's avatar
Yoann Dufresne committed
48
    def generate_all_lcps(self, verbose=False, threads=8):
Yoann Dufresne's avatar
Yoann Dufresne committed
49
        index = FixedDGIndex(size=1)
50
        factory = self
Yoann Dufresne's avatar
Yoann Dufresne committed
51
        nb_nodes = len(self.graph.nodes())
52
        self.verbose = verbose
53 54
        global counter
        counter = Value('i', 0)
55

56 57 58
        if verbose:
            print("Start parallel work")

59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
        if threads > 1:
            results = None
            with Pool(processes=threads) as pool:
                results = pool.starmap(process_node, zip(
                    [factory]*nb_nodes,
                    self.graph.nodes()
                ))

            # Fill the index by node
            for node, dgs in results:
                key = frozenset({node})
                for dg in dgs:
                    index.add_value(key, dg)
        else:
            for node in self.graph.nodes():
                key = frozenset({node})
                _, dgs = process_node(factory, node)
                for dg in dgs:
                    index.add_value(key, dg)
Yoann Dufresne's avatar
Yoann Dufresne committed
78 79 80 81 82 83 84

        return index


    @abstractmethod
    def generate_by_node(self, node, subgraph):
        pass