
Core Monitoring Technical Specification

Overview

The Core Monitoring specification defines the fundamental process monitoring capabilities that form the foundation of DaemonEye. These include process enumeration, executable integrity verification, a SQL-based detection engine, and multi-channel alerting across the three-component architecture.

Process Collection Architecture

Cross-Platform Process Enumeration

DaemonEye uses a layered approach to process enumeration, providing a unified interface across different operating systems while allowing platform-specific optimizations.

Base Implementation (sysinfo crate)

Primary Interface: The sysinfo crate provides cross-platform process enumeration with consistent data structures.

use sysinfo::{System, SystemExt, ProcessExt, Pid};

pub struct ProcessCollector {
    system: System,
    config: CollectorConfig,
    hash_computer: Box<dyn HashComputer>,
}

impl ProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        // refresh_processes mutates sysinfo's cached process table, so
        // this method needs `&mut self`.
        self.system.refresh_processes();

        let mut processes = Vec::new();
        let collection_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)?
            .as_millis() as i64;

        for (pid, process) in self.system.processes() {
            let process_record = ProcessRecord {
                id: Uuid::new_v4(),
                scan_id: self.get_current_scan_id(),
                collection_time,
                pid: pid.as_u32(),
                ppid: process.parent().map(|p| p.as_u32()),
                name: process.name().to_string(),
                executable_path: process.exe().map(|p| p.to_path_buf()),
                command_line: process.cmd().to_vec(),
                start_time: process.start_time(),
                cpu_usage: process.cpu_usage(),
                memory_usage: Some(process.memory()),
                executable_hash: self.compute_executable_hash(process.exe()).await?,
                hash_algorithm: Some("sha256".to_string()),
                user_id: self.get_process_user(pid).await?,
                accessible: true,
                file_exists: process.exe().map(|p| p.exists()).unwrap_or(false),
                platform_data: self.get_platform_specific_data(pid).await?,
            };

            processes.push(process_record);
        }

        Ok(processes)
    }
}

Platform-Specific Enhancements

Linux eBPF Integration (Enterprise Tier):

#[cfg(target_os = "linux")]
pub struct EbpfProcessCollector {
    base_collector: ProcessCollector,
    ebpf_monitor: Option<EbpfMonitor>,
}

impl EbpfProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        // Use eBPF for real-time process events if available
        if let Some(ebpf) = &self.ebpf_monitor {
            return self.enumerate_with_ebpf(ebpf).await;
        }

        // Fallback to sysinfo
        self.base_collector.enumerate_processes().await
    }
}

Windows ETW Integration (Enterprise Tier):

#[cfg(target_os = "windows")]
pub struct EtwProcessCollector {
    base_collector: ProcessCollector,
    etw_monitor: Option<EtwMonitor>,
}

impl EtwProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        // Use ETW for enhanced process monitoring if available
        if let Some(etw) = &self.etw_monitor {
            return self.enumerate_with_etw(etw).await;
        }

        // Fallback to sysinfo
        self.base_collector.enumerate_processes().await
    }
}

macOS EndpointSecurity Integration (Enterprise Tier):

#[cfg(target_os = "macos")]
pub struct EndpointSecurityProcessCollector {
    base_collector: ProcessCollector,
    es_monitor: Option<EndpointSecurityMonitor>,
}

impl EndpointSecurityProcessCollector {
    pub async fn enumerate_processes(&mut self) -> Result<Vec<ProcessRecord>> {
        // Use EndpointSecurity for real-time monitoring if available
        if let Some(es) = &self.es_monitor {
            return self.enumerate_with_endpoint_security(es).await;
        }

        // Fallback to sysinfo
        self.base_collector.enumerate_processes().await
    }
}

Executable Integrity Verification

Hash Computation: SHA-256 hashing of executable files for integrity verification.

pub struct Sha256HashComputer {
    buffer_size: usize,
}

#[async_trait]
impl HashComputer for Sha256HashComputer {
    async fn compute_hash(&self, path: &Path) -> Result<Option<String>> {
        if !path.exists() {
            return Ok(None);
        }

        let mut file = File::open(path).await?;
        let mut hasher = Sha256::new();
        let mut buffer = vec![0u8; self.buffer_size];

        loop {
            let bytes_read = file.read(&mut buffer).await?;
            if bytes_read == 0 {
                break;
            }
            hasher.update(&buffer[..bytes_read]);
        }

        let hash = hasher.finalize();
        Ok(Some(format!("{:x}", hash)))
    }

    fn get_algorithm(&self) -> &'static str {
        "sha256"
    }
}

Performance Optimization: Asynchronous hash computation with configurable buffer sizes.

impl ProcessCollector {
    async fn compute_executable_hash(&self, path: Option<&Path>) -> Result<Option<String>> {
        let path = match path {
            Some(p) => p,
            None => return Ok(None),
        };

        // Skip hashing for system processes or inaccessible files
        if self.should_skip_hashing(path) {
            return Ok(None);
        }

        // The hash computer already performs asynchronous I/O, so it can
        // be awaited directly; wrapping the async call in spawn_blocking
        // would return the future itself rather than drive it.
        self.hash_computer.compute_hash(path).await
    }

    fn should_skip_hashing(&self, path: &Path) -> bool {
        // Skip hashing for system processes or temporary files
        let path_str = path.to_string_lossy();
        path_str.contains("/proc/") ||
        path_str.contains("/sys/") ||
        path_str.contains("/tmp/") ||
        path_str.contains("\\System32\\")
    }
}

SQL-Based Detection Engine

SQL Validation and Security

AST Validation: Comprehensive SQL parsing and validation to prevent injection attacks.

use sqlparser::{ast::*, dialect::SQLiteDialect, parser::Parser};

pub struct SqlValidator {
    dialect: SQLiteDialect,
    allowed_functions: HashSet<String>,
    allowed_operators: HashSet<String>,
}

impl SqlValidator {
    pub fn new() -> Self {
        Self {
            dialect: SQLiteDialect {},
            allowed_functions: Self::get_allowed_functions(),
            allowed_operators: Self::get_allowed_operators(),
        }
    }

    pub fn validate_query(&self, sql: &str) -> Result<ValidationResult> {
        // Parser::parse_sql is an associated function taking the dialect;
        // the parser itself is not stored on the validator.
        let ast = Parser::parse_sql(&self.dialect, sql)?;

        for statement in &ast {
            match statement {
                Statement::Query(query) => self.validate_select_query(query)?,
                _ => return Err(ValidationError::ForbiddenStatement),
            }
        }

        Ok(ValidationResult::Valid)
    }

    fn validate_select_query(&self, query: &Query) -> Result<()> {
        // WHERE, GROUP BY, and HAVING are fields of the Select node inside
        // the query body, so all clause validation happens there.
        self.validate_select_body(&query.body)
    }

    fn validate_select_body(&self, body: &SetExpr) -> Result<()> {
        match body {
            SetExpr::Select(select) => {
                for item in &select.projection {
                    self.validate_projection_item(item)?;
                }

                // `from` is a Vec<TableWithJoins>, not an Option
                self.validate_from_clause(&select.from)?;

                if let Some(selection) = &select.selection {
                    self.validate_where_clause(selection)?;
                }

                self.validate_group_by(&select.group_by)?;

                if let Some(having) = &select.having {
                    self.validate_having(having)?;
                }
            }
            _ => return Err(ValidationError::UnsupportedSetExpr),
        }

        Ok(())
    }

    fn validate_projection_item(&self, item: &SelectItem) -> Result<()> {
        match item {
            SelectItem::UnnamedExpr(expr) => self.validate_expression(expr),
            SelectItem::ExprWithAlias { expr, .. } => self.validate_expression(expr),
            SelectItem::Wildcard => Ok(()), // Allow SELECT *
            _ => Err(ValidationError::UnsupportedSelectItem),
        }
    }

    fn validate_expression(&self, expr: &Expr) -> Result<()> {
        match expr {
            Expr::Identifier(_) => Ok(()),
            Expr::Value(_) => Ok(()),
            Expr::BinaryOp { left, op, right } => {
                self.validate_operator(op)?;
                self.validate_expression(left)?;
                self.validate_expression(right)?;
                Ok(())
            }
            // Function is a tuple variant wrapping a Function struct
            Expr::Function(func) => self.validate_function(&func.name, &func.args),
            Expr::Cast { expr, .. } => self.validate_expression(expr),
            // Allow CASE, though a stricter validator would recurse into its arms
            Expr::Case { .. } => Ok(()),
            _ => Err(ValidationError::UnsupportedExpression),
        }
    }

    fn validate_function(&self, name: &ObjectName, args: &[FunctionArg]) -> Result<()> {
        let func_name = name.to_string().to_lowercase();

        if !self.allowed_functions.contains(&func_name) {
            return Err(ValidationError::ForbiddenFunction(func_name));
        }

        // Validate function arguments
        for arg in args {
            match arg {
                FunctionArg::Unnamed(FunctionArgExpr::Expr(expr)) => {
                    self.validate_expression(expr)?;
                }
                _ => return Err(ValidationError::UnsupportedFunctionArg),
            }
        }

        Ok(())
    }

    fn get_allowed_functions() -> HashSet<String> {
        [
            "count",
            "sum",
            "avg",
            "min",
            "max",
            "length",
            "substr",
            "upper",
            "lower",
            "datetime",
            "strftime",
            "unixepoch",
            "coalesce",
            "nullif",
            "ifnull",
        ]
        .iter()
        .map(|s| s.to_string())
        .collect()
    }
}
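For reference, a rule that passes this validator: a single SELECT over the process table using only allow-listed functions and simple comparison operators. The `processes` table and column names below mirror the `ProcessRecord` fields but are illustrative; the spec does not fix a rule schema here.

```sql
-- Flag processes whose on-disk executable has disappeared.
SELECT pid, name, executable_path
FROM processes
WHERE file_exists = 0
  AND length(executable_path) > 0;
```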

Detection Rule Execution

Sandboxed Execution: Safe execution of detection rules with resource limits.

pub struct DetectionEngine {
    db: redb::Database,
    sql_validator: SqlValidator,
    rule_manager: RuleManager,
    alert_manager: AlertManager,
}

impl DetectionEngine {
    pub async fn execute_rules(&self, scan_id: i64) -> Result<Vec<Alert>> {
        let rules = self.rule_manager.load_enabled_rules().await?;
        let mut alerts = Vec::new();

        for rule in rules {
            match self.execute_rule(&rule, scan_id).await {
                Ok(rule_alerts) => alerts.extend(rule_alerts),
                Err(e) => {
                    tracing::error!(
                        rule_id = %rule.id,
                        error = %e,
                        "Failed to execute detection rule"
                    );
                    // Continue with other rules
                }
            }
        }

        Ok(alerts)
    }

    async fn execute_rule(&self, rule: &DetectionRule, scan_id: i64) -> Result<Vec<Alert>> {
        // Validate SQL before execution
        self.sql_validator.validate_query(&rule.sql_query)?;

        // Execute with timeout and resource limits
        let execution_result = tokio::time::timeout(
            Duration::from_secs(30),
            self.execute_sql_query(&rule.sql_query, scan_id)
        ).await??;

        // Generate alerts from query results
        let mut alerts = Vec::new();
        for row in execution_result.rows {
            let alert = self.alert_manager.generate_alert(
                &rule,
                &row,
                scan_id
            ).await?;

            if let Some(alert) = alert {
                alerts.push(alert);
            }
        }

        Ok(alerts)
    }

    async fn execute_sql_query(&self, sql: &str, scan_id: i64) -> Result<QueryResult> {
        // Execute as a prepared statement with a bound parameter over a
        // read-only view of the process store; rules never get write access.
        let mut stmt = self.db.prepare(sql)?;
        stmt.bind((":scan_id", scan_id))?;

        let mut rows = Vec::new();
        while let Some(row) = stmt.next()? {
            rows.push(ProcessRow::from_sqlite_row(row)?);
        }

        Ok(QueryResult { rows })
    }
}
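The 30-second bound in `execute_rule` comes from `tokio::time::timeout`. The same idea, abandoning a unit of work that exceeds its budget, can be sketched without the async runtime using a worker thread and a channel timeout. This is illustrative only; the engine itself uses the async timeout shown above.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Run `work` on a worker thread and wait at most `budget` for its result.
/// Returns None if the budget is exceeded (the worker keeps running in the
/// background; a real engine would also cancel it).
pub fn run_with_budget<T, F>(budget: Duration, work: F) -> Option<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore send errors: the receiver may have given up already.
        let _ = tx.send(work());
    });
    rx.recv_timeout(budget).ok()
}
```

A fast rule returns its result; a rule that overruns its budget yields `None` and can be reported as timed out.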

Alert Generation and Management

Alert Data Model

Structured Alerts: Comprehensive alert structure with full context.

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: Uuid,
    pub alert_time: i64,
    pub rule_id: String,
    pub title: String,
    pub description: String,
    pub severity: AlertSeverity,
    pub scan_id: Option<i64>,
    pub affected_processes: Vec<u32>,
    pub process_count: i32,
    pub alert_data: serde_json::Value,
    pub rule_execution_time_ms: Option<i64>,
    pub dedupe_key: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertSeverity {
    Low,
    Medium,
    High,
    Critical,
}

impl Alert {
    pub fn new(rule: &DetectionRule, process_data: &ProcessRow, scan_id: Option<i64>) -> Self {
        let alert_time = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_millis() as i64;

        // Associated function: no `self` exists yet while constructing.
        let dedupe_key = Self::generate_dedupe_key(rule, process_data);

        Self {
            id: Uuid::new_v4(),
            alert_time,
            rule_id: rule.id.clone(),
            title: rule.name.clone(),
            description: rule.description.clone().unwrap_or_default(),
            severity: rule.severity.clone(),
            scan_id,
            affected_processes: vec![process_data.pid],
            process_count: 1,
            alert_data: serde_json::to_value(process_data).unwrap_or(serde_json::Value::Null),
            rule_execution_time_ms: None,
            dedupe_key,
        }
    }

    fn generate_dedupe_key(rule: &DetectionRule, process_data: &ProcessRow) -> String {
        // Deduplication key based on rule and process characteristics
        format!(
            "{}:{}:{}:{}",
            rule.id,
            process_data.pid,
            process_data.name,
            process_data.executable_path.as_deref().unwrap_or("")
        )
    }
}

Alert Deduplication

Intelligent Deduplication: Prevent alert spam while maintaining security visibility.

pub struct AlertManager {
    db: redb::Database,
    dedupe_cache: Arc<Mutex<HashMap<String, Instant>>>,
    dedupe_window: Duration,
}

impl AlertManager {
    pub async fn generate_alert(
        &self,
        rule: &DetectionRule,
        process_data: &ProcessRow,
        scan_id: Option<i64>,
    ) -> Result<Option<Alert>> {
        let alert = Alert::new(rule, process_data, scan_id);

        // Check for deduplication
        if self.is_duplicate(&alert).await? {
            return Ok(None);
        }

        // Store alert in database
        self.store_alert(&alert).await?;

        // Update deduplication cache
        self.update_dedupe_cache(&alert).await?;

        Ok(Some(alert))
    }

    async fn is_duplicate(&self, alert: &Alert) -> Result<bool> {
        let mut cache = self.dedupe_cache.lock().await;

        if let Some(last_seen) = cache.get(&alert.dedupe_key) {
            if last_seen.elapsed() < self.dedupe_window {
                return Ok(true);
            }
        }

        Ok(false)
    }

    async fn update_dedupe_cache(&self, alert: &Alert) -> Result<()> {
        let mut cache = self.dedupe_cache.lock().await;
        cache.insert(alert.dedupe_key.clone(), Instant::now());
        Ok(())
    }
}
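The time-window check above can be sketched without the async machinery. This minimal std-only version shows the core logic; the 60-second window in the demo is illustrative, not a value fixed by the spec.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Minimal deduplication cache: a key is a duplicate if it was already
/// recorded within the configured window.
pub struct DedupeCache {
    seen: HashMap<String, Instant>,
    window: Duration,
}

impl DedupeCache {
    pub fn new(window: Duration) -> Self {
        Self { seen: HashMap::new(), window }
    }

    /// Returns true (duplicate) if `key` was seen within the window;
    /// otherwise records it now and returns false.
    pub fn check_and_record(&mut self, key: &str) -> bool {
        let now = Instant::now();
        if let Some(last) = self.seen.get(key) {
            if now.duration_since(*last) < self.window {
                return true;
            }
        }
        self.seen.insert(key.to_string(), now);
        false
    }
}

/// Dedupe key in the same "{rule}:{pid}:{name}:{path}" shape used above.
pub fn dedupe_key(rule_id: &str, pid: u32, name: &str, path: &str) -> String {
    format!("{}:{}:{}:{}", rule_id, pid, name, path)
}
```

The first sighting of a key passes through; repeats inside the window are suppressed until the window elapses.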

Multi-Channel Alert Delivery

Alert Sink Architecture

Pluggable Sinks: Flexible alert delivery through multiple channels.

#[async_trait]
pub trait AlertSink: Send + Sync {
    async fn send(&self, alert: &Alert) -> Result<DeliveryResult>;
    async fn health_check(&self) -> HealthStatus;
    fn name(&self) -> &str;
}

pub struct AlertDeliveryManager {
    sinks: Vec<Box<dyn AlertSink>>,
    retry_policy: RetryPolicy,
    circuit_breaker: CircuitBreaker,
}

impl AlertDeliveryManager {
    pub async fn deliver_alert(&self, alert: &Alert) -> Result<Vec<DeliveryResult>> {
        // Deliver to all sinks concurrently. The borrowed futures are
        // joined directly rather than spawned, so no 'static bound is
        // needed on `self` or the sinks.
        let results = futures::future::join_all(
            self.sinks
                .iter()
                .map(|sink| self.deliver_to_sink(sink.as_ref(), alert)),
        )
        .await;

        Ok(results)
    }

    async fn deliver_to_sink(&self, sink: &dyn AlertSink, alert: &Alert) -> DeliveryResult {
        // Apply circuit breaker
        if self.circuit_breaker.is_open(sink.name()) {
            return DeliveryResult::CircuitBreakerOpen;
        }

        // Retry with exponential backoff
        let mut attempt = 0;
        let mut delay = Duration::from_millis(100);

        loop {
            match sink.send(alert).await {
                Ok(result) => {
                    self.circuit_breaker.record_success(sink.name());
                    return result;
                }
                Err(e) => {
                    attempt += 1;
                    if attempt >= self.retry_policy.max_attempts {
                        self.circuit_breaker.record_failure(sink.name());
                        return DeliveryResult::Failed(e.to_string());
                    }

                    tokio::time::sleep(delay).await;
                    delay = std::cmp::min(delay * 2, Duration::from_secs(60));
                }
            }
        }
    }
}
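The retry loop above doubles the delay after each failure and caps it at 60 seconds. Just the delay schedule can be isolated as a pure function; the 100 ms base and 60 s cap mirror the values used in the loop.

```rust
use std::time::Duration;

/// Next backoff delay: double the current delay, capped at `max`.
pub fn next_backoff(current: Duration, max: Duration) -> Duration {
    std::cmp::min(current * 2, max)
}

/// The full delay schedule for `attempts` retries, starting from `base`.
pub fn backoff_schedule(base: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delays = Vec::with_capacity(attempts);
    let mut delay = base;
    for _ in 0..attempts {
        delays.push(delay);
        delay = next_backoff(delay, max);
    }
    delays
}
```

With the values used above, the schedule runs 100 ms, 200 ms, 400 ms, and so on until it saturates at 60 s.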

Specific Sink Implementations

Stdout Sink:

pub struct StdoutSink {
    format: OutputFormat,
}

#[async_trait]
impl AlertSink for StdoutSink {
    async fn send(&self, alert: &Alert) -> Result<DeliveryResult> {
        let output = match self.format {
            OutputFormat::Json => serde_json::to_string_pretty(alert)?,
            OutputFormat::Text => self.format_text(alert),
            OutputFormat::Csv => self.format_csv(alert),
        };

        println!("{}", output);
        Ok(DeliveryResult::Success)
    }

    async fn health_check(&self) -> HealthStatus {
        HealthStatus::Healthy
    }

    fn name(&self) -> &str {
        "stdout"
    }
}

Syslog Sink:

pub struct SyslogSink {
    facility: SyslogFacility,
    tag: String,
    socket: UnixDatagram,
}

#[async_trait]
impl AlertSink for SyslogSink {
    async fn send(&self, alert: &Alert) -> Result<DeliveryResult> {
        let priority = self.map_severity_to_priority(&alert.severity);
        let timestamp = self.format_timestamp(alert.alert_time);
        let message = format!(
            "<{}>{} {} {}: {}",
            priority,
            timestamp,
            self.tag,
            alert.title,
            alert.description
        );

        self.socket.send(message.as_bytes()).await?;
        Ok(DeliveryResult::Success)
    }

    async fn health_check(&self) -> HealthStatus {
        // Check if syslog socket is accessible
        HealthStatus::Healthy
    }

    fn name(&self) -> &str {
        "syslog"
    }
}
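The `map_severity_to_priority` call above combines a facility code with a severity level per RFC 5424, where priority = facility × 8 + severity. A sketch of one plausible mapping, assuming the daemon facility (3); the spec does not pin these exact level assignments.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum AlertSeverity {
    Low,
    Medium,
    High,
    Critical,
}

/// Map an alert severity to a syslog severity level (RFC 5424).
/// Assumed mapping: Critical -> crit(2), High -> err(3),
/// Medium -> warning(4), Low -> notice(5).
fn severity_level(severity: &AlertSeverity) -> u8 {
    match severity {
        AlertSeverity::Critical => 2,
        AlertSeverity::High => 3,
        AlertSeverity::Medium => 4,
        AlertSeverity::Low => 5,
    }
}

/// Syslog priority value: facility * 8 + severity level.
pub fn map_severity_to_priority(facility: u8, severity: &AlertSeverity) -> u8 {
    facility * 8 + severity_level(severity)
}
```

For the daemon facility, a Critical alert gets priority 26, which appears as `<26>` in the message header.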

Webhook Sink:

pub struct WebhookSink {
    url: Url,
    client: reqwest::Client,
    headers: HeaderMap,
    timeout: Duration,
}

#[async_trait]
impl AlertSink for WebhookSink {
    async fn send(&self, alert: &Alert) -> Result<DeliveryResult> {
        let payload = serde_json::to_value(alert)?;

        let response = self.client
            .post(self.url.clone())
            .headers(self.headers.clone())
            .json(&payload)
            .timeout(self.timeout)
            .send()
            .await?;

        if response.status().is_success() {
            Ok(DeliveryResult::Success)
        } else {
            Err(AlertDeliveryError::HttpError(response.status()))
        }
    }

    async fn health_check(&self) -> HealthStatus {
        // Perform health check by sending a test request
        match self.client
            .get(self.url.clone())
            .timeout(Duration::from_secs(5))
            .send()
            .await
        {
            Ok(response) if response.status().is_success() => HealthStatus::Healthy,
            _ => HealthStatus::Unhealthy,
        }
    }

    fn name(&self) -> &str {
        "webhook"
    }
}
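The webhook body is simply the alert serialized as JSON. A representative payload, matching the Alert structure above; all field values here are illustrative samples, not recorded data.

```json
{
  "id": "7f1d2c7e-0a14-4b6e-9a44-3a1f9d2b8c10",
  "alert_time": 1700000000000,
  "rule_id": "suspicious-shell",
  "title": "Suspicious shell process",
  "description": "Shell spawned by an unexpected parent",
  "severity": "High",
  "scan_id": 42,
  "affected_processes": [4242],
  "process_count": 1,
  "alert_data": { "pid": 4242, "name": "bash" },
  "rule_execution_time_ms": 12,
  "dedupe_key": "suspicious-shell:4242:bash:/bin/bash"
}
```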

Performance Requirements and Optimization

Process Collection Performance

Target Metrics:

  • Process Enumeration: <5 seconds for 10,000+ processes
  • CPU Usage: <5% sustained during continuous monitoring
  • Memory Usage: <100MB resident under normal operation
  • Hash Computation: Complete within enumeration time

Optimization Strategies:

impl ProcessCollector {
    async fn enumerate_processes_optimized(&self) -> Result<Vec<ProcessRecord>> {
        let start_time = Instant::now();

        // Use parallel processing for hash computation
        let (processes, hash_tasks): (Vec<_>, Vec<_>) = self.collect_basic_process_data()
            .into_iter()
            .partition(|p| p.executable_path.is_none());

        // Compute hashes in parallel
        let hash_results = futures::future::join_all(
            hash_tasks.into_iter().map(|p| self.compute_hash_async(p))
        ).await;

        let mut all_processes = processes;
        all_processes.extend(hash_results.into_iter().flatten());

        let duration = start_time.elapsed();
        tracing::info!(
            process_count = all_processes.len(),
            duration_ms = duration.as_millis(),
            "Process enumeration completed"
        );

        Ok(all_processes)
    }
}
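The partition step above splits the batch by whether an executable path is known, so only the second group incurs hash I/O. The same split in miniature, over a stripped-down record type:

```rust
/// Stripped-down process record: only the field the partition inspects.
#[derive(Debug, PartialEq)]
pub struct Proc {
    pub pid: u32,
    pub executable_path: Option<String>,
}

/// Split records into (no-path, has-path) groups; only the second group
/// needs hash computation.
pub fn split_for_hashing(procs: Vec<Proc>) -> (Vec<Proc>, Vec<Proc>) {
    procs.into_iter().partition(|p| p.executable_path.is_none())
}
```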

Detection Engine Performance

Target Metrics:

  • Rule Execution: <100ms per detection rule
  • SQL Validation: <10ms per query
  • Resource Limits: 30-second timeout, memory limits
  • Concurrent Execution: Parallel rule processing

Optimization Strategies:

impl DetectionEngine {
    async fn execute_rules_optimized(&self, scan_id: i64) -> Result<Vec<Alert>> {
        let rules = self.rule_manager.load_enabled_rules().await?;

        // Group rules by complexity for optimal scheduling
        let (simple_rules, complex_rules) = self.categorize_rules(rules);

        // Execute simple rules in parallel (execute_rule borrows the rule)
        let simple_alerts = futures::future::join_all(
            simple_rules.iter().map(|rule| self.execute_rule(rule, scan_id))
        ).await;

        // Execute complex rules sequentially to avoid resource contention
        let mut complex_alerts = Vec::new();
        for rule in &complex_rules {
            let alerts = self.execute_rule(rule, scan_id).await?;
            complex_alerts.extend(alerts);
        }

        // Combine results
        let mut all_alerts = Vec::new();
        for result in simple_alerts {
            all_alerts.extend(result?);
        }
        all_alerts.extend(complex_alerts);

        Ok(all_alerts)
    }
}
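`categorize_rules` is not specified above. One plausible heuristic (an assumption, not part of the spec) treats rules whose SQL contains joins or aggregation as complex and everything else as simple:

```rust
/// Heuristic rule classification: SQL with joins or grouping is "complex"
/// and scheduled sequentially; everything else runs in parallel.
pub fn is_complex_rule(sql: &str) -> bool {
    let upper = sql.to_uppercase();
    upper.contains(" JOIN ") || upper.contains("GROUP BY") || upper.contains("HAVING")
}

/// Partition rule SQL strings into (simple, complex) groups.
pub fn categorize_rules(rules: Vec<String>) -> (Vec<String>, Vec<String>) {
    rules.into_iter().partition(|sql| !is_complex_rule(sql))
}
```

A production version would classify from the parsed AST rather than substring matching, which is shown here only to make the scheduling split concrete.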

Error Handling and Recovery

Graceful Degradation

Process Collection Failures:

impl ProcessCollector {
    async fn enumerate_processes_with_fallback(&self) -> Result<Vec<ProcessRecord>> {
        match self.enumerate_processes_enhanced().await {
            Ok(processes) => Ok(processes),
            Err(e) => {
                tracing::warn!("Enhanced enumeration failed, falling back to basic: {}", e);
                self.enumerate_processes_basic().await
            }
        }
    }
}

Detection Engine Failures:

impl DetectionEngine {
    async fn execute_rule_with_recovery(&self, rule: &DetectionRule, scan_id: i64) -> Result<Vec<Alert>> {
        match self.execute_rule(rule, scan_id).await {
            Ok(alerts) => Ok(alerts),
            Err(e) => {
                tracing::error!(
                    rule_id = %rule.id,
                    error = %e,
                    "Rule execution failed, marking as disabled"
                );

                // Disable problematic rules to prevent repeated failures
                self.rule_manager.disable_rule(&rule.id).await?;
                Ok(Vec::new())
            }
        }
    }
}

Resource Management

Memory Pressure Handling:

impl ProcessCollector {
    async fn handle_memory_pressure(&self) -> Result<()> {
        let memory_usage = self.get_memory_usage()?;

        if memory_usage > self.config.memory_threshold {
            tracing::warn!("Memory pressure detected, reducing batch size");

            // Reduce batch size for hash computation
            self.hash_computer.set_buffer_size(
                self.hash_computer.buffer_size() / 2
            );

            // Yield to the scheduler so other tasks can run while memory
            // from the previous batch is released (Rust has no GC to trigger)
            tokio::task::yield_now().await;
        }

        Ok(())
    }
}
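Halving the buffer indefinitely would eventually make hashing pathologically slow, so the size adjustment usually carries a floor. A sketch of that calculation; the 4 KiB floor is an assumption, not a value specified above.

```rust
/// Minimum hash buffer size under memory pressure (assumed 4 KiB).
pub const MIN_BUFFER_SIZE: usize = 4096;

/// Halve the buffer size, never dropping below the floor.
pub fn reduced_buffer_size(current: usize) -> usize {
    std::cmp::max(current / 2, MIN_BUFFER_SIZE)
}
```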

This core monitoring specification provides the foundation for DaemonEye's process monitoring capabilities, ensuring high performance, security, and reliability across all supported platforms.