Concrete Edge

graphorchestrator.edges.concrete

ConcreteEdge

Bases: Edge

Concrete implementation of an edge in a graph.

This class represents a direct, unconditional connection between a source node and a sink node in the graph. On construction it logs an edge-creation event that records both node IDs, for debugging and monitoring purposes.

Attributes:

Name    Type  Description
source  Node  The source node of the edge.
sink    Node  The sink node of the edge.
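
As a rough illustration of how the two attributes are used, the sketch below wires two nodes together and reads them back. It does not import graphorchestrator: `StubNode`, `SketchEdge`, and `describe` are hypothetical stand-ins that mirror only what this page shows (a `node_id` on each node, `source` and `sink` on the edge), and the logging call is left out.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StubNode:
    """Hypothetical stand-in for Node; only node_id is modeled."""
    node_id: str


class SketchEdge:
    """Hypothetical stand-in mirroring ConcreteEdge's two attributes."""

    def __init__(self, source: StubNode, sink: StubNode):
        self.source = source
        self.sink = sink


def describe(edge: SketchEdge) -> str:
    """Render an edge as 'source -> sink' using the nodes' IDs."""
    return f"{edge.source.node_id} -> {edge.sink.node_id}"


edge = SketchEdge(StubNode("fetch"), StubNode("parse"))
print(describe(edge))
```

With the real classes, the same two attributes would be read from a `ConcreteEdge` built from actual `Node` instances.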

Source code in graphorchestrator\edges\concrete.py
class ConcreteEdge(Edge):
    """Concrete implementation of an edge in a graph.

    This class represents a direct, unconditional connection between
    a source node and a sink node in the graph.
    It logs the creation of the edge for debugging and monitoring purposes.

    Attributes:
        source (Node): The source node of the edge.
        sink (Node): The sink node of the edge.
    """

    def __init__(self, source: Node, sink: Node):
        """Initializes a ConcreteEdge object.

        This method sets up the connection between a source node and
        a sink node, and logs the creation of this edge.

        Args:
            source (Node): The source node of the edge.
            sink (Node): The sink node of the edge.
        """
        # Store the endpoints of the edge.
        self.source = source
        self.sink = sink

        # Log the creation of the edge; wrap_constants supplies the
        # keyword arguments passed to the logger.
        GraphLogger.get().info(
            **wrap_constants(
                message="Concrete edge created",
                **{
                    # Structured fields attached to the log entry.
                    LC.EVENT_TYPE: "edge",
                    LC.ACTION: "edge_created",
                    LC.EDGE_TYPE: "concrete",
                    LC.SOURCE_NODE: self.source.node_id,
                    LC.SINK_NODE: self.sink.node_id,
                }
            )
        )
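
The logging call above passes its fields with `**{...}` because the keyword names come from constants (`LC.EVENT_TYPE` and so on) rather than literal identifiers, and Python only allows computed keyword names through dictionary unpacking. The sketch below reproduces that pattern with the standard `logging` module. The `LC` values and the body of `wrap_constants` here are assumptions made for illustration; the library's real definitions are not shown on this page and may differ.

```python
import logging


class LC:
    """Hypothetical stand-in for the library's log-constant names."""
    EVENT_TYPE = "event_type"
    ACTION = "action"
    EDGE_TYPE = "edge_type"
    SOURCE_NODE = "source_node"
    SINK_NODE = "sink_node"


def wrap_constants(message: str, **fields) -> dict:
    """Assumed behavior: bundle the message and fields into logger kwargs."""
    return {"msg": message, "extra": fields}


def edge_created_kwargs(source_id: str, sink_id: str) -> dict:
    """Build the keyword arguments for an 'edge created' log record."""
    return wrap_constants(
        message="Concrete edge created",
        # Keys are computed from constants, so they must be unpacked
        # from a dict instead of being written as key=value.
        **{
            LC.EVENT_TYPE: "edge",
            LC.ACTION: "edge_created",
            LC.EDGE_TYPE: "concrete",
            LC.SOURCE_NODE: source_id,
            LC.SINK_NODE: sink_id,
        },
    )


kwargs = edge_created_kwargs("fetch", "parse")
# The standard logger accepts msg and extra as keyword arguments.
logging.getLogger("sketch").info(**kwargs)
```

Keeping the field names in one constants class means a renamed field changes in a single place instead of in every log call.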

__init__(source, sink)

Initializes a ConcreteEdge object.

This method sets up the connection between a source node and a sink node, and logs the creation of this edge.

Parameters:

Name    Type  Description                   Default
source  Node  The source node of the edge.  required
sink    Node  The sink node of the edge.    required

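
Because the constructor reads `node_id` from both arguments while building the log entry, an argument without that attribute fails at construction time rather than later during graph execution. The sketch below demonstrates this with hypothetical stand-ins (`StubNode`, `SketchEdge`, `try_build`), not the library's own classes; the real code passes the IDs to its logger instead of storing them.

```python
class StubNode:
    """Hypothetical stand-in for Node; only node_id is modeled."""

    def __init__(self, node_id: str):
        self.node_id = node_id


class SketchEdge:
    """Hypothetical stand-in that reads node_id eagerly, like __init__ above."""

    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        # Both IDs are read here, at construction time.
        self.log_fields = {
            "source_node": self.source.node_id,
            "sink_node": self.sink.node_id,
        }


def try_build(source, sink):
    """Return the edge, or the AttributeError raised while building it."""
    try:
        return SketchEdge(source, sink)
    except AttributeError as exc:
        return exc


ok = try_build(StubNode("a"), StubNode("b"))
bad = try_build(StubNode("a"), "not-a-node")  # str has no node_id
print(type(ok).__name__, type(bad).__name__)
```
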
Source code in graphorchestrator\edges\concrete.py
def __init__(self, source: Node, sink: Node):
    """Initializes a ConcreteEdge object.

    This method sets up the connection between a source node and
    a sink node, and logs the creation of this edge.

    Args:
        source (Node): The source node of the edge.
        sink (Node): The sink node of the edge.
    """
    # Store the endpoints of the edge.
    self.source = source
    self.sink = sink

    # Log the creation of the edge; wrap_constants supplies the
    # keyword arguments passed to the logger.
    GraphLogger.get().info(
        **wrap_constants(
            message="Concrete edge created",
            **{
                # Structured fields attached to the log entry.
                LC.EVENT_TYPE: "edge",
                LC.ACTION: "edge_created",
                LC.EDGE_TYPE: "concrete",
                LC.SOURCE_NODE: self.source.node_id,
                LC.SINK_NODE: self.sink.node_id,
            }
        )
    )