# 11. How to generate branches for parallel node execution

## How to generate branches for parallel node execution <a href="#id-1" id="id-1"></a>

![](https://wikidocs.net/images/page/265766/langgraph-09.jpeg)

Parallel execution of nodes is essential for speeding up overall graph operation. `LangGraph` supports parallel node execution out of the box, which can significantly improve the performance of graph-based workflows.

This parallelization is implemented through **fan-out** and **fan-in** mechanisms, using either standard edges or `conditional_edges`.

### Setup <a href="#id-2" id="id-2"></a>

```python
# Configuration file for managing API keys as environment variables
from dotenv import load_dotenv

# Load API key information

load_dotenv()
```

```
True 
```

```python
# Set up LangSmith tracking. https://smith.langchain.com
# !pip install -qU langchain-teddynote
from langchain_teddynote import logging

# Enter a project name.
logging.langsmith("CH17-LangGraph-Modules")
```

```
 Start tracking LangSmith. 
[Project name] 
CH17-LangGraph-Modules 
```

### Parallel nodes fan-out and fan-in <a href="#fan-out-fan-in" id="fan-out-fan-in"></a>

**fan-out / fan-in**

In parallel processing, **fan-out** and **fan-in** are concepts that describe how work is distributed and then gathered.

* **Fan-out (distribute)** : Split a large task into multiple small tasks. For example, when making a pizza, you can prepare the dough, sauce, and cheese separately. Splitting the work into parts like this and processing them simultaneously is fan-out.
* **Fan-in (gather)** : Combine the divided small tasks back into one. Like assembling all the prepared ingredients into a finished pizza, collecting the results of the individual tasks to complete the final work is fan-in.

In other words, **fan-out** distributes the work, and **fan-in** combines the results to produce the final outcome.
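Outside of LangGraph, the same pattern can be sketched in plain Python. The snippet below is a minimal, hypothetical illustration of the pizza analogy using `concurrent.futures` (it is not LangGraph code; the `prepare` function and ingredient names are made up for illustration):

```python
from concurrent.futures import ThreadPoolExecutor


def prepare(ingredient: str) -> str:
    # Each ingredient is prepared independently
    return f"{ingredient} ready"


with ThreadPoolExecutor() as pool:
    # Fan-out: dispatch the independent tasks simultaneously
    futures = [pool.submit(prepare, i) for i in ["dough", "sauce", "cheese"]]
    # Fan-in: gather the results to complete the final work
    pizza = [f.result() for f in futures]

print(pizza)
```

The `futures` list preserves submission order, so the fan-in step collects results in a deterministic order even though the tasks ran concurrently.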

***

This example shows `Node A` fanning out to `B` and `C`, then fanning in to `D`.

The State specifies a `reducer` (such as `add`) for certain keys. Instead of simply overwriting the existing value of that key, the reducer combines or accumulates values; for a list, this means concatenating the new list onto the existing one.

To specify a reducer function for a particular key in the State, LangGraph uses the `Annotated` type. This keeps the original type used for type checking (`list`) unchanged while attaching a reducer function (such as `add`) to it.
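As a minimal sketch of this idea (assuming a plain `operator.add` reducer rather than the `add_messages` reducer used in the example below), annotating a list key attaches concatenation semantics without changing the type:

```python
import operator
from typing import Annotated, TypedDict


class SimpleState(TypedDict):
    # To a type checker this is still just a list; LangGraph reads the
    # second Annotated argument as the reducer for this key
    aggregate: Annotated[list, operator.add]


# What the reducer does when two parallel branches each return a value:
merged = operator.add(["I'm B"], ["I'm C"])
print(merged)  # ["I'm B", "I'm C"]
```

With `operator.add`, updates from parallel branches are appended to the existing list instead of overwriting it.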

```python
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


# Defining the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]


# Node value return class
class ReturnNodeValue:
    # reset
    def __init__(self, node_secret: str):
        self._value = node_secret

    # Status update on call
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


# Initialize state graph
builder = StateGraph(State)

# Create nodes A through D and assign their values
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))

# Node connection
builder.add_edge("a", "b")
builder.add_edge("a", "c")
builder.add_edge("b", "d")
builder.add_edge("c", "d")
builder.add_edge("d", END)

# Compile the graph
graph = builder.compile()
```

Visualize the graph.

```python
from langchain_teddynote.graphs import visualize_graph

visualize_graph(graph)
```

![](https://wikidocs.net/images/page/265766/langgraph-08.jpeg)

You can check how the values added at each node are **accumulated** by the `reducer`.

```python
# Run the graph
graph.invoke({"aggregate": []}, {"configurable": {"thread_id": "foo"}})
```

```
Adding I'm A to []
Adding I'm B to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='93e79740-...')]
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='93e79740-...')]
Adding I'm D to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B", ...), HumanMessage(content="I'm C", ...)]
```

```
{'aggregate': [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm D", ...)]}
```

#### Response when an exception occurs during parallel processing <a href="#id-3" id="id-3"></a>

LangGraph executes nodes within a "super-step" (a full processing step in which multiple nodes are handled). This means that even though parallel branches run simultaneously, the entire super-step is processed as a **transaction**.

So if any of these branches raises an exception, the state updates are **not applied at all** (the entire super-step fails).

> super-step: full process step where multiple nodes are handled

![](https://wikidocs.net/images/page/265766/langgraph-09.jpeg)

If you have an error-prone task (e.g., handling a flaky API call), LangGraph provides two ways to address this.

1. You can catch and handle exceptions by writing ordinary Python code within the node.
2. You can set the graph's [**retry\_policy**](https://langchain-ai.github.io/langgraph/reference/graphs/#langgraph.graph.graph.CompiledGraph.retry_policy) to retry nodes when a certain type of exception occurs. Only the failed branch is retried, so you don't need to worry about redoing work unnecessarily.

These features give you full control over parallel execution and exception handling.
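As a sketch of the first approach, catching the exception inside the node keeps the super-step transaction from failing. The flaky task and node function below are hypothetical, written only to illustrate the pattern (a LangGraph node is simply a callable that takes the state and returns an update):

```python
import random


def unstable_task() -> str:
    # Stand-in for a flaky external call (an assumption for illustration)
    if random.random() < 0.5:
        raise TimeoutError("simulated timeout")
    return "ok"


def resilient_node(state: dict) -> dict:
    # Catch the exception inside the node, so the super-step's state
    # update is still applied instead of being rolled back
    try:
        value = unstable_task()
    except TimeoutError:
        value = "fallback"
    return {"aggregate": [value]}


print(resilient_node({"aggregate": []}))
```

With this pattern, a failing branch contributes a fallback value rather than aborting the whole super-step.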

### Fan-out and fan-in of parallel nodes with additional steps <a href="#fan-out-fan-in_1" id="fan-out-fan-in_1"></a>

The example above showed `fan-out` and `fan-in` when each path is a single step. But what if one path has multiple steps?

```python
from typing import Annotated, Any
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages


# Defining the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]


# Node value return class
class ReturnNodeValue:
    # reset
    def __init__(self, node_secret: str):
        self._value = node_secret

    # Status update on call
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


# Initialize state graph
builder = StateGraph(State)

# Creating and connecting nodes
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b1", ReturnNodeValue("I'm B1"))
builder.add_node("b2", ReturnNodeValue("I'm B2"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_edge("a", "b1")
builder.add_edge("a", "c")
builder.add_edge("b1", "b2")
builder.add_edge(["b2", "c"], "d")
builder.add_edge("d", END)

# compile the graph
graph = builder.compile()
```

Visualize the graph.

```python
from langchain_teddynote.graphs import visualize_graph

visualize_graph(graph)
```

![](https://wikidocs.net/images/page/265766/langgraph-14.jpeg)

```python
# Run the graph, starting from an empty aggregate list
graph.invoke({"aggregate": []})
```

```
Adding I'm A to []
Adding I'm B1 to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='bbae6f5c-...')]
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='bbae6f5c-...')]
Adding I'm B2 to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B1", ...), HumanMessage(content="I'm C", ...)]
Adding I'm D to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B1", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm B2", ...)]
```

### Conditional branching <a href="#conditional-branching" id="conditional-branching"></a>

If the fan-out is not deterministic, you can use `add_conditional_edges` directly.

If there is a known "sink" node to connect to after the conditional branches, you can provide `then="<name of the node to run>"` when creating the conditional edge.

```python
from typing import Annotated, Any, Sequence
from typing_extensions import TypedDict
from langgraph.graph import END, START, StateGraph
from langgraph.graph.message import add_messages


# Defining the state (using the add_messages reducer)
class State(TypedDict):
    aggregate: Annotated[list, add_messages]
    which: str


# A class that returns a unique value for each node
class ReturnNodeValue:
    def __init__(self, node_secret: str):
        self._value = node_secret

    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']}")
        return {"aggregate": [self._value]}


# Initialize state graph
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")
builder.add_node("b", ReturnNodeValue("I'm B"))
builder.add_node("c", ReturnNodeValue("I'm C"))
builder.add_node("d", ReturnNodeValue("I'm D"))
builder.add_node("e", ReturnNodeValue("I'm E"))


# Conditional routing path determination function based on the 'which' value of the state
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]


# All candidate intermediate nodes for the conditional fan-out
intermediates = ["b", "c", "d"]

builder.add_conditional_edges(
    "a",
    route_bc_or_cd,
    intermediates,
)
for node in intermediates:
    builder.add_edge(node, "e")


# Final node connection and graph compilation
builder.add_edge("e", END)
graph = builder.compile()
```

Below is reference code: when using the `then` syntax, providing `then="e"` means you don't need to add the edge connections from the intermediate nodes yourself.

```python
## When using the then syntax
# builder.add_conditional_edges(
#     "a",
#     route_bc_or_cd,
#     intermediates,
#     then="e",
# )
```

Visualize the graph.

```python
from langchain_teddynote.graphs import visualize_graph

visualize_graph(graph)
```

![](https://wikidocs.net/images/page/265766/langgraph-15.jpeg)

```python
# Run the graph (specified as which: bc)

graph.invoke({"aggregate": [], "which": "bc"})
```

```
Adding I'm A to []
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='d6f6e77f-...')]
Adding I'm B to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='d6f6e77f-...')]
Adding I'm E to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B", ...), HumanMessage(content="I'm C", ...)]
```

```
{'aggregate': [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm B", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm E", ...)], ...}
```

```python
# Run the graph (specified as which: cd)
graph.invoke({"aggregate": [], "which": "cd"})
```

```
Adding I'm A to []
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='f5f810cc-...')]
Adding I'm D to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='f5f810cc-...')]
Adding I'm E to [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm D", ...)]
```

```
{'aggregate': [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm D", ...), HumanMessage(content="I'm E", ...)], ...}
```

### Sorting fan-out values by reliability <a href="#fan-out" id="fan-out"></a>

Nodes that fan out in parallel run within a single **super-step**. The updates from each super-step are applied to the state sequentially after that super-step completes.

If you need a consistent, predefined ordering of updates within a parallel super-step, write the outputs, along with an identifying key, to a separate field of the state, then combine them in a "sink" node by adding a regular `edge` from each fanned-out node to the gathering point.

For example, consider sorting the outputs of a parallel step by "reliability".

```python
from typing import Annotated, Any, Sequence
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START
from langgraph.graph.message import add_messages


# Implementing merge logic for fanout values, handling empty lists, and performing list concatenation
def reduce_fanouts(left, right):
    if left is None:
        left = []
    if not right:
        # An empty update resets (overwrites) the accumulated values
        return []
    return left + right


# Defining types for state management, setting up aggregate and fanout value storage structures
class State(TypedDict):
    # Using the add_messages reducer
    aggregate: Annotated[list, add_messages]
    fanout_values: Annotated[list, reduce_fanouts]
    which: str


# Initialize graph
builder = StateGraph(State)
builder.add_node("a", ReturnNodeValue("I'm A"))
builder.add_edge(START, "a")


# Parallel node value return class
class ParallelReturnNodeValue:
    def __init__(
        self,
        node_secret: str,
        reliability: float,
    ):
        self._value = node_secret
        self._reliability = reliability

    # Status update on call
    def __call__(self, state: State) -> Any:
        print(f"Adding {self._value} to {state['aggregate']} in parallel.")
        return {
            "fanout_values": [
                {
                    "value": [self._value],
                    "reliability": self._reliability,
                }
            ]
        }


# Add parallel nodes with different reliability
builder.add_node("b", ParallelReturnNodeValue("I'm B", reliability=0.1))
builder.add_node("c", ParallelReturnNodeValue("I'm C", reliability=0.9))
builder.add_node("d", ParallelReturnNodeValue("I'm D", reliability=0.5))


# Function that aggregates the fan-out values, sorted by reliability
def aggregate_fanout_values(state: State) -> Any:
    # Sort by reliability (descending)
    ranked_values = sorted(
        state["fanout_values"], key=lambda x: x["reliability"], reverse=True
    )
    print(ranked_values)
    return {
        "aggregate": [x["value"][0] for x in ranked_values] + ["I'm E"],
        "fanout_values": [],
    }


# Add an aggregate node
builder.add_node("e", aggregate_fanout_values)


# Implement conditional routing logic based on status
def route_bc_or_cd(state: State) -> Sequence[str]:
    if state["which"] == "cd":
        return ["c", "d"]
    return ["b", "c"]


# Setting up intermediate nodes and adding conditional edges
intermediates = ["b", "c", "d"]
builder.add_conditional_edges("a", route_bc_or_cd, intermediates)

# Connecting intermediate nodes and final aggregation nodes
for node in intermediates:
    builder.add_edge(node, "e")

# Final for completing the graph
graph = builder.compile()
```

Visualize the graph.

```python
from langchain_teddynote.graphs import visualize_graph

visualize_graph(graph)
```

![](https://wikidocs.net/images/page/265766/langgraph-16.jpeg)

When the nodes run in parallel, their results are sorted by reliability.

**Reference**

* `b` : reliability=0.1
* `c` : reliability=0.9
* `d` : reliability=0.5

```python
# Run the graph (specified as which: bc)
graph.invoke({"aggregate": [], "which": "bc", "fanout_values": []})
```

```
Adding I'm A to []
Adding I'm B to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='982b6f35-...')] in parallel.
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='982b6f35-...')] in parallel.
[{'value': ["I'm C"], 'reliability': 0.9}, {'value': ["I'm B"], 'reliability': 0.1}]
```

```
{'aggregate': [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm B", ...), HumanMessage(content="I'm E", ...)], ...}
```

```python
# Run the graph (specified as which: cd)
graph.invoke({"aggregate": [], "which": "cd"})
```

```
Adding I'm A to []
Adding I'm C to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='ec7e7b0d-...')] in parallel.
Adding I'm D to [HumanMessage(content="I'm A", additional_kwargs={}, response_metadata={}, id='ec7e7b0d-...')] in parallel.
[{'value': ["I'm C"], 'reliability': 0.9}, {'value': ["I'm D"], 'reliability': 0.5}]
```

```
{'aggregate': [HumanMessage(content="I'm A", ...), HumanMessage(content="I'm C", ...), HumanMessage(content="I'm D", ...), HumanMessage(content="I'm E", ...)], ...}
```
