|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +This is a MCP server for power system analysis with GridCal and OpenDSS |
| 4 | +Converts REST API endpoints to MCP tools |
| 5 | +""" |
| 6 | +import time |
| 7 | +import matplotlib.pyplot as plt |
| 8 | +import numpy as np |
| 9 | +import GridCalEngine.api as gce |
| 10 | +import dss |
| 11 | +from fastmcp import FastMCP |
| 12 | + |
| 13 | + |
| 14 | +# It's currently buggy to get a session ID before continuing to make tool calls. |
| 15 | +# See https://github.com/jlowin/fastmcp/issues/956 |
| 16 | +# The workaround is to use stateless session currently. |
| 17 | +mcp = FastMCP("egrid-mcp-server", stateless_http=True) |
| 18 | + |
| 19 | +class PowerFlowService: |
| 20 | + def __init__(self): |
| 21 | + print("PowerFlow service initialized") |
| 22 | + |
| 23 | +class HostingCapService: |
| 24 | + def __init__(self): |
| 25 | + self.dss_via_python = dss.DSS |
| 26 | + self.dss_via_python.Start(0) |
| 27 | + self.dss_via_python.AllowForms = True |
| 28 | + print("HostingCap service initialized") |
| 29 | + |
| 30 | + def generate_commands(self, circuit_name: str, frequency: float, demand_mult: list, solar_mult: list, generator_type: str = "generator"): |
| 31 | + """Generate DSS commands with customizable parameters""" |
| 32 | + demand_str = ", ".join(map(str, demand_mult)) |
| 33 | + solar_str = ", ".join(map(str, solar_mult)) |
| 34 | + |
| 35 | + return [ |
| 36 | + 'clear', |
| 37 | + f'set DefaultBaseFrequency={frequency}', |
| 38 | + f'new circuit.{circuit_name} bus1=slack basekv=0.4 pu=1.0 angle=0 frequency={frequency} phases=3', |
| 39 | + 'new line.slack-B1 phases=3 bus1=slack bus2=B1 r1=0.1 x1=0.1 r0=0.05 x0=0.05 length=1', |
| 40 | + 'new line.B1-B2 phases=3 bus1=B1 bus2=B2 r1=0.1 x1=0.1 r0=0.05 x0=0.05 length=1', |
| 41 | + 'new line.B2-B3 phases=3 bus1=B2 bus2=B3 r1=0.1 x1=0.1 r0=0.05 x0=0.05 length=1', |
| 42 | + f'new loadshape.demand npts={len(demand_mult)} interval=1.0 mult={{{demand_str}}}', |
| 43 | + f'new loadshape.solar npts={len(solar_mult)} interval=1.0 mult={{{solar_str}}}', |
| 44 | + 'new load.house phases=1 bus1=B3.1 kv=0.23 kw=1 kvar=0 vmaxpu=1.5 vminpu=0.8 daily=demand', |
| 45 | + f'new {generator_type}.pv_system phases=1 bus1=B3.2 kv=0.23 kw=5 pf=1 vmaxpu=1.5 vminpu=0.8 daily=solar', |
| 46 | + 'reset', |
| 47 | + 'set ControlMode=Time', |
| 48 | + 'set Mode=Daily StepSize=1h Number=1 Time=(0,0)', |
| 49 | + 'set VoltageBases=[0.4]', |
| 50 | + 'calcv', |
| 51 | + ] |
| 52 | + |
| 53 | +# Initialize services |
| 54 | +power_flow_service = PowerFlowService() |
| 55 | +hosting_cap_service = HostingCapService() |
| 56 | + |
| 57 | +@mcp.tool() |
| 58 | +async def run_power_flow(case_name: str) -> str: |
| 59 | + """Run power flow analysis using GridCalEngine""" |
| 60 | + try: |
| 61 | + main_ckt = gce.open_file(case_name) |
| 62 | + results = gce.power_flow(main_ckt) |
| 63 | + |
| 64 | + response = { |
| 65 | + 'Circuit Name': main_ckt.name, |
| 66 | + 'Convergence': str(results.converged), |
| 67 | + 'error': results.error |
| 68 | + } |
| 69 | + |
| 70 | + return f"Power Flow Results:\n{response}" |
| 71 | + except Exception as e: |
| 72 | + return f"Power flow analysis failed: {str(e)}" |
| 73 | + |
| 74 | +@mcp.tool() |
| 75 | +async def run_hosting_capacity_analysis( |
| 76 | + sim_hours: int = 24, |
| 77 | + circuit_name: str = "test_lv_feeder", |
| 78 | + frequency: float = 50.0, |
| 79 | + demand_multipliers: str = "1.0,1.0,1.0,1.0,1.0,1.0,3.0,5.0,3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,5.0,7.0,7.0,5.0,3.0,1.0,1.0", |
| 80 | + solar_multipliers: str = "0.0,0.0,0.0,0.0,0.0,0.0,0.1,0.3,0.5,0.7,0.8,1.0,0.8,0.7,0.5,0.3,0.1,0.0,0.0,0.0,0.0,0.0,0.0,0.0", |
| 81 | + generator_type: str = "generator" |
| 82 | +) -> str: |
| 83 | + """Run DER hosting capacity analysis using OpenDSS with customizable parameters""" |
| 84 | + try: |
| 85 | + start_time = time.time() |
| 86 | + |
| 87 | + # Parse multiplier strings to lists |
| 88 | + demand_mult = [float(x.strip()) for x in demand_multipliers.split(',')] |
| 89 | + solar_mult = [float(x.strip()) for x in solar_multipliers.split(',')] |
| 90 | + |
| 91 | + # Generate and execute DSS commands |
| 92 | + commands = hosting_cap_service.generate_commands(circuit_name, frequency, demand_mult, solar_mult, generator_type) |
| 93 | + for cmd in commands: |
| 94 | + hosting_cap_service.dss_via_python.Text.Command = cmd |
| 95 | + |
| 96 | + data_python = { |
| 97 | + 'PV System': {'element_name': 'generator.pv_system', 'Power (kW)': [], 'Voltage (V)': []}, |
| 98 | + 'House': {'element_name': 'load.house', 'Power (kW)': [], 'Voltage (V)': []} |
| 99 | + } |
| 100 | + |
| 101 | + for t in range(sim_hours): |
| 102 | + hosting_cap_service.dss_via_python.ActiveCircuit.Solution.Solve() |
| 103 | + |
| 104 | + for element in data_python.keys(): |
| 105 | + hosting_cap_service.dss_via_python.ActiveCircuit.SetActiveElement(data_python[element]['element_name']) |
| 106 | + data_python[element]['Power (kW)'].append(hosting_cap_service.dss_via_python.ActiveCircuit.ActiveElement.Powers[0]) |
| 107 | + data_python[element]['Voltage (V)'].append(hosting_cap_service.dss_via_python.ActiveCircuit.ActiveElement.VoltagesMagAng[0]) |
| 108 | + |
| 109 | + elapse_time = time.time() - start_time |
| 110 | + |
| 111 | + # Generate plot |
| 112 | + fig, axes = plt.subplots(2, 2, figsize=(12, 8)) |
| 113 | + fig.subplots_adjust(hspace=0.2, wspace=0.2) |
| 114 | + plt.suptitle('DER Hosting Capacity Analysis') |
| 115 | + |
| 116 | + primary_keys = list(data_python.keys()) |
| 117 | + inner_keys = [key for key in data_python[primary_keys[0]].keys() if key != 'element_name'] |
| 118 | + |
| 119 | + for i, primary_key in enumerate(primary_keys): |
| 120 | + for j, inner_key in enumerate(inner_keys): |
| 121 | + ax = axes[i, j] |
| 122 | + if i == 0: ax.set_title(inner_key, fontsize=12, fontweight='bold') |
| 123 | + if j == 0: ax.set_ylabel(primary_key, rotation=90, fontsize=12, fontweight='bold') |
| 124 | + ax.plot(data_python[primary_key][inner_key]) |
| 125 | + ax.set_xticks(_generate_ticks(sim_hours).tolist()) |
| 126 | + |
| 127 | + plt.savefig("hosting_capacity_analysis.png", dpi=300) |
| 128 | + plt.close() |
| 129 | + |
| 130 | + return f"Hosting Capacity Analysis completed in {elapse_time:.2f} seconds. Results saved to hosting_capacity_analysis.png" |
| 131 | + |
| 132 | + except Exception as e: |
| 133 | + return f"Hosting capacity analysis failed: {str(e)}" |
| 134 | + |
| 135 | +def _generate_ticks(n: int): |
| 136 | + """Generate array with integer ticks""" |
| 137 | + num_intervals = 4 |
| 138 | + step = (n - 1) / num_intervals |
| 139 | + |
| 140 | + if step != int(step): |
| 141 | + n = 1 + int(step) * num_intervals |
| 142 | + step = (n - 1) / num_intervals |
| 143 | + |
| 144 | + return np.arange(1, (int(step)+2) * num_intervals, num_intervals) |
| 145 | + |
| 146 | + |
| 147 | +if __name__ == "__main__": |
| 148 | + # Run with Streamable HTTP (the most recommended way, SSE transport is deprecated) |
| 149 | + mcp.run(transport="streamable-http") |
0 commit comments