Files
ospab.network/oncp/src/api.rs
ospab 6d4c06a013 feat: CDN Control Plane (ONCP) implementation
- Add REST API for node/user management (axum-based)
- Add NodeRegistry for server check-in and load balancing
- Add SniManager for dynamic SNI updates and emergency blocking
- Add CDN Dashboard CLI (oncp-master) with real-time monitoring
- Add ProbeDetector in ostp-guard for active probing detection
- Add iptables/nftables/Windows firewall ban integration
- Extend MimicryEngine with async SNI updates from control plane
- Fix all compilation warnings
- Update author to ospab.team
2026-01-01 20:33:03 +03:00

439 lines
13 KiB
Rust

//! REST API for CDN Control Plane (Master Node)
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
use base64::{Engine as _, engine::general_purpose};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tower_http::cors::{Any, CorsLayer};
use tower_http::trace::TraceLayer;
use uuid::Uuid;
use crate::billing::{SqliteRegistry, User, UserRegistry};
use crate::node::{NetworkStats, Node, NodeCheckin, NodeRegistry};
use crate::session::SessionManager;
use crate::sni::{SniManager, SniUpdate};
/// Shared application state
///
/// Bundles the registries and managers that the handlers in this file reach
/// through `State<Arc<AppState>>`.
pub struct AppState {
/// Node registry used by the node, statistics and health endpoints.
pub nodes: NodeRegistry,
/// User registry used by the user endpoints (queried through its `conn` in `list_users` and `delete_user`).
pub users: SqliteRegistry,
/// Session manager; built in `AppState::new` but not read by any handler visible in this file.
pub sessions: SessionManager,
/// SNI manager used by the SNI endpoints and when assembling a user config.
pub sni_manager: SniManager,
}
impl AppState {
    /// Build the shared state, opening the user registry at `db_path`.
    ///
    /// # Errors
    /// Propagates whatever error `SqliteRegistry::new` returns; the other
    /// components are constructed infallibly.
    pub fn new(db_path: &str) -> anyhow::Result<Self> {
        // Components are created in field order; only the user registry can fail.
        let nodes = NodeRegistry::new(60); // 60 second timeout
        let users = SqliteRegistry::new(db_path)?;
        let sessions = SessionManager::new(300); // 5 minute heartbeat timeout
        let sni_manager = SniManager::new();

        Ok(Self {
            nodes,
            users,
            sessions,
            sni_manager,
        })
    }
}
/// Create the control plane API router
///
/// Registers the `/api/v1/...` node, user, SNI and statistics routes plus
/// `/health`, then wraps them in an HTTP trace layer and a CORS layer that
/// allows any origin and any method, and finally attaches `state`.
pub fn create_router(state: Arc<AppState>) -> Router {
    // Middleware is built up front; it is applied below in the same order
    // as before (trace first, then CORS).
    let trace_layer = TraceLayer::new_for_http();
    let cors_layer = CorsLayer::new().allow_origin(Any).allow_methods(Any);

    let routes = Router::new()
        // Nodes
        .route("/api/v1/nodes", get(list_nodes).post(register_node))
        .route("/api/v1/nodes/:id", get(get_node).delete(remove_node))
        .route("/api/v1/nodes/:id/checkin", post(node_checkin))
        .route("/api/v1/nodes/best", get(best_nodes))
        // Users
        .route("/api/v1/users", get(list_users).post(create_user))
        .route("/api/v1/users/:id", get(get_user).delete(delete_user))
        .route("/api/v1/users/:id/config", get(user_config))
        // SNI
        .route("/api/v1/sni", get(list_sni).post(update_sni))
        .route("/api/v1/sni/emergency", post(emergency_sni_update))
        // Statistics
        .route("/api/v1/stats", get(network_stats))
        .route("/api/v1/stats/traffic", get(traffic_stats))
        // Liveness probe
        .route("/health", get(health_check));

    routes
        .layer(trace_layer)
        .layer(cors_layer)
        .with_state(state)
}
// ============================================================================
// Node Endpoints
// ============================================================================
/// Return every node known to the registry as a JSON array.
async fn list_nodes(State(state): State<Arc<AppState>>) -> Json<Vec<Node>> {
    let all_nodes = state.nodes.list().await;
    Json(all_nodes)
}
/// Request body accepted by `register_node` (`POST /api/v1/nodes`).
#[derive(Debug, Deserialize)]
struct RegisterNodeRequest {
/// Passed as the first argument to `Node::new`.
name: String,
/// Passed as the second argument to `Node::new`.
address: String,
/// Passed as the third argument to `Node::new`.
country_code: String,
/// When present, overwrites the `max_connections` value that `Node::new` produced.
max_connections: Option<u32>,
/// Stored on the node as `psk_hash` unchanged (including `None`).
psk_hash: Option<String>,
}
/// Response body returned by `register_node` alongside `201 Created`.
#[derive(Debug, Serialize)]
struct RegisterNodeResponse {
/// Identifier returned by `NodeRegistry::register` for the new node.
node_id: Uuid,
/// Human-readable confirmation text.
message: String,
}
async fn register_node(
State(state): State<Arc<AppState>>,
Json(req): Json<RegisterNodeRequest>,
) -> impl IntoResponse {
let mut node = Node::new(&req.name, &req.address, &req.country_code);
if let Some(max) = req.max_connections {
node.max_connections = max;
}
node.psk_hash = req.psk_hash;
let node_id = state.nodes.register(node).await;
(StatusCode::CREATED, Json(RegisterNodeResponse {
node_id,
message: "Node registered successfully".into(),
}))
}
/// Look up one node by id.
///
/// Replies `200 OK` with the node, or `404 Not Found` with a JSON `null`
/// body when the registry has no such id.
async fn get_node(
    State(state): State<Arc<AppState>>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let found = state.nodes.get(&id).await;
    let status = if found.is_some() {
        StatusCode::OK
    } else {
        StatusCode::NOT_FOUND
    };

    (status, Json(found))
}
/// Remove a node from the registry.
///
/// Replies `204 No Content` when something was removed and `404 Not Found`
/// when the id was unknown.
async fn remove_node(
    State(state): State<Arc<AppState>>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let removed = state.nodes.remove(&id).await;

    if removed.is_some() {
        StatusCode::NO_CONTENT
    } else {
        StatusCode::NOT_FOUND
    }
}
/// Node check-in (heartbeat)
///
/// The id from the URL always wins over any `node_id` in the JSON body.
/// Replies `200 OK` with the node returned by the registry, or
/// `404 Not Found` with a JSON `null` body when the registry returns nothing.
async fn node_checkin(
    State(state): State<Arc<AppState>>,
    Path(id): Path<Uuid>,
    Json(mut checkin): Json<NodeCheckin>,
) -> impl IntoResponse {
    // The path parameter is authoritative; overwrite whatever the body carried.
    checkin.node_id = id;

    let updated = state.nodes.checkin(checkin).await;
    let status = updated
        .as_ref()
        .map_or(StatusCode::NOT_FOUND, |_| StatusCode::OK);

    (status, Json(updated))
}
/// Query-string parameters accepted by `best_nodes` (`GET /api/v1/nodes/best`).
#[derive(Debug, Deserialize)]
struct BestNodesQuery {
/// When present, selection goes through `best_for_country`; otherwise `best_global` is used.
country: Option<String>,
/// Limit passed to the registry; `best_nodes` falls back to 3 when omitted.
limit: Option<usize>,
}
/// Get the best nodes for a client connection.
///
/// Uses the per-country selection when `country` is given and the global
/// selection otherwise; `limit` defaults to 3.
async fn best_nodes(
    State(state): State<Arc<AppState>>,
    Query(query): Query<BestNodesQuery>,
) -> Json<Vec<Node>> {
    let max_results = query.limit.unwrap_or(3);

    let selected = if let Some(country) = &query.country {
        state.nodes.best_for_country(country, max_results).await
    } else {
        state.nodes.best_global(max_results).await
    };

    Json(selected)
}
// ============================================================================
// User Endpoints
// ============================================================================
/// Reduced view of a user row returned by `list_users` (limited info for security).
#[derive(Debug, Serialize)]
struct UserSummary {
/// Parsed from the `uuid` text column.
uuid: Uuid,
/// True when the `active` column equals 1.
active: bool,
/// The `expires_at` column, passed through as the stored string.
expires_at: String,
/// `bandwidth_used` column divided by 1024^3.
bandwidth_used_gb: f64,
/// `bandwidth_quota` column divided by 1024^3.
bandwidth_quota_gb: f64,
}
async fn list_users(State(state): State<Arc<AppState>>) -> impl IntoResponse {
// Note: In production, this should have pagination and auth
let conn = state.users.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT uuid, bandwidth_quota, bandwidth_used, expires_at, active FROM users LIMIT 100"
).unwrap();
let users: Vec<UserSummary> = stmt.query_map([], |row: &rusqlite::Row| {
let uuid_str: String = row.get(0)?;
Ok(UserSummary {
uuid: Uuid::parse_str(&uuid_str).unwrap(),
bandwidth_quota_gb: row.get::<_, i64>(1)? as f64 / (1024.0 * 1024.0 * 1024.0),
bandwidth_used_gb: row.get::<_, i64>(2)? as f64 / (1024.0 * 1024.0 * 1024.0),
expires_at: row.get::<_, String>(3)?,
active: row.get::<_, i32>(4)? == 1,
})
}).unwrap().filter_map(|r: Result<UserSummary, _>| r.ok()).collect();
Json(users)
}
/// Request body accepted by `create_user` (`POST /api/v1/users`).
#[derive(Debug, Deserialize)]
struct CreateUserRequest {
/// First argument to `User::new`; `create_user` uses 100 when omitted.
/// NOTE(review): presumably gigabytes, going by the field name — confirm against `User::new`.
quota_gb: Option<u64>,
/// Second argument to `User::new`; `create_user` uses 30 when omitted.
valid_days: Option<i64>,
}
/// Response body returned by `create_user`.
///
/// On failure `create_user` sends this with a nil UUID and empty strings.
#[derive(Debug, Serialize)]
struct CreateUserResponse {
/// UUID of the created user.
uuid: Uuid,
/// Standard-alphabet base64 of a JSON object holding `uuid` and `expires`.
config_string: String,
/// Expiry timestamp formatted with `to_rfc3339`.
expires_at: String,
}
/// Create a new user and return its UUID, expiry and an encoded config string.
///
/// Missing request fields fall back to a quota of 100 and a validity of 30
/// days. Replies `201 Created` on success; if the registry rejects the
/// insert, the error is logged and `500 Internal Server Error` is returned
/// with a placeholder body (nil UUID, empty strings).
async fn create_user(
    State(state): State<Arc<AppState>>,
    Json(req): Json<CreateUserRequest>,
) -> impl IntoResponse {
    let user = User::new(req.quota_gb.unwrap_or(100), req.valid_days.unwrap_or(30));

    // Bail out early on a storage failure so the success path reads straight through.
    if let Err(e) = state.users.create_user(&user) {
        tracing::error!("Failed to create user: {}", e);
        let placeholder = CreateUserResponse {
            uuid: Uuid::nil(),
            config_string: String::new(),
            expires_at: String::new(),
        };
        return (StatusCode::INTERNAL_SERVER_ERROR, Json(placeholder));
    }

    // Generate config string (can be used for QR code)
    let config = serde_json::json!({
        "uuid": user.uuid.to_string(),
        "expires": user.expires_at.to_rfc3339(),
    });
    let config_string = general_purpose::STANDARD.encode(config.to_string());

    let created = CreateUserResponse {
        uuid: user.uuid,
        config_string,
        expires_at: user.expires_at.to_rfc3339(),
    };
    (StatusCode::CREATED, Json(created))
}
/// Fetch one user by id.
///
/// Replies `200 OK` with the user, `404 Not Found` with a JSON `null` body
/// when no such user exists, and `500 Internal Server Error` with a JSON
/// `null` body when the registry lookup fails.
async fn get_user(
    State(state): State<Arc<AppState>>,
    Path(id): Path<Uuid>,
) -> impl IntoResponse {
    let (status, body) = match state.users.get_user(&id) {
        Ok(found @ Some(_)) => (StatusCode::OK, found),
        Ok(None) => (StatusCode::NOT_FOUND, None),
        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, None),
    };

    (status, Json(body))
}
/// Delete user
async fn delete_user(
State(state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
let conn = state.users.conn.lock().unwrap();
match conn.execute("DELETE FROM users WHERE uuid = ?", [id.to_string()]) {
Ok(0) => StatusCode::NOT_FOUND,
Ok(_) => StatusCode::NO_CONTENT,
Err(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
}
/// Response body returned by `user_config` (`GET /api/v1/users/:id/config`).
///
/// On a non-200 reply `user_config` sends this with every field empty.
#[derive(Debug, Serialize)]
struct UserConfig {
/// The requested user id rendered as a string.
uuid: String,
/// Servers built from `best_global(5)`.
servers: Vec<ServerInfo>,
/// Result of `SniManager::get_active_snis`.
sni_list: Vec<String>,
/// Standard-alphabet base64 of a JSON object with keys `u` (user id) and `s` (first server address, or null).
qr_data: Option<String>,
}
/// One server entry inside `UserConfig::servers`.
#[derive(Debug, Clone, Serialize)]
struct ServerInfo {
/// Cloned from the node's `address`.
address: String,
/// Cloned from the node's `country_code`.
country_code: String,
/// Value of the node's `load_score()` at the time the config was built.
load: f32,
}
async fn user_config(
State(state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
// Verify user exists and is valid
match state.users.validate_user(&id) {
Ok(true) => {}
_ => return (StatusCode::NOT_FOUND, Json(UserConfig {
uuid: String::new(),
servers: vec![],
sni_list: vec![],
qr_data: None,
})),
}
// Get best available servers
let nodes = state.nodes.best_global(5).await;
let servers: Vec<ServerInfo> = nodes.iter().map(|n| ServerInfo {
address: n.address.clone(),
country_code: n.country_code.clone(),
load: n.load_score(),
}).collect();
// Get current SNI list
let sni_list = state.sni_manager.get_active_snis().await;
// Generate QR-compatible config
let qr_config = serde_json::json!({
"u": id.to_string(),
"s": servers.first().map(|s| &s.address),
});
let qr_data = general_purpose::STANDARD.encode(qr_config.to_string());
(StatusCode::OK, Json(UserConfig {
uuid: id.to_string(),
servers,
sni_list,
qr_data: Some(qr_data),
}))
}
// ============================================================================
// SNI Management
// ============================================================================
/// Return the SNI manager's currently active SNI list as JSON.
async fn list_sni(State(state): State<Arc<AppState>>) -> Json<Vec<String>> {
    let active = state.sni_manager.get_active_snis().await;
    Json(active)
}
/// Apply the posted `SniUpdate` through the SNI manager and reply `200 OK`.
async fn update_sni(
    State(state): State<Arc<AppState>>,
    Json(update): Json<SniUpdate>,
) -> impl IntoResponse {
    let manager = &state.sni_manager;
    manager.apply_update(update).await;

    StatusCode::OK
}
/// Request body accepted by `emergency_sni_update` (`POST /api/v1/sni/emergency`).
#[derive(Debug, Deserialize)]
struct EmergencySniRequest {
/// Becomes the `remove` list of the resulting `SniUpdate`.
blocked_domains: Vec<String>,
/// Becomes the `add` list of the resulting `SniUpdate`.
replacement_domains: Vec<String>,
/// Forwarded unchanged as the update's `country_code`.
country_code: Option<String>,
}
async fn emergency_sni_update(
State(state): State<Arc<AppState>>,
Json(req): Json<EmergencySniRequest>,
) -> impl IntoResponse {
let update = SniUpdate {
remove: req.blocked_domains,
add: req.replacement_domains,
country_code: req.country_code,
emergency: true,
};
state.sni_manager.apply_update(update).await;
// Log for audit
tracing::warn!("Emergency SNI update applied");
StatusCode::OK
}
// ============================================================================
// Statistics
// ============================================================================
/// Return the node registry's aggregated `NetworkStats` as JSON.
async fn network_stats(State(state): State<Arc<AppState>>) -> Json<NetworkStats> {
    let snapshot = state.nodes.network_stats().await;
    Json(snapshot)
}
/// Response body returned by `traffic_stats` (`GET /api/v1/stats/traffic`).
#[derive(Debug, Serialize)]
struct TrafficStats {
/// Copied from `NetworkStats::total_bytes_tx`.
total_bytes_tx: u64,
/// Copied from `NetworkStats::total_bytes_rx`.
total_bytes_rx: u64,
/// Sum of the two byte counters divided by 1024^2.
total_mb_transferred: f64,
/// Filled from `NetworkStats::total_connections` (not from the session manager).
active_sessions: usize,
}
/// Summarise traffic counters from the node registry's network statistics.
///
/// Reports the transmitted and received byte totals, their sum expressed in
/// units of 1024^2 bytes, and the registry's total connection count as
/// `active_sessions`.
async fn traffic_stats(State(state): State<Arc<AppState>>) -> Json<TrafficStats> {
    const BYTES_PER_MIB: f64 = 1024.0 * 1024.0;

    let stats = state.nodes.network_stats().await;
    let (tx, rx) = (stats.total_bytes_tx, stats.total_bytes_rx);
    let combined = tx + rx;

    let summary = TrafficStats {
        total_bytes_tx: tx,
        total_bytes_rx: rx,
        total_mb_transferred: combined as f64 / BYTES_PER_MIB,
        active_sessions: stats.total_connections as usize,
    };
    Json(summary)
}
// ============================================================================
// Health Check
// ============================================================================
/// Response body returned by `health_check` (`GET /health`).
#[derive(Debug, Serialize)]
struct HealthStatus {
/// Always `"ok"` as produced by `health_check`.
status: String,
/// Crate version taken from `CARGO_PKG_VERSION` at compile time.
version: String,
/// Copied from `NetworkStats::online_nodes`.
nodes_online: usize,
}
/// Health probe: reports a fixed `"ok"` status, the crate version, and the
/// number of online nodes according to the registry's network statistics.
async fn health_check(State(state): State<Arc<AppState>>) -> Json<HealthStatus> {
    let nodes_online = state.nodes.network_stats().await.online_nodes;

    Json(HealthStatus {
        status: String::from("ok"),
        version: String::from(env!("CARGO_PKG_VERSION")),
        nodes_online,
    })
}
/// Start the control plane API server
pub async fn run_server(state: Arc<AppState>, bind_addr: &str) -> anyhow::Result<()> {
let app = create_router(state);
let listener = tokio::net::TcpListener::bind(bind_addr).await?;
tracing::info!("Control Plane API listening on {}", bind_addr);
axum::serve(listener, app).await?;
Ok(())
}