feat: System DNS, Node Enrollment, and CDN Steering

- osds: Added system DNS forwarder on 127.0.0.1:53
  - SystemDnsManager for Windows/Linux DNS configuration
  - Auto-restore original DNS on exit
  - *.ospab.internal routing to master node
  - Encrypted DNS forwarding through OSTP tunnel

- oncp: Implemented node enrollment system
  - EnrollmentRegistry with state machine (Pending->Approved->Active)
  - SQLite-backed enrollment storage
  - Node PSK generation on approval
  - REST API endpoints for enrollment workflow

- oncp-master: Added enrollment CLI commands
  - 'node pending' - List pending enrollment requests
  - 'node approve <id>' - Approve and generate PSK
  - 'node reject <id>' - Reject enrollment

- ostp-server: Auto-registration on startup
  - Submits enrollment request to master node
  - Exits if PSK='AUTO' and awaits approval
  - Integrates with ONCP enrollment API

- oncp API: Enhanced CDN steering
  - Best nodes by country_code with fallback
  - Steering metadata (matched, fallback status)
  - Load-based node selection
This commit is contained in:
2026-01-01 23:45:24 +03:00
parent 7e1c87e70b
commit 5879344336
11 changed files with 1449 additions and 49 deletions

View File

@@ -21,3 +21,5 @@ hyper = { version = "1.0", features = ["full"] }
base64 = "0.21"
qrcode = "0.14"
image = { version = "0.24", default-features = false, features = ["png"] }
hex = "0.4"
rand = "0.8"

View File

@@ -15,6 +15,7 @@ use tower_http::trace::TraceLayer;
use uuid::Uuid;
use crate::billing::{SqliteRegistry, User, UserRegistry};
use crate::enrollment::{EnrollmentRegistry, EnrollmentRequest, EnrollmentState};
use crate::node::{NetworkStats, Node, NodeCheckin, NodeRegistry};
use crate::session::SessionManager;
use crate::sni::{SniManager, SniUpdate};
@@ -25,6 +26,7 @@ pub struct AppState {
pub users: SqliteRegistry,
pub sessions: SessionManager,
pub sni_manager: SniManager,
pub enrollment: EnrollmentRegistry,
}
impl AppState {
@@ -34,6 +36,7 @@ impl AppState {
users: SqliteRegistry::new(db_path)?,
sessions: SessionManager::new(300), // 5 minute heartbeat timeout
sni_manager: SniManager::new(),
enrollment: EnrollmentRegistry::new(db_path)?,
})
}
}
@@ -47,6 +50,12 @@ pub fn create_router(state: Arc<AppState>) -> Router {
.route("/api/v1/nodes/:id/checkin", post(node_checkin))
.route("/api/v1/nodes/best", get(best_nodes))
// Node enrollment
.route("/api/v1/enrollment/request", post(submit_enrollment))
.route("/api/v1/enrollment/pending", get(list_pending_enrollments))
.route("/api/v1/enrollment/:id/approve", post(approve_enrollment))
.route("/api/v1/enrollment/:id/reject", post(reject_enrollment))
// User management
.route("/api/v1/users", get(list_users).post(create_user))
.route("/api/v1/users/:id", get(get_user).delete(delete_user))
@@ -147,25 +156,62 @@ async fn node_checkin(
}
}
/// Get best nodes for client connection
/// Get best nodes for client connection (CDN Steering)
#[derive(Debug, Deserialize)]
struct BestNodesQuery {
country: Option<String>,
limit: Option<usize>,
}
#[derive(Debug, Serialize)]
struct BestNodesResponse {
nodes: Vec<Node>,
steering_info: SteeringInfo,
}
#[derive(Debug, Serialize)]
struct SteeringInfo {
requested_country: Option<String>,
matched_country: bool,
fallback_used: bool,
total_available: usize,
}
async fn best_nodes(
State(state): State<Arc<AppState>>,
Query(query): Query<BestNodesQuery>,
) -> Json<Vec<Node>> {
) -> Json<BestNodesResponse> {
let limit = query.limit.unwrap_or(3);
let nodes = match &query.country {
Some(country) => state.nodes.best_for_country(country, limit).await,
None => state.nodes.best_global(limit).await,
let (nodes, matched, fallback) = match &query.country {
Some(country) => {
let country_nodes = state.nodes.best_for_country(country, limit).await;
if country_nodes.is_empty() {
// No nodes in requested country - fallback to global
let global_nodes = state.nodes.best_global(limit).await;
(global_nodes, false, true)
} else {
(country_nodes, true, false)
}
}
None => {
let global = state.nodes.best_global(limit).await;
(global, false, false)
}
};
Json(nodes)
let total = state.nodes.list_online().await.len();
Json(BestNodesResponse {
nodes: nodes.clone(),
steering_info: SteeringInfo {
requested_country: query.country.clone(),
matched_country: matched,
fallback_used: fallback,
total_available: total,
},
})
}
// ============================================================================
@@ -376,6 +422,115 @@ async fn emergency_sni_update(
StatusCode::OK
}
// ============================================================================
// Enrollment Endpoints
// ============================================================================
/// Submit node enrollment request
#[derive(Debug, Deserialize)]
struct SubmitEnrollmentRequest {
name: String,
address: String,
country_code: String,
hardware_id: String,
region: String,
}
#[derive(Debug, Serialize)]
struct SubmitEnrollmentResponse {
node_id: Uuid,
state: String,
message: String,
}
async fn submit_enrollment(
State(state): State<Arc<AppState>>,
Json(req): Json<SubmitEnrollmentRequest>,
) -> impl IntoResponse {
let node_id = Uuid::new_v4();
let enrollment_req = EnrollmentRequest {
node_id,
name: req.name,
address: req.address,
country_code: req.country_code,
hardware_id: req.hardware_id,
region: req.region,
requested_at: chrono::Utc::now(),
};
match state.enrollment.submit_request(enrollment_req) {
Ok(()) => (
StatusCode::CREATED,
Json(SubmitEnrollmentResponse {
node_id,
state: "pending".into(),
message: "Enrollment request submitted. Awaiting approval.".into(),
}),
),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(SubmitEnrollmentResponse {
node_id,
state: "error".into(),
message: format!("Failed to submit: {}", e),
}),
),
}
}
/// List pending enrollments
async fn list_pending_enrollments(
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
match state.enrollment.list_by_state(EnrollmentState::Pending) {
Ok(nodes) => (StatusCode::OK, Json(nodes)),
Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, Json(vec![])),
}
}
/// Approve enrollment
#[derive(Debug, Serialize)]
struct ApproveEnrollmentResponse {
node_id: Uuid,
node_psk: String,
message: String,
}
async fn approve_enrollment(
State(state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
match state.enrollment.approve(&id) {
Ok(node_psk) => (
StatusCode::OK,
Json(ApproveEnrollmentResponse {
node_id: id,
node_psk,
message: "Node approved. Use provided PSK to connect.".into(),
}),
),
Err(e) => (
StatusCode::BAD_REQUEST,
Json(ApproveEnrollmentResponse {
node_id: id,
node_psk: String::new(),
message: format!("Failed to approve: {}", e),
}),
),
}
}
/// Reject enrollment
async fn reject_enrollment(
State(state): State<Arc<AppState>>,
Path(id): Path<Uuid>,
) -> impl IntoResponse {
match state.enrollment.reject(&id) {
Ok(()) => StatusCode::NO_CONTENT,
Err(_) => StatusCode::BAD_REQUEST,
}
}
// ============================================================================
// Statistics
// ============================================================================

386
oncp/src/enrollment.rs Normal file
View File

@@ -0,0 +1,386 @@
//! Node enrollment and dynamic registration system
use chrono::{DateTime, Utc};
use rusqlite::{params, Connection, Result as SqliteResult};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use thiserror::Error;
use uuid::Uuid;
#[derive(Error, Debug)]
pub enum EnrollmentError {
#[error("database error: {0}")]
Database(#[from] rusqlite::Error),
#[error("node not found")]
NotFound,
#[error("invalid state transition")]
InvalidTransition,
#[error("crypto error: {0}")]
Crypto(String),
}
/// Node enrollment state machine
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum EnrollmentState {
Pending,
Approved,
Active,
Rejected,
Suspended,
}
impl EnrollmentState {
pub fn as_str(&self) -> &'static str {
match self {
Self::Pending => "pending",
Self::Approved => "approved",
Self::Active => "active",
Self::Rejected => "rejected",
Self::Suspended => "suspended",
}
}
pub fn from_str(s: &str) -> Option<Self> {
match s {
"pending" => Some(Self::Pending),
"approved" => Some(Self::Approved),
"active" => Some(Self::Active),
"rejected" => Some(Self::Rejected),
"suspended" => Some(Self::Suspended),
_ => None,
}
}
}
/// Node enrollment request
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrollmentRequest {
pub node_id: Uuid,
pub name: String,
pub address: String, // Public IP:port
pub country_code: String,
pub hardware_id: String, // Unique hardware fingerprint
pub region: String, // Geographic region
pub requested_at: DateTime<Utc>,
}
/// Enrolled node record
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EnrolledNode {
pub node_id: Uuid,
pub name: String,
pub address: String,
pub country_code: String,
pub hardware_id: String,
pub region: String,
pub state: EnrollmentState,
pub node_psk: Option<String>, // Generated PSK for approved nodes
pub requested_at: DateTime<Utc>,
pub approved_at: Option<DateTime<Utc>>,
pub activated_at: Option<DateTime<Utc>>,
}
impl EnrolledNode {
/// Check if node can transition to new state
pub fn can_transition_to(&self, new_state: EnrollmentState) -> bool {
match (self.state, new_state) {
(EnrollmentState::Pending, EnrollmentState::Approved) => true,
(EnrollmentState::Pending, EnrollmentState::Rejected) => true,
(EnrollmentState::Approved, EnrollmentState::Active) => true,
(EnrollmentState::Approved, EnrollmentState::Rejected) => true,
(EnrollmentState::Active, EnrollmentState::Suspended) => true,
(EnrollmentState::Suspended, EnrollmentState::Active) => true,
_ => false,
}
}
}
/// Node enrollment registry with SQLite backend
pub struct EnrollmentRegistry {
conn: Arc<Mutex<Connection>>,
}
impl EnrollmentRegistry {
/// Create new enrollment registry with SQLite database
pub fn new(db_path: impl AsRef<std::path::Path>) -> Result<Self, EnrollmentError> {
let conn = Connection::open(db_path)?;
conn.execute(
"CREATE TABLE IF NOT EXISTS enrollments (
node_id TEXT PRIMARY KEY,
name TEXT NOT NULL,
address TEXT NOT NULL,
country_code TEXT NOT NULL,
hardware_id TEXT NOT NULL,
region TEXT NOT NULL,
state TEXT NOT NULL,
node_psk TEXT,
requested_at TEXT NOT NULL,
approved_at TEXT,
activated_at TEXT
)",
[],
)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
/// Submit enrollment request (server wants to join network)
pub fn submit_request(&self, req: EnrollmentRequest) -> Result<(), EnrollmentError> {
let conn = self.conn.lock().unwrap();
conn.execute(
"INSERT INTO enrollments (
node_id, name, address, country_code, hardware_id, region,
state, node_psk, requested_at, approved_at, activated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, NULL, ?8, NULL, NULL)",
params![
req.node_id.to_string(),
req.name,
req.address,
req.country_code,
req.hardware_id,
req.region,
EnrollmentState::Pending.as_str(),
req.requested_at.to_rfc3339(),
],
)?;
tracing::info!("Enrollment request submitted for node {}", req.node_id);
Ok(())
}
/// Get enrollment by node ID
pub fn get(&self, node_id: &Uuid) -> Result<Option<EnrolledNode>, EnrollmentError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT node_id, name, address, country_code, hardware_id, region,
state, node_psk, requested_at, approved_at, activated_at
FROM enrollments WHERE node_id = ?1"
)?;
let mut rows = stmt.query(params![node_id.to_string()])?;
if let Some(row) = rows.next()? {
Ok(Some(Self::row_to_node(row)?))
} else {
Ok(None)
}
}
/// List all enrollments by state
pub fn list_by_state(&self, state: EnrollmentState) -> Result<Vec<EnrolledNode>, EnrollmentError> {
let conn = self.conn.lock().unwrap();
let mut stmt = conn.prepare(
"SELECT node_id, name, address, country_code, hardware_id, region,
state, node_psk, requested_at, approved_at, activated_at
FROM enrollments WHERE state = ?1"
)?;
let rows = stmt.query_map(params![state.as_str()], |row| {
Ok(Self::row_to_node(row).unwrap())
})?;
Ok(rows.collect::<SqliteResult<Vec<_>>>()?)
}
/// Approve enrollment request and generate Node PSK
pub fn approve(&self, node_id: &Uuid) -> Result<String, EnrollmentError> {
let conn = self.conn.lock().unwrap();
// Generate unique PSK for this node
let node_psk = Self::generate_node_psk();
let now = Utc::now().to_rfc3339();
let updated = conn.execute(
"UPDATE enrollments
SET state = ?1, node_psk = ?2, approved_at = ?3
WHERE node_id = ?4 AND state = ?5",
params![
EnrollmentState::Approved.as_str(),
node_psk,
now,
node_id.to_string(),
EnrollmentState::Pending.as_str(),
],
)?;
if updated == 0 {
return Err(EnrollmentError::InvalidTransition);
}
tracing::info!("Node {} approved", node_id);
Ok(node_psk)
}
/// Reject enrollment request
pub fn reject(&self, node_id: &Uuid) -> Result<(), EnrollmentError> {
let conn = self.conn.lock().unwrap();
let updated = conn.execute(
"UPDATE enrollments SET state = ?1 WHERE node_id = ?2 AND state = ?3",
params![
EnrollmentState::Rejected.as_str(),
node_id.to_string(),
EnrollmentState::Pending.as_str(),
],
)?;
if updated == 0 {
return Err(EnrollmentError::InvalidTransition);
}
tracing::info!("Node {} rejected", node_id);
Ok(())
}
/// Activate approved node (node has successfully connected)
pub fn activate(&self, node_id: &Uuid) -> Result<(), EnrollmentError> {
let conn = self.conn.lock().unwrap();
let now = Utc::now().to_rfc3339();
let updated = conn.execute(
"UPDATE enrollments
SET state = ?1, activated_at = ?2
WHERE node_id = ?3 AND state = ?4",
params![
EnrollmentState::Active.as_str(),
now,
node_id.to_string(),
EnrollmentState::Approved.as_str(),
],
)?;
if updated == 0 {
return Err(EnrollmentError::InvalidTransition);
}
tracing::info!("Node {} activated", node_id);
Ok(())
}
/// Suspend active node
pub fn suspend(&self, node_id: &Uuid) -> Result<(), EnrollmentError> {
let conn = self.conn.lock().unwrap();
let updated = conn.execute(
"UPDATE enrollments SET state = ?1 WHERE node_id = ?2 AND state = ?3",
params![
EnrollmentState::Suspended.as_str(),
node_id.to_string(),
EnrollmentState::Active.as_str(),
],
)?;
if updated == 0 {
return Err(EnrollmentError::InvalidTransition);
}
tracing::info!("Node {} suspended", node_id);
Ok(())
}
/// Generate secure Node PSK (32 bytes hex)
fn generate_node_psk() -> String {
use rand::RngCore;
let mut psk = [0u8; 32];
rand::thread_rng().fill_bytes(&mut psk);
hex::encode(psk)
}
/// Convert SQLite row to EnrolledNode
fn row_to_node(row: &rusqlite::Row) -> SqliteResult<EnrolledNode> {
Ok(EnrolledNode {
node_id: Uuid::parse_str(&row.get::<_, String>(0)?).unwrap(),
name: row.get(1)?,
address: row.get(2)?,
country_code: row.get(3)?,
hardware_id: row.get(4)?,
region: row.get(5)?,
state: EnrollmentState::from_str(&row.get::<_, String>(6)?).unwrap(),
node_psk: row.get(7)?,
requested_at: DateTime::parse_from_rfc3339(&row.get::<_, String>(8)?)
.unwrap()
.into(),
approved_at: row
.get::<_, Option<String>>(9)?
.and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.into()),
activated_at: row
.get::<_, Option<String>>(10)?
.and_then(|s| DateTime::parse_from_rfc3339(&s).ok())
.map(|dt| dt.into()),
})
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_state_transitions() {
let node = EnrolledNode {
node_id: Uuid::new_v4(),
name: "test".into(),
address: "1.2.3.4:8443".into(),
country_code: "US".into(),
hardware_id: "hwid".into(),
region: "us-west".into(),
state: EnrollmentState::Pending,
node_psk: None,
requested_at: Utc::now(),
approved_at: None,
activated_at: None,
};
assert!(node.can_transition_to(EnrollmentState::Approved));
assert!(node.can_transition_to(EnrollmentState::Rejected));
assert!(!node.can_transition_to(EnrollmentState::Active));
}
#[test]
fn test_enrollment_workflow() -> Result<(), EnrollmentError> {
let registry = EnrollmentRegistry::new(":memory:")?;
let node_id = Uuid::new_v4();
// Submit request
let req = EnrollmentRequest {
node_id,
name: "test-node".into(),
address: "1.2.3.4:8443".into(),
country_code: "US".into(),
hardware_id: "test-hw".into(),
region: "us-west".into(),
requested_at: Utc::now(),
};
registry.submit_request(req)?;
// Check pending
let pending = registry.list_by_state(EnrollmentState::Pending)?;
assert_eq!(pending.len(), 1);
// Approve
let psk = registry.approve(&node_id)?;
assert_eq!(psk.len(), 64); // 32 bytes hex
// Check approved
let approved = registry.list_by_state(EnrollmentState::Approved)?;
assert_eq!(approved.len(), 1);
// Activate
registry.activate(&node_id)?;
// Check active
let active = registry.list_by_state(EnrollmentState::Active)?;
assert_eq!(active.len(), 1);
Ok(())
}
}

View File

@@ -1,11 +1,15 @@
pub mod api;
pub mod billing;
pub mod enrollment;
pub mod node;
pub mod session;
pub mod sni;
pub use api::{create_router, run_server, AppState};
pub use billing::{BillingError, SqliteRegistry, User, UserRegistry};
pub use enrollment::{
EnrolledNode, EnrollmentError, EnrollmentRegistry, EnrollmentRequest, EnrollmentState,
};
pub use node::{NetworkStats, Node, NodeCheckin, NodeRegistry, NodeStatus};
pub use session::{Session, SessionManager};
pub use sni::{SniManager, SniUpdate};