A Coding Guide to Build an Autonomous Multi-Agent Logistics System with Route Planning, Dynamic Auctions, and Real-Time Visualization Using Graph-Based Simulation
These articles are AI-generated summaries. Please check the original sources for full details.
Autonomous Multi-Agent Logistics System
This tutorial details building a fully autonomous logistics simulation featuring multiple smart delivery trucks operating within a dynamic city-wide road network. The system utilizes agentic trucks capable of bidding on delivery orders, planning optimal routes, managing battery levels, and maximizing profit through self-interested decision-making.
Why This Matters
Traditional logistics modeling often relies on centralized optimization, which struggles with the complexity and dynamism of real-world scenarios. Agent-based modeling offers a more robust approach by simulating individual decision-making, but can be computationally expensive and difficult to scale. This system demonstrates a practical implementation of agentic AI, addressing the limitations of both centralized and simplistic agent models, and offering a scalable approach to logistics simulation that can be adapted to larger, more complex environments. Failure to address these scaling issues can lead to inaccurate predictions and inefficient resource allocation, costing logistics companies millions annually.
Key Insights
- NetworkX for Graph Representation, 2008: This Python library is used to model the city’s road network as a graph, facilitating route planning and distance calculations.
- Dynamic Auctions for Order Allocation: The system employs a dynamic auction mechanism where agents bid on delivery orders based on profitability and feasibility, mirroring real-world logistics markets.
- Temporal for Agent Coordination: While not directly used in this example, similar multi-agent systems benefit from frameworks like Temporal to manage state and ensure reliable coordination between agents.
Working Example
import networkx as nx
import matplotlib.pyplot as plt
import random
import time
from IPython.display import clear_output
from dataclasses import dataclass, field
from typing import List, Dict, Optional
NUM_NODES = 30
CONNECTION_RADIUS = 0.25
NUM_AGENTS = 5
STARTING_BALANCE = 1000
FUEL_PRICE = 2.0
PAYOUT_MULTIPLIER = 5.0
BATTERY_CAPACITY = 100
CRITICAL_BATTERY = 25
@dataclass
class Order:
id: str
target_node: int
weight_kg: int
payout: float
status: str = "pending"
class AgenticTruck:
def __init__(self, agent_id, start_node, graph, capacity=100):
self.id = agent_id
self.current_node = start_node
self.graph = graph
self.battery = BATTERY_CAPACITY
self.balance = STARTING_BALANCE
self.capacity = capacity
self.state = "IDLE"
self.path: List[int] = []
self.current_order: Optional[Order] = None
self.target_node: int = start_node
def get_path_cost(self, start, end):
try:
length = nx.shortest_path_length(self.graph, start, end, weight='weight')
path = nx.shortest_path(self.graph, start, end, weight='weight')
return length, path
except nx.NetworkXNoPath:
return float('inf'), []
def find_nearest_charger(self):
chargers = [n for n, attr in self.graph.nodes(data=True) if attr.get('type') == 'charger']
best_charger = None
min_dist = float('inf')
best_path = []
for charger in chargers:
dist, path = self.get_path_cost(self.current_node, charger)
if dist < min_dist:
min_dist = dist
best_charger = charger
best_path = path
return best_charger, best_path
def calculate_bid(self, order):
if order.weight_kg > self.capacity:
return float('inf')
if self.state != "IDLE" or self.battery < CRITICAL_BATTERY:
return float('inf')
dist_to_target, _ = self.get_path_cost(self.current_node, order.target_node)
fuel_cost = dist_to_target * FUEL_PRICE
expected_profit = order.payout - fuel_cost
if expected_profit < 10:
return float('inf')
return dist_to_target
def assign_order(self, order):
self.current_order = order
self.state = "MOVING"
self.target_node = order.target_node
_, self.path = self.get_path_cost(self.current_node, self.target_node)
if self.path: self.path.pop(0)
def go_charge(self):
charger_node, path = self.find_nearest_charger()
if charger_node is not None:
self.state = "TO_CHARGER"
self.target_node = charger_node
self.path = path
if self.path: self.path.pop(0)
def step(self):
if self.state == "IDLE" and self.battery < CRITICAL_BATTERY:
self.go_charge()
if self.state == "CHARGING":
self.battery += 10
self.balance -= 5
if self.battery >= 100:
self.battery = 100
self.state = "IDLE"
return
if self.path:
next_node = self.path[0]
edge_data = self.graph.get_edge_data(self.current_node, next_node)
distance = edge_data['weight']
self.current_node = next_node
self.path.pop(0)
self.battery -= (distance * 2)
self.balance -= (distance * FUEL_PRICE)
if not self.path:
if self.state == "MOVING":
self.balance += self.current_order.payout
self.current_order.status = "completed"
self.current_order = None
self.state = "IDLE"
elif self.state == "TO_CHARGER":
self.state = "CHARGING"
class Simulation:
def __init__(self):
self.setup_graph()
self.setup_agents()
self.orders = []
self.order_count = 0
def setup_graph(self):
self.G = nx.random_geometric_graph(NUM_NODES, CONNECTION_RADIUS)
for (u, v) in self.G.edges():
self.G.edges[u, v]['weight'] = random.uniform(1.0, 3.0)
for i in self.G.nodes():
r = random.random()
if r < 0.15:
self.G.nodes[i]['type'] = 'charger'
self.G.nodes[i]['color'] = 'red'
else:
self.G.nodes[i]['type'] = 'house'
self.G.nodes[i]['color'] = '#A0CBE2'
def setup_agents(self):
self.agents = []
for i in range(NUM_AGENTS):
start_node = random.randint(0, NUM_NODES-1)
cap = random.choice([50, 100, 200])
self.agents.append(AgenticTruck(i, start_node, self.G, capacity=cap))
def generate_order(self):
target = random.randint(0, NUM_NODES-1)
weight = random.randint(10, 120)
payout = random.randint(50, 200)
order = Order(id=f"ORD-{self.order_count}", target_node=target, weight_kg=weight, payout=payout)
self.orders.append(order)
self.order_count += 1
return order
def run_market(self):
for order in self.orders:
if order.status == "pending":
bids = {agent: agent.calculate_bid(order) for agent in self.agents}
valid_bids = {k: v for k, v in bids.items() if v != float('inf')}
if valid_bids:
winner = min(valid_bids, key=valid_bids.get)
winner.assign_order(order)
order.status = "assigned"
def step(self):
if random.random() < 0.3:
self.generate_order()
self.run_market()
for agent in self.agents:
agent.step()
def visualize(self, step_num):
clear_output(wait=True)
plt.figure(figsize=(10, 8))
pos = nx.get_node_attributes(self.G, 'pos')
node_colors = [self.G.nodes[n]['color'] for n in self.G.nodes()]
nx.draw(self.G, pos, node_color=node_colors, with_labels=True, node_size=300, edge_color='gray', alpha=0.6)
for agent in self.agents:
x, y = pos[agent.current_node]
jitter_x = x + random.uniform(-0.02, 0.02)
jitter_y = y + random.uniform(-0.02, 0.02)
color = 'green' if agent.state == "IDLE" else ('orange' if agent.state == "MOVING" else 'red')
plt.plot(jitter_x, jitter_y, marker='s', markersize=12, color=color, markeredgecolor='black')
plt.text(jitter_x, jitter_y+0.03, f"A{agent.id}\n${int(agent.balance)}\n{int(agent.battery)}%",
fontsize=8, ha='center', fontweight='bold', bbox=dict(facecolor='white', alpha=0.7, pad=1))
for order in self.orders:
if order.status in ["assigned", "pending"]:
ox, oy = pos[order.target_node]
plt.plot(ox, oy, marker='*', markersize=15, color='gold', markeredgecolor='black')
plt.title(f"Graph-Based Logistics Swarm | Step: {step_num}\nRed Nodes = Chargers | Gold Stars = Orders", fontsize=14)
plt.show()
# Run the simulation
print("Initializing Advanced Simulation...")
sim = Simulation()
for t in range(60):
sim.step()
sim.visualize(t)
time.sleep(0.5)
print("Simulation Finished.")
Practical Applications
- Use Case: Amazon utilizes similar agent-based systems to optimize delivery routes and manage its fleet of vehicles in real-time, adjusting to traffic and demand fluctuations.
- Pitfall: Overly complex reward functions can lead to unintended agent behaviors, such as prioritizing short-term gains over long-term efficiency or neglecting critical tasks like battery recharging.
References:
Continue reading
Next article
Tesla FSD Navigates San Francisco Blackout While Waymo Falters
Related Content
A Coding Guide to Design and Orchestrate Advanced ReAct-Based Multi-Agent Workflows with AgentScope and OpenAI
This tutorial demonstrates building a multi-agent incident response system using AgentScope, achieving complex workflows in pure Python.
Building Multi-Agent Data Analysis Pipelines with Google ADK
Learn to build a modular multi-agent system using Google ADK to automate data ingestion, statistical modeling, and visualization in Python. This tutorial demonstrates orchestrating five specialized agents to perform Shapiro-Wilk tests and ANOVA, significantly reducing manual analysis time in production-grade pipelines.
How to Design an Advanced Multi-Agent Reasoning System with spaCy Featuring Planning, Reflection, Memory, and Knowledge Graphs
Build a multi-agent AI system with spaCy that extracts entities, constructs knowledge graphs, and learns from experience using reflection and memory modules.