MCP server wrapping the TPP Wholesale domain registrar HTTP API (v2.7.4). Provides 13 MCP tools for domain registration, DNS management, renewals, transfers, contact management, and account queries. Features: - Async TPP API client with automatic session management - FastMCP server with Streamable HTTP transport - Docker deployment with docker-compose - Full .com.au eligibility support - REST health check and balance endpoints Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
567 lines
20 KiB
Python
567 lines
20 KiB
Python
"""TPP Wholesale HTTP API client.

Wraps the TPP Wholesale domain registrar API (v2.7.4).
API docs: https://www.tppwholesale.com.au/api/

All methods are async and handle session management automatically.
Sessions expire after 15 minutes of inactivity — the client
re-authenticates transparently when needed.
"""
|
|
|
|
import asyncio
|
|
import time
|
|
import urllib.parse
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
import httpx
|
|
|
|
# Root of the TPP Wholesale HTTP API; the client appends the endpoint script
# name (auth.pl, order.pl, query.pl) to this prefix.
BASE_URL = "https://theconsole.tppwholesale.com.au/api"
|
|
|
|
|
|
@dataclass
class TPPSession:
    """Bookkeeping for one authenticated TPP API session."""

    # Session token handed back by the auth endpoint.
    session_id: str
    # Wall-clock timestamps (seconds since the epoch, from time.time()).
    created_at: float = field(default_factory=time.time)
    last_used: float = field(default_factory=time.time)

    @property
    def is_expired(self) -> bool:
        """Return True once the session has been idle for more than 12 minutes.

        TPP drops sessions after 15 minutes of inactivity; treating them as
        stale at 720 seconds leaves a safety margin before that happens.
        """
        idle_seconds = time.time() - self.last_used
        return idle_seconds > 720

    def touch(self) -> None:
        """Record that the session was used just now."""
        now = time.time()
        self.last_used = now
|
|
|
|
|
|
class TPPError(Exception):
    """Raised when the TPP API returns an ERR response.

    Attributes:
        code: Error code taken from the response.
        message: Human-readable description taken from the response.
    """

    def __init__(self, code: str, message: str):
        """Build the exception text and keep code/message as attributes."""
        text = f"TPP API error {code}: {message}"
        super().__init__(text)
        self.code = code
        self.message = message
|
|
|
|
|
|
class TPPClient:
    """Async client for the TPP Wholesale HTTP API."""

    def __init__(self, account_no: str, user_id: str, password: str):
        """Remember the credentials and set up the HTTP client.

        No request is made here; the session starts out as None and is
        only populated when a method first needs one.

        Args:
            account_no: Sent as ``AccountNo`` when authenticating.
            user_id: Sent as ``UserId`` when authenticating.
            password: Sent as ``Password`` when authenticating.
        """
        # Credentials are kept so the client can re-authenticate later.
        self._account_no, self._user_id, self._password = (
            account_no,
            user_id,
            password,
        )
        # Current session, if any.
        self._session: TPPSession | None = None
        # Held while the session is checked or replaced.
        self._lock = asyncio.Lock()
        # Shared HTTP client; every request gets a 30 second timeout.
        self._http = httpx.AsyncClient(timeout=30.0)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Session management
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _ensure_session(self) -> str:
    """Return a usable session ID, authenticating first if necessary.

    The check, the optional re-authentication and the ``touch()`` all run
    while holding ``self._lock``.
    """
    async with self._lock:
        current = self._session
        if current is None or current.is_expired:
            # No session yet, or it has been idle too long: log in again.
            current = await self._authenticate()
            self._session = current
        current.touch()
        return current.session_id
|
|
|
|
async def _authenticate(self) -> TPPSession:
    """Log in through ``auth.pl`` and return a new session object.

    A response starting with ``OK:`` carries the session ID after the
    colon; any other response is passed to ``_raise_error``.
    """
    credentials = {
        "AccountNo": self._account_no,
        "UserId": self._user_id,
        "Password": self._password,
    }
    auth_url = f"{BASE_URL}/auth.pl"
    response = await self._http.get(auth_url, params=credentials)
    response.raise_for_status()

    reply = response.text.strip()
    if reply.startswith("OK:"):
        token = reply[len("OK:"):].strip()
        return TPPSession(session_id=token)
    self._raise_error(reply)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Low-level request helpers
|
|
# ------------------------------------------------------------------
|
|
|
|
async def _order(self, params: dict[str, Any]) -> str:
    """POST to order.pl and return the raw response body.

    Args:
        params: Request fields. ``SessionID`` and ``Type=Domains`` are added
            to a copy, so the caller's dict is never modified.

    Returns:
        The response text with surrounding whitespace stripped.

    Raises:
        Whatever ``_raise_error`` raises when the body starts with ``ERR:``,
        and whatever ``raise_for_status()`` raises on an HTTP error status.
    """
    sid = await self._ensure_session()
    # Merge into a fresh dict: writing into ``params`` would leak the session
    # ID back to the caller and break callers that reuse the same dict.
    form = {**params, "SessionID": sid, "Type": "Domains"}
    resp = await self._http.post(f"{BASE_URL}/order.pl", data=form)
    resp.raise_for_status()
    body = resp.text.strip()
    if body.startswith("ERR:"):
        self._raise_error(body)
    return body
|
|
|
|
async def _query(self, params: dict[str, Any]) -> str:
    """GET from query.pl and return the raw response body.

    Args:
        params: Query fields. ``SessionID`` and ``Type=Domains`` are added
            to a copy, so the caller's dict is never modified.

    Returns:
        The response text with surrounding whitespace stripped.

    Raises:
        Whatever ``_raise_error`` raises when the body starts with ``ERR:``,
        and whatever ``raise_for_status()`` raises on an HTTP error status.
    """
    sid = await self._ensure_session()
    # Merge into a fresh dict: writing into ``params`` would leak the session
    # ID back to the caller and break callers that reuse the same dict.
    query = {**params, "SessionID": sid, "Type": "Domains"}
    resp = await self._http.get(f"{BASE_URL}/query.pl", params=query)
    resp.raise_for_status()
    body = resp.text.strip()
    if body.startswith("ERR:"):
        self._raise_error(body)
    return body
|
|
|
|
@staticmethod
def _raise_error(body: str) -> None:
    """Turn an ``ERR: code,description`` response into a ``TPPError``.

    Everything after the first colon (or the whole body when there is no
    colon) is split at the first comma into code and description. A missing
    description becomes "Unknown error". This function always raises.
    """
    _, colon, tail = body.partition(":")
    detail = tail.strip() if colon else body
    code, comma, description = detail.partition(",")
    message = description.strip() if comma else "Unknown error"
    raise TPPError(code.strip(), message)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Domain availability & search
|
|
# ------------------------------------------------------------------
|
|
|
|
async def check_availability(self, domains: list[str]) -> list[dict]:
    """Check availability of one or more domains in a single request.

    Args:
        domains: Domain names to look up. An empty list returns ``[]``
            without contacting the API.

    Returns:
        One dict per non-blank response line, in response order:

        * ``OK`` line: ``domain``, ``available=True``, ``min_years``,
          ``max_years``
        * ``ERR`` line: ``domain``, ``available=False``, ``error_code``,
          ``error``
        * anything else: ``domain``, ``available=False``, ``error`` (the
          unparsed remainder of the line)

    Raises:
        Whatever ``_raise_error`` raises when the response as a whole is an
        ``ERR:`` reply (for example a rejected session) rather than a list of
        per-domain lines.
    """
    if not domains:
        return []

    sid = await self._ensure_session()
    # The API takes one ``Domain`` parameter per name, so the query is built
    # from a list of pairs (a dict cannot hold duplicate keys). urlencode
    # escapes every value, including the session ID.
    query = [
        ("SessionID", sid),
        ("Type", "Domains"),
        ("Object", "Domain"),
        ("Action", "Availability"),
    ]
    query.extend(("Domain", d) for d in domains)

    resp = await self._http.get(
        f"{BASE_URL}/query.pl?{urllib.parse.urlencode(query)}"
    )
    resp.raise_for_status()

    body = resp.text.strip()
    # Per-domain lines always start with the domain name, so a body that
    # starts with "ERR:" is a request-level failure, not a lookup result.
    if body.startswith("ERR:"):
        self._raise_error(body)

    results = []
    for line in body.splitlines():
        line = line.strip()
        if not line:
            continue
        # Format: "domain.com: OK: Minimum=2&Maximum=5"
        #     or: "domain.com: ERR: 304,Domain is not available"
        domain_part, _, rest = line.partition(":")
        domain = domain_part.strip()
        rest = rest.strip()

        if rest.startswith("OK:"):
            detail = rest[len("OK:"):].strip()
            kv = dict(p.split("=", 1) for p in detail.split("&") if "=" in p)
            results.append({
                "domain": domain,
                "available": True,
                "min_years": int(kv.get("Minimum", 1)),
                "max_years": int(kv.get("Maximum", 1)),
            })
        elif rest.startswith("ERR:"):
            code, _, msg = rest[len("ERR:"):].strip().partition(",")
            results.append({
                "domain": domain,
                "available": False,
                "error_code": code.strip(),
                "error": msg.strip() or "Domain not available",
            })
        else:
            results.append({"domain": domain, "available": False, "error": rest})

    return results
|
|
|
|
# ------------------------------------------------------------------
|
|
# Domain registration
|
|
# ------------------------------------------------------------------
|
|
|
|
async def register_domain(
|
|
self,
|
|
domain: str,
|
|
period: int,
|
|
owner_contact_id: str,
|
|
admin_contact_id: str | None = None,
|
|
tech_contact_id: str | None = None,
|
|
billing_contact_id: str | None = None,
|
|
hosts: list[str] | None = None,
|
|
# .au eligibility fields
|
|
registrant_name: str | None = None,
|
|
registrant_id: str | None = None,
|
|
registrant_id_type: str | None = None,
|
|
eligibility_type: str | None = None,
|
|
eligibility_reason: str | None = None,
|
|
eligibility_name: str | None = None,
|
|
eligibility_id: str | None = None,
|
|
eligibility_id_type: str | None = None,
|
|
) -> str:
|
|
"""Register a new domain. Returns the order ID."""
|
|
params: dict[str, Any] = {
|
|
"Object": "Domain",
|
|
"Action": "Create",
|
|
"Domain": domain,
|
|
"Period": str(period),
|
|
"OwnerContactID": owner_contact_id,
|
|
"AdministrationContactID": admin_contact_id or owner_contact_id,
|
|
"TechnicalContactID": tech_contact_id or owner_contact_id,
|
|
"BillingContactID": billing_contact_id or owner_contact_id,
|
|
}
|
|
if hosts:
|
|
for h in hosts:
|
|
# Multiple Host params — we need to handle this specially
|
|
pass # handled below
|
|
|
|
# .au eligibility
|
|
if registrant_name:
|
|
params["RegistrantName"] = registrant_name
|
|
if registrant_id:
|
|
params["RegistrantID"] = registrant_id
|
|
if registrant_id_type:
|
|
params["RegistrantIDType"] = registrant_id_type
|
|
if eligibility_type:
|
|
params["EligibilityType"] = eligibility_type
|
|
if eligibility_reason:
|
|
params["EligibilityReason"] = eligibility_reason
|
|
if eligibility_name:
|
|
params["EligibilityName"] = eligibility_name
|
|
if eligibility_id:
|
|
params["EligibilityID"] = eligibility_id
|
|
if eligibility_id_type:
|
|
params["EligibilityIDType"] = eligibility_id_type
|
|
|
|
# Handle multiple Host params via manual POST
|
|
sid = await self._ensure_session()
|
|
params["SessionID"] = sid
|
|
params["Type"] = "Domains"
|
|
|
|
# Build form data with potential duplicate Host keys
|
|
form_pairs = [(k, v) for k, v in params.items()]
|
|
if hosts:
|
|
for h in hosts:
|
|
form_pairs.append(("Host", h))
|
|
|
|
resp = await self._http.post(
|
|
f"{BASE_URL}/order.pl",
|
|
content=urllib.parse.urlencode(form_pairs),
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
resp.raise_for_status()
|
|
body = resp.text.strip()
|
|
if body.startswith("OK:"):
|
|
return body.split(":", 1)[1].strip()
|
|
self._raise_error(body)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Domain renewal
|
|
# ------------------------------------------------------------------
|
|
|
|
async def renew_domain(self, domain: str, period: int = 1) -> str:
    """Submit a renewal order for ``domain``.

    Returns the text after ``OK:`` (the order ID) on success; any other
    body that ``_order`` hands back is returned unchanged.
    """
    request = dict(
        Object="Domain",
        Action="Renewal",
        Domain=domain,
        Period=str(period),
    )
    reply = await self._order(request)
    if not reply.startswith("OK:"):
        return reply
    return reply[len("OK:"):].strip()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Domain transfer
|
|
# ------------------------------------------------------------------
|
|
|
|
async def check_transfer(self, domain: str, password: str | None = None) -> dict:
|
|
"""Check if a domain can be transferred in."""
|
|
params: dict[str, Any] = {
|
|
"Object": "Domain",
|
|
"Action": "Transfer",
|
|
"Domain": domain,
|
|
}
|
|
if password:
|
|
params["DomainPassword"] = password
|
|
|
|
body = await self._query(params)
|
|
# "domain.com: OK: Minimum=1&Maximum=10&Owner=..."
|
|
_, _, rest = body.partition(":")
|
|
rest = rest.strip()
|
|
if rest.startswith("OK:"):
|
|
detail = rest.split(":", 1)[1].strip()
|
|
kv = dict(p.split("=", 1) for p in detail.split("&") if "=" in p)
|
|
return {
|
|
"domain": domain,
|
|
"transferable": True,
|
|
"min_years": int(kv.get("Minimum", 0)),
|
|
"max_years": int(kv.get("Maximum", 0)),
|
|
"owner": kv.get("Owner", ""),
|
|
"owner_email": kv.get("OwnerEmail", ""),
|
|
"expiry_date": kv.get("ExpiryDate", ""),
|
|
"requires_password": kv.get("RequirePassword") == "1",
|
|
}
|
|
return {"domain": domain, "transferable": False, "error": rest}
|
|
|
|
async def transfer_domain(
|
|
self,
|
|
domain: str,
|
|
period: int,
|
|
domain_password: str,
|
|
owner_contact_id: str,
|
|
admin_contact_id: str | None = None,
|
|
tech_contact_id: str | None = None,
|
|
billing_contact_id: str | None = None,
|
|
) -> str:
|
|
"""Request a domain transfer. Returns the order ID."""
|
|
body = await self._order({
|
|
"Object": "Domain",
|
|
"Action": "TransferRequest",
|
|
"Domain": domain,
|
|
"Period": str(period),
|
|
"DomainPassword": domain_password,
|
|
"OwnerContactID": owner_contact_id,
|
|
"AdministrationContactID": admin_contact_id or owner_contact_id,
|
|
"TechnicalContactID": tech_contact_id or owner_contact_id,
|
|
"BillingContactID": billing_contact_id or owner_contact_id,
|
|
})
|
|
if body.startswith("OK:"):
|
|
return body.split(":", 1)[1].strip()
|
|
return body
|
|
|
|
# ------------------------------------------------------------------
|
|
# Contact management
|
|
# ------------------------------------------------------------------
|
|
|
|
async def create_contact(
    self,
    first_name: str,
    last_name: str,
    email: str,
    phone: str,
    address1: str,
    city: str,
    region: str,
    postal_code: str,
    country_code: str,
    organisation: str = "",
    address2: str = "",
    address3: str = "",
    fax: str = "",
) -> str:
    """Create a contact object and return its ID.

    ``organisation`` is always sent (possibly empty); ``address2``,
    ``address3`` and ``fax`` are sent only when non-empty.

    Returns the text after ``OK:`` (the contact ID) on success; any other
    body that ``_order`` hands back is returned unchanged.
    """
    request: dict[str, Any] = {
        "Object": "Contact",
        "Action": "Create",
        "OrganisationName": organisation,
        "FirstName": first_name,
        "LastName": last_name,
        "Address1": address1,
        "City": city,
        "Region": region,
        "PostalCode": postal_code,
        "CountryCode": country_code,
        "PhoneNumber": phone,
        "Email": email,
    }
    extras = (
        ("Address2", address2),
        ("Address3", address3),
        ("FaxNumber", fax),
    )
    for field_name, value in extras:
        if value:
            request[field_name] = value

    reply = await self._order(request)
    if not reply.startswith("OK:"):
        return reply
    return reply[len("OK:"):].strip()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Host / nameserver management
|
|
# ------------------------------------------------------------------
|
|
|
|
async def update_hosts(
|
|
self, domain: str, add_hosts: list[str] | None = None,
|
|
remove_hosts: list[str] | None = None
|
|
) -> str:
|
|
"""Add or remove nameservers on a domain."""
|
|
sid = await self._ensure_session()
|
|
form_pairs = [
|
|
("SessionID", sid),
|
|
("Type", "Domains"),
|
|
("Object", "Domain"),
|
|
("Action", "UpdateHosts"),
|
|
("Domain", domain),
|
|
]
|
|
if add_hosts:
|
|
for h in add_hosts:
|
|
form_pairs.append(("AddHost", h))
|
|
if remove_hosts:
|
|
for h in remove_hosts:
|
|
form_pairs.append(("RemoveHost", h))
|
|
|
|
resp = await self._http.post(
|
|
f"{BASE_URL}/order.pl",
|
|
content=urllib.parse.urlencode(form_pairs),
|
|
headers={"Content-Type": "application/x-www-form-urlencoded"},
|
|
)
|
|
resp.raise_for_status()
|
|
body = resp.text.strip()
|
|
if body.startswith("OK:"):
|
|
return body.split(":", 1)[1].strip()
|
|
if body.startswith("ERR:"):
|
|
self._raise_error(body)
|
|
return body
|
|
|
|
# ------------------------------------------------------------------
|
|
# Domain lock
|
|
# ------------------------------------------------------------------
|
|
|
|
async def set_domain_lock(self, domain: str, lock: bool) -> str:
    """Lock (``lock=True``) or unlock (``lock=False``) ``domain``.

    Returns the text after ``OK:`` on success; any other body that
    ``_order`` hands back is returned unchanged.
    """
    if lock:
        flag = "1"
    else:
        flag = "0"
    reply = await self._order(
        dict(
            Object="Domain",
            Action="UpdateDomainLock",
            Domain=domain,
            DomainLock=flag,
        )
    )
    if not reply.startswith("OK:"):
        return reply
    return reply[len("OK:"):].strip()
|
|
|
|
# ------------------------------------------------------------------
|
|
# Query operations
|
|
# ------------------------------------------------------------------
|
|
|
|
async def get_domain_details(self, domain: str) -> dict:
    """Fetch the details record for ``domain`` as a dict.

    The response is read as ``key=value`` lines. Lines starting with ``OK``
    and lines without ``=`` are ignored. Repeated ``Nameserver`` keys are
    gathered into a ``nameservers`` list (present only when at least one
    was seen); every other key is stored under its own name, and the
    requested name is stored under ``domain``.
    """
    reply = await self._query({
        "Object": "Domain",
        "Action": "Details",
        "Domain": domain,
    })

    details: dict[str, Any] = {"domain": domain}
    name_servers = []
    for raw_line in reply.splitlines():
        entry = raw_line.strip()
        # Skip the status line and anything that is not a key=value pair.
        if entry.startswith("OK") or "=" not in entry:
            continue
        name, _, value = entry.partition("=")
        name, value = name.strip(), value.strip()
        if name == "Nameserver":
            name_servers.append(value)
        else:
            details[name] = value

    if name_servers:
        details["nameservers"] = name_servers
    return details
|
|
|
|
async def list_domains(
|
|
self,
|
|
search: str | None = None,
|
|
tld: str | None = None,
|
|
account: str | None = None,
|
|
expires_before: str | None = None,
|
|
expires_after: str | None = None,
|
|
active_only: bool = True,
|
|
) -> list[dict]:
|
|
"""List domains in the account.
|
|
|
|
Returns list of dicts with: domain, account, expiry_date
|
|
"""
|
|
params: dict[str, Any] = {
|
|
"Object": "Domain",
|
|
"Action": "List",
|
|
}
|
|
if search:
|
|
params["Domain"] = search
|
|
if tld:
|
|
params["Tld"] = tld if tld.startswith(".") else f".{tld}"
|
|
if account:
|
|
params["Account"] = account
|
|
if expires_before:
|
|
params["ExpiresBefore"] = expires_before
|
|
if expires_after:
|
|
params["ExpiresAfter"] = expires_after
|
|
if active_only:
|
|
params["ActiveDomainsOnly"] = "true"
|
|
|
|
body = await self._query(params)
|
|
results = []
|
|
for line in body.splitlines():
|
|
line = line.strip()
|
|
if not line or line.startswith("OK"):
|
|
continue
|
|
parts = line.split(",")
|
|
if len(parts) >= 3:
|
|
results.append({
|
|
"domain": parts[0].strip(),
|
|
"account": parts[1].strip(),
|
|
"expiry_date": parts[2].strip(),
|
|
})
|
|
elif len(parts) == 1 and parts[0]:
|
|
results.append({"domain": parts[0].strip()})
|
|
return results
|
|
|
|
async def get_order_status(self, order_id: str) -> dict:
    """Look up the status of an order.

    The response is read as ``"<id>: Status,Description"``: the text after
    the first colon is split at the first comma.

    Returns:
        A dict with ``order_id`` (as passed in), ``status`` and
        ``description`` (empty when the response has no comma).
    """
    request = {
        "Object": "Order",
        "Action": "OrderStatus",
        "OrderID": order_id,
    }
    reply = await self._query(request)
    tail = reply.partition(":")[2].strip()
    status, _, description = tail.partition(",")
    return {
        "order_id": order_id,
        "status": status.strip(),
        "description": description.strip(),
    }
|
|
|
|
async def get_account_balance(self) -> float:
    """Return the prepaid account balance in AUD.

    The expected response is ``"OK: 1234.56"``. A response that does not
    start with ``OK:`` yields ``0.0``.
    """
    reply = await self._query({"Action": "AccountBalance"})
    if not reply.startswith("OK:"):
        return 0.0
    amount_text = reply[len("OK:"):].strip()
    return float(amount_text)
|
|
|
|
async def get_renewal_info(self, domain: str) -> dict:
    """Check whether ``domain`` can be renewed and when it expires.

    Returns:
        On an ``OK`` answer, a dict with ``renewable=True`` plus the parsed
        ``min_years``, ``max_years`` and ``expiry_date``. Otherwise a dict
        with ``renewable=False`` and the text after the first colon of the
        response under ``error``.
    """
    reply = await self._query(
        dict(Object="Domain", Action="Renewal", Domain=domain)
    )
    # Expected shape: "domain.com: OK: Minimum=2&Maximum=5&ExpiryDate=2026-12-01"
    outcome = reply.partition(":")[2].strip()
    if not outcome.startswith("OK:"):
        return {"domain": domain, "renewable": False, "error": outcome}

    pairs = (item.partition("=") for item in outcome[len("OK:"):].strip().split("&"))
    fields = {key: value for key, sep, value in pairs if sep}
    return {
        "domain": domain,
        "renewable": True,
        "min_years": int(fields.get("Minimum", 1)),
        "max_years": int(fields.get("Maximum", 1)),
        "expiry_date": fields.get("ExpiryDate", ""),
    }
|
|
|
|
# ------------------------------------------------------------------
|
|
# Cleanup
|
|
# ------------------------------------------------------------------
|
|
|
|
async def close(self) -> None:
    """Close the underlying HTTP client."""
    http_client = self._http
    await http_client.aclose()
|