Initial commit: TPP Wholesale MCP server

MCP server wrapping the TPP Wholesale domain registrar HTTP API (v2.7.4).
Provides 13 MCP tools for domain registration, DNS management, renewals,
transfers, contact management, and account queries.

Features:
- Async TPP API client with automatic session management
- FastMCP server with Streamable HTTP transport
- Docker deployment with docker-compose
- Full .com.au eligibility support
- REST health check and balance endpoints

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
mineracks 2026-04-21 21:19:54 +10:00
commit aa397c585e
9 changed files with 1250 additions and 0 deletions

16
.env.example Normal file
View File

@ -0,0 +1,16 @@
# TPP Wholesale API credentials
# Get these from The Console → Account Settings → API Preferences
TPP_ACCOUNT_NO=your_account_number
TPP_USER_ID=your_api_key
TPP_PASSWORD=your_secret_key
# MCP host validation — comma-separated list of allowed Host headers
# Add your Cloudflare Tunnel hostname and/or Tailscale IP here
TPP_ALLOWED_HOSTS=localhost:8089,127.0.0.1:8089
# Optional: API key for the REST endpoints (X-Api-Key header)
# Leave empty to disable REST API auth
TPP_API_KEY=
# Port to listen on
TPP_PORT=8089

11
.gitignore vendored Normal file
View File

@ -0,0 +1,11 @@
__pycache__/
*.py[cod]
*$py.class
*.egg-info/
dist/
build/
.env
*.egg
.venv/
venv/
data/

12
Dockerfile Normal file
View File

@ -0,0 +1,12 @@
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app/ ./
EXPOSE 8089
CMD ["python", "server.py"]

161
README.md Normal file
View File

@ -0,0 +1,161 @@
# tppwholesale-mcp
MCP (Model Context Protocol) server for the [TPP Wholesale](https://www.tppwholesale.com.au/) domain registrar API. Exposes domain registration, DNS management, renewals, transfers, and account queries as MCP tools that can be used by AI assistants like Claude.
## Features
- **Domain availability** — check if domains are available across 1,200+ TLDs
- **Domain registration** — register new domains with full .au eligibility support
- **Domain renewal** — renew existing registrations
- **Domain transfer** — transfer domains from other registrars
- **Nameserver management** — add/remove nameservers on any domain
- **Domain lock** — lock/unlock domains to prevent unauthorized transfers
- **Contact management** — create and manage WHOIS contacts
- **Account balance** — check prepaid account balance
- **Order tracking** — check status of pending registrations/renewals/transfers
- **Domain listing** — list and filter all domains in the account
## Quick start
### 1. Get API credentials
Log in to [The Console](https://theconsole.tppwholesale.com.au/) and go to **Account Settings > API Preferences** to find your:
- Account Number
- API Key (User ID)
- Secret Key (Password)
### 2. Configure
```bash
cp .env.example .env
# Edit .env with your credentials
```
### 3. Run with Docker
```bash
docker compose up -d
```
The MCP server will be available at `http://localhost:8089/mcp/mcp`.
### 4. Connect to Claude Code
Add to your Claude Code MCP config (`.claude.json` or project settings):
```json
{
"mcpServers": {
"tppwholesale": {
"type": "streamable-http",
"url": "http://localhost:8089/mcp/mcp"
}
}
}
```
## MCP Tools
| Tool | Description |
|------|-------------|
| `check_domain_availability` | Check if domains are available for registration |
| `register_domain` | Register a new domain name |
| `create_contact` | Create a WHOIS contact for use in registrations |
| `renew_domain` | Renew an existing domain |
| `get_domain_details` | Get full details (contacts, nameservers, status) |
| `list_domains` | List and filter domains in the account |
| `update_nameservers` | Add or remove nameservers on a domain |
| `set_domain_lock` | Lock or unlock a domain |
| `check_order_status` | Check status of a pending order |
| `get_account_balance` | Get prepaid account balance (AUD) |
| `get_renewal_info` | Check renewal status and expiry date |
| `check_transfer` | Check if a domain can be transferred in |
| `transfer_domain` | Request a domain transfer |
## .au domain registration
Australian domains (.com.au, .net.au, .org.au, etc.) require additional eligibility fields:
| Field | Description | Example |
|-------|-------------|---------|
| `registrant_name` | Legal entity name | "Mineracks Pty Ltd" |
| `registrant_id` | ABN or ACN | "54109565095" |
| `registrant_id_type` | ID type code | "ABN" or "ACN" |
| `eligibility_type` | Why the registrant is eligible | "Company" |
| `eligibility_reason` | Relationship to domain | "Exact match" |
### Registrant ID types
- `ABN` — Australian Business Number
- `ACN` — Australian Company Number
- `TM` — Trademark Number
- `OTHER` — Other
### Eligibility types
- `Company` — Registered company
- `Sole Trader` — Sole trader
- `Partnership` — Partnership
- `Registered Business` — Registered business name
- `Trust` — Trust
- `Government Entity` — Government body
- `Other` — Other entity type
## REST API
In addition to MCP, two REST endpoints are available:
| Endpoint | Method | Description |
|----------|--------|-------------|
| `/api/health` | GET | Health check |
| `/api/balance` | GET | Account balance (requires `X-Api-Key` header if `TPP_API_KEY` is set) |
## Architecture
```
Claude Code / AI Assistant
▼ (MCP Streamable HTTP)
┌─────────────────────┐
│ tppwholesale-mcp │
│ (FastMCP + httpx) │
│ Port 8089 │
└─────────┬───────────┘
│ HTTPS
┌─────────────────────┐
│ TPP Wholesale API │
│ theconsole. │
│ tppwholesale.com.au │
└─────────────────────┘
```
## Environment variables
| Variable | Required | Description |
|----------|----------|-------------|
| `TPP_ACCOUNT_NO` | Yes | TPP Wholesale account number |
| `TPP_USER_ID` | Yes | API key from The Console |
| `TPP_PASSWORD` | Yes | Secret key from The Console |
| `TPP_ALLOWED_HOSTS` | No | Comma-separated allowed Host headers (default: `localhost:8089,127.0.0.1:8089`) |
| `TPP_API_KEY` | No | Optional API key for REST endpoints |
| `TPP_PORT` | No | Port to listen on (default: `8089`) |
## Development
```bash
# Install dependencies
pip install -r requirements.txt
# Run directly
TPP_ACCOUNT_NO=xxx TPP_USER_ID=xxx TPP_PASSWORD=xxx python app/server.py
# Run tests (TODO)
pytest tests/
```
## API reference
This MCP server wraps the [TPP Wholesale HTTP API v2.7.4](https://www.tppwholesale.com.au/api/). The full API specification is available as a [PDF download](https://www.tppwholesale.com.au/wp-content/uploads/2023/10/2.7.4_TPPW_HTTP_API_Domain_Specs.pdf).
## License
MIT

0
app/__init__.py Normal file
View File

469
app/server.py Normal file
View File

@ -0,0 +1,469 @@
"""TPP Wholesale MCP Server.
Exposes TPP Wholesale domain registrar operations as MCP tools.
Supports both Streamable HTTP and SSE transports.
"""
import os
import json
import logging
from contextlib import asynccontextmanager
from mcp.server.fastmcp import FastMCP
from mcp.server.fastmcp.utilities.types import TransportSecuritySettings
from tpp_client import TPPClient, TPPError
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("tppwholesale-mcp")
# --- Configuration -----------------------------------------------------------
ACCOUNT_NO = os.environ.get("TPP_ACCOUNT_NO", "")
USER_ID = os.environ.get("TPP_USER_ID", "")
PASSWORD = os.environ.get("TPP_PASSWORD", "")
ALLOWED_HOSTS = os.environ.get(
"TPP_ALLOWED_HOSTS",
"localhost:8089,127.0.0.1:8089",
).split(",")
if not all([ACCOUNT_NO, USER_ID, PASSWORD]):
logger.warning(
"TPP_ACCOUNT_NO, TPP_USER_ID, and TPP_PASSWORD must be set. "
"The server will start but all tools will fail."
)
# --- MCP Server --------------------------------------------------------------
mcp = FastMCP(
"TPP Wholesale Domain Registrar",
transport_security=TransportSecuritySettings(
allowed_hosts=[h.strip() for h in ALLOWED_HOSTS],
),
)
_client: TPPClient | None = None
def _get_client() -> TPPClient:
global _client
if _client is None:
_client = TPPClient(ACCOUNT_NO, USER_ID, PASSWORD)
return _client
def _error_response(e: Exception) -> str:
if isinstance(e, TPPError):
return json.dumps({"error": True, "code": e.code, "message": e.message})
return json.dumps({"error": True, "message": str(e)})
# =============================================================================
# MCP Tools
# =============================================================================
@mcp.tool()
async def check_domain_availability(domains: str) -> str:
"""Check if one or more domains are available for registration.
Args:
domains: Comma-separated list of fully qualified domain names
(e.g. "example.com,example.com.au,example.net")
"""
try:
domain_list = [d.strip() for d in domains.split(",") if d.strip()]
client = _get_client()
results = await client.check_availability(domain_list)
return json.dumps(results, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def register_domain(
domain: str,
period: int,
owner_contact_id: str,
admin_contact_id: str = "",
tech_contact_id: str = "",
billing_contact_id: str = "",
nameservers: str = "",
registrant_name: str = "",
registrant_id: str = "",
registrant_id_type: str = "",
eligibility_type: str = "",
eligibility_reason: str = "",
) -> str:
"""Register a new domain name.
For .com.au domains, the registrant_name, registrant_id,
registrant_id_type, eligibility_type, and eligibility_reason
fields are mandatory.
Args:
domain: Fully qualified domain name (e.g. "example.com.au")
period: Registration period in years
owner_contact_id: Contact ID for domain owner (from create_contact)
admin_contact_id: Contact ID for admin (defaults to owner)
tech_contact_id: Contact ID for tech (defaults to owner)
billing_contact_id: Contact ID for billing (defaults to owner)
nameservers: Comma-separated nameservers (e.g. "ns1.cloudflare.com,ns2.cloudflare.com")
registrant_name: Legal entity name (required for .au)
registrant_id: ABN/ACN of the legal entity (required for .au)
registrant_id_type: ID type code ABN, ACN, TM etc (required for .au)
eligibility_type: Eligibility type code (required for .au)
eligibility_reason: Eligibility reason code (required for .au)
"""
try:
hosts = [h.strip() for h in nameservers.split(",") if h.strip()] if nameservers else None
client = _get_client()
order_id = await client.register_domain(
domain=domain,
period=period,
owner_contact_id=owner_contact_id,
admin_contact_id=admin_contact_id or None,
tech_contact_id=tech_contact_id or None,
billing_contact_id=billing_contact_id or None,
hosts=hosts,
registrant_name=registrant_name or None,
registrant_id=registrant_id or None,
registrant_id_type=registrant_id_type or None,
eligibility_type=eligibility_type or None,
eligibility_reason=eligibility_reason or None,
)
return json.dumps({
"success": True,
"order_id": order_id,
"domain": domain,
"message": f"Domain {domain} registration submitted. Use check_order_status to track progress.",
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def create_contact(
first_name: str,
last_name: str,
email: str,
phone: str,
address1: str,
city: str,
region: str,
postal_code: str,
country_code: str,
organisation: str = "",
address2: str = "",
) -> str:
"""Create a contact object for use in domain registrations.
Returns a contact ID that can be used as owner/admin/tech/billing
contact when registering domains.
Args:
first_name: Contact first name
last_name: Contact last name
email: Contact email address
phone: Phone number (international format, e.g. "+61.7XXXXXXXX")
address1: Street address line 1
city: City
region: State or region (e.g. "QLD")
postal_code: Postal/ZIP code
country_code: 2-letter country code (e.g. "AU")
organisation: Organisation name (optional)
address2: Street address line 2 (optional)
"""
try:
client = _get_client()
contact_id = await client.create_contact(
first_name=first_name,
last_name=last_name,
email=email,
phone=phone,
address1=address1,
city=city,
region=region,
postal_code=postal_code,
country_code=country_code,
organisation=organisation,
address2=address2,
)
return json.dumps({
"success": True,
"contact_id": contact_id,
"message": "Contact created. Use this ID for domain registrations.",
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def renew_domain(domain: str, period: int = 1) -> str:
"""Renew an existing domain registration.
Args:
domain: Fully qualified domain name to renew
period: Renewal period in years (default: 1)
"""
try:
client = _get_client()
order_id = await client.renew_domain(domain, period)
return json.dumps({
"success": True,
"order_id": order_id,
"domain": domain,
"period": period,
"message": f"Domain {domain} renewal submitted for {period} year(s).",
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def get_domain_details(domain: str) -> str:
"""Get full details for a domain including contacts, nameservers, and status.
Args:
domain: Fully qualified domain name
"""
try:
client = _get_client()
details = await client.get_domain_details(domain)
return json.dumps(details, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def list_domains(
search: str = "",
tld: str = "",
active_only: bool = True,
expires_before: str = "",
expires_after: str = "",
) -> str:
"""List domains in the TPP Wholesale account.
Args:
search: Filter by domain name substring (optional)
tld: Filter by TLD, e.g. ".com.au" (optional)
active_only: Only show active domains (default: true)
expires_before: Filter by expiry date, format YYYY-MM-DD (optional)
expires_after: Filter by expiry date, format YYYY-MM-DD (optional)
"""
try:
client = _get_client()
domains = await client.list_domains(
search=search or None,
tld=tld or None,
active_only=active_only,
expires_before=expires_before or None,
expires_after=expires_after or None,
)
return json.dumps({
"count": len(domains),
"domains": domains,
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def update_nameservers(
domain: str,
add: str = "",
remove: str = "",
) -> str:
"""Update nameservers on a domain.
Args:
domain: Fully qualified domain name
add: Comma-separated nameservers to add (e.g. "ns1.cloudflare.com,ns2.cloudflare.com")
remove: Comma-separated nameservers to remove
"""
try:
add_hosts = [h.strip() for h in add.split(",") if h.strip()] if add else None
remove_hosts = [h.strip() for h in remove.split(",") if h.strip()] if remove else None
if not add_hosts and not remove_hosts:
return json.dumps({"error": True, "message": "Provide at least one nameserver to add or remove."})
client = _get_client()
result = await client.update_hosts(domain, add_hosts, remove_hosts)
return json.dumps({
"success": True,
"domain": domain,
"added": add_hosts or [],
"removed": remove_hosts or [],
"order_id": result,
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def set_domain_lock(domain: str, lock: bool) -> str:
"""Lock or unlock a domain to prevent unauthorized transfers.
Args:
domain: Fully qualified domain name
lock: True to lock, False to unlock
"""
try:
client = _get_client()
result = await client.set_domain_lock(domain, lock)
return json.dumps({
"success": True,
"domain": domain,
"locked": lock,
"order_id": result,
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def check_order_status(order_id: str) -> str:
"""Check the status of a domain order (registration, renewal, transfer).
Args:
order_id: The order ID returned from register/renew/transfer operations
"""
try:
client = _get_client()
status = await client.get_order_status(order_id)
return json.dumps(status, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def get_account_balance() -> str:
"""Get the current prepaid account balance in AUD."""
try:
client = _get_client()
balance = await client.get_account_balance()
return json.dumps({
"balance_aud": balance,
"message": f"Account balance: ${balance:.2f} AUD",
}, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def get_renewal_info(domain: str) -> str:
"""Check renewal status and expiry date for a domain.
Args:
domain: Fully qualified domain name
"""
try:
client = _get_client()
info = await client.get_renewal_info(domain)
return json.dumps(info, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def check_transfer(domain: str, password: str = "") -> str:
"""Check if a domain can be transferred from another registrar.
Args:
domain: Fully qualified domain name
password: Domain auth/EPP code (required for some TLDs)
"""
try:
client = _get_client()
result = await client.check_transfer(domain, password or None)
return json.dumps(result, indent=2)
except Exception as e:
return _error_response(e)
@mcp.tool()
async def transfer_domain(
domain: str,
period: int,
domain_password: str,
owner_contact_id: str,
admin_contact_id: str = "",
tech_contact_id: str = "",
billing_contact_id: str = "",
) -> str:
"""Request a domain transfer from another registrar.
Args:
domain: Fully qualified domain name to transfer
period: Renewal period in years (included with transfer)
domain_password: EPP/auth code from current registrar
owner_contact_id: Contact ID for domain owner
admin_contact_id: Contact ID for admin (defaults to owner)
tech_contact_id: Contact ID for tech (defaults to owner)
billing_contact_id: Contact ID for billing (defaults to owner)
"""
try:
client = _get_client()
order_id = await client.transfer_domain(
domain=domain,
period=period,
domain_password=domain_password,
owner_contact_id=owner_contact_id,
admin_contact_id=admin_contact_id or None,
tech_contact_id=tech_contact_id or None,
billing_contact_id=billing_contact_id or None,
)
return json.dumps({
"success": True,
"order_id": order_id,
"domain": domain,
"message": f"Transfer request submitted for {domain}. Transfers can take up to 9 days.",
}, indent=2)
except Exception as e:
return _error_response(e)
# =============================================================================
# REST API endpoints (non-MCP, for direct HTTP access)
# =============================================================================
from starlette.requests import Request
from starlette.responses import JSONResponse
app = mcp.get_app()
@app.get("/api/health")
async def health(request: Request) -> JSONResponse:
"""Health check endpoint."""
configured = all([ACCOUNT_NO, USER_ID, PASSWORD])
return JSONResponse({
"status": "ok" if configured else "unconfigured",
"service": "tppwholesale-mcp",
"configured": configured,
})
@app.get("/api/balance")
async def api_balance(request: Request) -> JSONResponse:
"""Quick balance check via REST."""
api_key = request.headers.get("x-api-key", "")
expected_key = os.environ.get("TPP_API_KEY", "")
if expected_key and api_key != expected_key:
return JSONResponse({"error": "Unauthorized"}, status_code=401)
try:
client = _get_client()
balance = await client.get_account_balance()
return JSONResponse({"balance_aud": balance})
except Exception as e:
return JSONResponse({"error": str(e)}, status_code=500)
if __name__ == "__main__":
import uvicorn
port = int(os.environ.get("TPP_PORT", "8089"))
uvicorn.run(app, host="0.0.0.0", port=port)

566
app/tpp_client.py Normal file
View File

@ -0,0 +1,566 @@
"""TPP Wholesale HTTP API client.
Wraps the TPP Wholesale domain registrar API (v2.7.4).
API docs: https://www.tppwholesale.com.au/api/
All methods are async and handle session management automatically.
Sessions expire after 15 minutes of inactivity the client
re-authenticates transparently when needed.
"""
import asyncio
import time
import urllib.parse
from dataclasses import dataclass, field
from typing import Any
import httpx
BASE_URL = "https://theconsole.tppwholesale.com.au/api"
@dataclass
class TPPSession:
session_id: str
created_at: float = field(default_factory=time.time)
last_used: float = field(default_factory=time.time)
@property
def is_expired(self) -> bool:
# TPP sessions expire after 15 min inactivity; refresh at 12 min
return (time.time() - self.last_used) > 720
def touch(self) -> None:
self.last_used = time.time()
class TPPError(Exception):
"""Raised when the TPP API returns an ERR response."""
def __init__(self, code: str, message: str):
self.code = code
self.message = message
super().__init__(f"TPP API error {code}: {message}")
class TPPClient:
"""Async client for the TPP Wholesale HTTP API."""
def __init__(self, account_no: str, user_id: str, password: str):
self._account_no = account_no
self._user_id = user_id
self._password = password
self._session: TPPSession | None = None
self._lock = asyncio.Lock()
self._http = httpx.AsyncClient(timeout=30.0)
# ------------------------------------------------------------------
# Session management
# ------------------------------------------------------------------
async def _ensure_session(self) -> str:
async with self._lock:
if self._session is None or self._session.is_expired:
self._session = await self._authenticate()
self._session.touch()
return self._session.session_id
async def _authenticate(self) -> TPPSession:
params = {
"AccountNo": self._account_no,
"UserId": self._user_id,
"Password": self._password,
}
resp = await self._http.get(f"{BASE_URL}/auth.pl", params=params)
resp.raise_for_status()
body = resp.text.strip()
if body.startswith("OK:"):
sid = body.split(":", 1)[1].strip()
return TPPSession(session_id=sid)
self._raise_error(body)
# ------------------------------------------------------------------
# Low-level request helpers
# ------------------------------------------------------------------
async def _order(self, params: dict[str, Any]) -> str:
"""POST to order.pl and return the raw response body."""
sid = await self._ensure_session()
params["SessionID"] = sid
params["Type"] = "Domains"
resp = await self._http.post(f"{BASE_URL}/order.pl", data=params)
resp.raise_for_status()
body = resp.text.strip()
if body.startswith("ERR:"):
self._raise_error(body)
return body
async def _query(self, params: dict[str, Any]) -> str:
"""GET from query.pl and return the raw response body."""
sid = await self._ensure_session()
params["SessionID"] = sid
params["Type"] = "Domains"
resp = await self._http.get(f"{BASE_URL}/query.pl", params=params)
resp.raise_for_status()
body = resp.text.strip()
if body.startswith("ERR:"):
self._raise_error(body)
return body
@staticmethod
def _raise_error(body: str) -> None:
"""Parse 'ERR: code,description' and raise TPPError."""
err_part = body.split(":", 1)[1].strip() if ":" in body else body
parts = err_part.split(",", 1)
code = parts[0].strip()
msg = parts[1].strip() if len(parts) > 1 else "Unknown error"
raise TPPError(code, msg)
# ------------------------------------------------------------------
# Domain availability & search
# ------------------------------------------------------------------
async def check_availability(self, domains: list[str]) -> list[dict]:
"""Check availability of one or more domains.
Returns a list of dicts with keys:
domain, available, min_years, max_years, error
"""
# TPP supports multiple Domain params
params: dict[str, Any] = {
"Object": "Domain",
"Action": "Availability",
}
# Build query string manually for multiple Domain params
sid = await self._ensure_session()
qs_parts = [
f"SessionID={sid}",
"Type=Domains",
"Object=Domain",
"Action=Availability",
]
for d in domains:
qs_parts.append(f"Domain={urllib.parse.quote(d)}")
resp = await self._http.get(f"{BASE_URL}/query.pl?{'&'.join(qs_parts)}")
resp.raise_for_status()
results = []
for line in resp.text.strip().splitlines():
line = line.strip()
if not line:
continue
# Format: "domain.com: OK: Minimum=2&Maximum=5"
# or: "domain.com: ERR: 304,Domain is not available"
domain_part, _, rest = line.partition(":")
domain = domain_part.strip()
rest = rest.strip()
if rest.startswith("OK:"):
detail = rest.split(":", 1)[1].strip()
kv = dict(p.split("=", 1) for p in detail.split("&") if "=" in p)
results.append({
"domain": domain,
"available": True,
"min_years": int(kv.get("Minimum", 1)),
"max_years": int(kv.get("Maximum", 1)),
})
elif rest.startswith("ERR:"):
err = rest.split(":", 1)[1].strip()
code, _, msg = err.partition(",")
results.append({
"domain": domain,
"available": False,
"error_code": code.strip(),
"error": msg.strip() or "Domain not available",
})
else:
results.append({"domain": domain, "available": False, "error": rest})
return results
# ------------------------------------------------------------------
# Domain registration
# ------------------------------------------------------------------
async def register_domain(
self,
domain: str,
period: int,
owner_contact_id: str,
admin_contact_id: str | None = None,
tech_contact_id: str | None = None,
billing_contact_id: str | None = None,
hosts: list[str] | None = None,
# .au eligibility fields
registrant_name: str | None = None,
registrant_id: str | None = None,
registrant_id_type: str | None = None,
eligibility_type: str | None = None,
eligibility_reason: str | None = None,
eligibility_name: str | None = None,
eligibility_id: str | None = None,
eligibility_id_type: str | None = None,
) -> str:
"""Register a new domain. Returns the order ID."""
params: dict[str, Any] = {
"Object": "Domain",
"Action": "Create",
"Domain": domain,
"Period": str(period),
"OwnerContactID": owner_contact_id,
"AdministrationContactID": admin_contact_id or owner_contact_id,
"TechnicalContactID": tech_contact_id or owner_contact_id,
"BillingContactID": billing_contact_id or owner_contact_id,
}
if hosts:
for h in hosts:
# Multiple Host params — we need to handle this specially
pass # handled below
# .au eligibility
if registrant_name:
params["RegistrantName"] = registrant_name
if registrant_id:
params["RegistrantID"] = registrant_id
if registrant_id_type:
params["RegistrantIDType"] = registrant_id_type
if eligibility_type:
params["EligibilityType"] = eligibility_type
if eligibility_reason:
params["EligibilityReason"] = eligibility_reason
if eligibility_name:
params["EligibilityName"] = eligibility_name
if eligibility_id:
params["EligibilityID"] = eligibility_id
if eligibility_id_type:
params["EligibilityIDType"] = eligibility_id_type
# Handle multiple Host params via manual POST
sid = await self._ensure_session()
params["SessionID"] = sid
params["Type"] = "Domains"
# Build form data with potential duplicate Host keys
form_pairs = [(k, v) for k, v in params.items()]
if hosts:
for h in hosts:
form_pairs.append(("Host", h))
resp = await self._http.post(
f"{BASE_URL}/order.pl",
content=urllib.parse.urlencode(form_pairs),
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
resp.raise_for_status()
body = resp.text.strip()
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
self._raise_error(body)
# ------------------------------------------------------------------
# Domain renewal
# ------------------------------------------------------------------
async def renew_domain(self, domain: str, period: int = 1) -> str:
"""Renew a domain. Returns the order ID."""
body = await self._order({
"Object": "Domain",
"Action": "Renewal",
"Domain": domain,
"Period": str(period),
})
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
return body
# ------------------------------------------------------------------
# Domain transfer
# ------------------------------------------------------------------
async def check_transfer(self, domain: str, password: str | None = None) -> dict:
"""Check if a domain can be transferred in."""
params: dict[str, Any] = {
"Object": "Domain",
"Action": "Transfer",
"Domain": domain,
}
if password:
params["DomainPassword"] = password
body = await self._query(params)
# "domain.com: OK: Minimum=1&Maximum=10&Owner=..."
_, _, rest = body.partition(":")
rest = rest.strip()
if rest.startswith("OK:"):
detail = rest.split(":", 1)[1].strip()
kv = dict(p.split("=", 1) for p in detail.split("&") if "=" in p)
return {
"domain": domain,
"transferable": True,
"min_years": int(kv.get("Minimum", 0)),
"max_years": int(kv.get("Maximum", 0)),
"owner": kv.get("Owner", ""),
"owner_email": kv.get("OwnerEmail", ""),
"expiry_date": kv.get("ExpiryDate", ""),
"requires_password": kv.get("RequirePassword") == "1",
}
return {"domain": domain, "transferable": False, "error": rest}
async def transfer_domain(
self,
domain: str,
period: int,
domain_password: str,
owner_contact_id: str,
admin_contact_id: str | None = None,
tech_contact_id: str | None = None,
billing_contact_id: str | None = None,
) -> str:
"""Request a domain transfer. Returns the order ID."""
body = await self._order({
"Object": "Domain",
"Action": "TransferRequest",
"Domain": domain,
"Period": str(period),
"DomainPassword": domain_password,
"OwnerContactID": owner_contact_id,
"AdministrationContactID": admin_contact_id or owner_contact_id,
"TechnicalContactID": tech_contact_id or owner_contact_id,
"BillingContactID": billing_contact_id or owner_contact_id,
})
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
return body
# ------------------------------------------------------------------
# Contact management
# ------------------------------------------------------------------
async def create_contact(
self,
first_name: str,
last_name: str,
email: str,
phone: str,
address1: str,
city: str,
region: str,
postal_code: str,
country_code: str,
organisation: str = "",
address2: str = "",
address3: str = "",
fax: str = "",
) -> str:
"""Create a contact object. Returns the contact ID."""
params: dict[str, Any] = {
"Object": "Contact",
"Action": "Create",
"OrganisationName": organisation,
"FirstName": first_name,
"LastName": last_name,
"Address1": address1,
"City": city,
"Region": region,
"PostalCode": postal_code,
"CountryCode": country_code,
"PhoneNumber": phone,
"Email": email,
}
if address2:
params["Address2"] = address2
if address3:
params["Address3"] = address3
if fax:
params["FaxNumber"] = fax
body = await self._order(params)
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
return body
# ------------------------------------------------------------------
# Host / nameserver management
# ------------------------------------------------------------------
async def update_hosts(
self, domain: str, add_hosts: list[str] | None = None,
remove_hosts: list[str] | None = None
) -> str:
"""Add or remove nameservers on a domain."""
sid = await self._ensure_session()
form_pairs = [
("SessionID", sid),
("Type", "Domains"),
("Object", "Domain"),
("Action", "UpdateHosts"),
("Domain", domain),
]
if add_hosts:
for h in add_hosts:
form_pairs.append(("AddHost", h))
if remove_hosts:
for h in remove_hosts:
form_pairs.append(("RemoveHost", h))
resp = await self._http.post(
f"{BASE_URL}/order.pl",
content=urllib.parse.urlencode(form_pairs),
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
resp.raise_for_status()
body = resp.text.strip()
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
if body.startswith("ERR:"):
self._raise_error(body)
return body
# ------------------------------------------------------------------
# Domain lock
# ------------------------------------------------------------------
async def set_domain_lock(self, domain: str, lock: bool) -> str:
"""Lock or unlock a domain."""
body = await self._order({
"Object": "Domain",
"Action": "UpdateDomainLock",
"Domain": domain,
"DomainLock": "1" if lock else "0",
})
if body.startswith("OK:"):
return body.split(":", 1)[1].strip()
return body
# ------------------------------------------------------------------
# Query operations
# ------------------------------------------------------------------
async def get_domain_details(self, domain: str) -> dict:
"""Get full details for a domain."""
body = await self._query({
"Object": "Domain",
"Action": "Details",
"Domain": domain,
})
# Response is OK: followed by key=value lines
result: dict[str, Any] = {"domain": domain}
nameservers = []
lines = body.splitlines()
for line in lines:
line = line.strip()
if line.startswith("OK"):
continue
if "=" in line:
key, _, val = line.partition("=")
key = key.strip()
val = val.strip()
if key == "Nameserver":
nameservers.append(val)
else:
result[key] = val
if nameservers:
result["nameservers"] = nameservers
return result
async def list_domains(
self,
search: str | None = None,
tld: str | None = None,
account: str | None = None,
expires_before: str | None = None,
expires_after: str | None = None,
active_only: bool = True,
) -> list[dict]:
"""List domains in the account.
Returns list of dicts with: domain, account, expiry_date
"""
params: dict[str, Any] = {
"Object": "Domain",
"Action": "List",
}
if search:
params["Domain"] = search
if tld:
params["Tld"] = tld if tld.startswith(".") else f".{tld}"
if account:
params["Account"] = account
if expires_before:
params["ExpiresBefore"] = expires_before
if expires_after:
params["ExpiresAfter"] = expires_after
if active_only:
params["ActiveDomainsOnly"] = "true"
body = await self._query(params)
results = []
for line in body.splitlines():
line = line.strip()
if not line or line.startswith("OK"):
continue
parts = line.split(",")
if len(parts) >= 3:
results.append({
"domain": parts[0].strip(),
"account": parts[1].strip(),
"expiry_date": parts[2].strip(),
})
elif len(parts) == 1 and parts[0]:
results.append({"domain": parts[0].strip()})
return results
async def get_order_status(self, order_id: str) -> dict:
"""Check the status of a domain order."""
body = await self._query({
"Object": "Order",
"Action": "OrderStatus",
"OrderID": order_id,
})
# "12345: Status,Description"
_, _, rest = body.partition(":")
rest = rest.strip()
parts = rest.split(",", 1)
return {
"order_id": order_id,
"status": parts[0].strip(),
"description": parts[1].strip() if len(parts) > 1 else "",
}
async def get_account_balance(self) -> float:
"""Get the prepaid account balance in AUD."""
body = await self._query({"Action": "AccountBalance"})
# "OK: 1234.56"
if body.startswith("OK:"):
return float(body.split(":", 1)[1].strip())
return 0.0
async def get_renewal_info(self, domain: str) -> dict:
"""Check renewal status and expiry for a domain."""
body = await self._query({
"Object": "Domain",
"Action": "Renewal",
"Domain": domain,
})
# "domain.com: OK: Minimum=2&Maximum=5&ExpiryDate=2026-12-01"
_, _, rest = body.partition(":")
rest = rest.strip()
if rest.startswith("OK:"):
detail = rest.split(":", 1)[1].strip()
kv = dict(p.split("=", 1) for p in detail.split("&") if "=" in p)
return {
"domain": domain,
"renewable": True,
"min_years": int(kv.get("Minimum", 1)),
"max_years": int(kv.get("Maximum", 1)),
"expiry_date": kv.get("ExpiryDate", ""),
}
return {"domain": domain, "renewable": False, "error": rest}
# ------------------------------------------------------------------
# Cleanup
# ------------------------------------------------------------------
async def close(self) -> None:
await self._http.aclose()

11
docker-compose.yml Normal file
View File

@ -0,0 +1,11 @@
services:
tppwholesale-mcp:
build: .
container_name: tppwholesale-mcp
restart: unless-stopped
ports:
- "8089:8089"
env_file:
- .env
environment:
- TPP_PORT=8089

4
requirements.txt Normal file
View File

@ -0,0 +1,4 @@
mcp[cli]>=1.9.0
httpx>=0.27.0
uvicorn>=0.30.0
starlette>=0.37.0