Collector-Core Framework
The collector-core framework provides a unified collection infrastructure that lets multiple monitoring components share a common operational foundation.
Table of Contents
- Overview
- Architecture
- Core Components
- Event Processing Pipeline
- Configuration System
- Health Monitoring
- IPC Integration
- Event Source Implementation
- Advanced Features
- Performance Characteristics
- Testing Strategy
- Usage Examples
- Best Practices
Overview
The collector-core framework is the foundation of DaemonEye’s extensible monitoring architecture. It provides:
- Universal EventSource trait for pluggable collection implementations
- Collector runtime for event source management and aggregation
- Extensible CollectionEvent enum for unified event handling
- Capability negotiation through SourceCaps bitflags
- Shared infrastructure for configuration, logging, and health checks
Architecture
┌───────────────────────────────────────────────────────────────┐
│                       Collector Runtime                        │
├───────────────────────────────────────────────────────────────┤
│  EventSource     EventSource     EventSource     EventSource   │
│  (Process)       (Network)       (Filesystem)    (Performance) │
└───────────────────────────────────────────────────────────────┘
The framework separates collection methodology from operational infrastructure, allowing different collection strategies to share the same runtime foundation.
Core Components
EventSource Trait
The EventSource trait abstracts collection methodology from operational infrastructure:
#[async_trait]
pub trait EventSource: Send + Sync {
    /// Unique, human-readable name for this event source.
    fn name(&self) -> &'static str;

    /// Capabilities advertised to the collector for negotiation.
    fn capabilities(&self) -> SourceCaps;

    /// Start collecting, sending events on `tx` until `shutdown_signal` is set.
    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()>;

    /// Stop collection and release any resources held by the source.
    async fn stop(&self) -> anyhow::Result<()>;

    /// Report the source's current health to the collector runtime.
    async fn health_check(&self) -> anyhow::Result<()>;
}
Capability System
The SourceCaps bitflags enable capability negotiation between components:
bitflags! {
    pub struct SourceCaps: u32 {
        const PROCESS      = 1 << 0; // Process monitoring
        const NETWORK      = 1 << 1; // Network monitoring
        const FILESYSTEM   = 1 << 2; // Filesystem monitoring
        const PERFORMANCE  = 1 << 3; // Performance monitoring
        const REALTIME     = 1 << 4; // Real-time event streaming
        const KERNEL_LEVEL = 1 << 5; // Kernel-level access
        const SYSTEM_WIDE  = 1 << 6; // System-wide monitoring
    }
}
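For example, a downstream component can express its requirements as a SourceCaps value and test whether an advertised capability set satisfies them. This is a minimal sketch using the standard bitflags operations:

use collector_core::SourceCaps;

fn supports_realtime_process_monitoring(caps: SourceCaps) -> bool {
    // `contains` is true only when every required flag is present.
    caps.contains(SourceCaps::PROCESS | SourceCaps::REALTIME)
}

fn main() {
    let advertised = SourceCaps::PROCESS | SourceCaps::REALTIME | SourceCaps::SYSTEM_WIDE;
    assert!(supports_realtime_process_monitoring(advertised));
}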
Collection Events
The CollectionEvent enum provides unified event handling:
pub enum CollectionEvent {
    Process(ProcessEvent),
    Network(NetworkEvent),
    Filesystem(FilesystemEvent),
    Performance(PerformanceEvent),
    TriggerRequest(TriggerRequest),
}
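On the receiving side of the event channel, a consumer can dispatch on the variant. The handler bodies below are placeholders rather than framework code:

use collector_core::CollectionEvent;
use tokio::sync::mpsc;

async fn process_events(mut rx: mpsc::Receiver<CollectionEvent>) {
    while let Some(event) = rx.recv().await {
        match event {
            CollectionEvent::Process(p) => println!("process event for pid {}", p.pid),
            CollectionEvent::Network(_) => { /* handle network telemetry */ }
            CollectionEvent::Filesystem(_) => { /* handle filesystem activity */ }
            CollectionEvent::Performance(_) => { /* handle performance samples */ }
            CollectionEvent::TriggerRequest(_) => { /* hand off to trigger handling */ }
        }
    }
}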
Collector Runtime
The Collector struct provides a unified runtime for multiple event sources:
pub struct Collector {
    config: CollectorConfig,
    sources: Vec<Box<dyn EventSource>>,
}

impl Collector {
    pub fn new(config: CollectorConfig) -> Self;
    pub fn register(&mut self, source: Box<dyn EventSource>) -> anyhow::Result<()>;
    pub fn capabilities(&self) -> SourceCaps;
    pub async fn run(self) -> Result<()>;
}
Event Processing Pipeline
Event Flow
- Event Sources generate events based on their collection methodology
- Event Channel receives events via mpsc::Sender<CollectionEvent>
- Event Processor handles batching, backpressure, and processing
- Event Bus (optional) provides pub/sub event distribution
- Storage/Forwarding processes events according to configuration
Batching and Backpressure
The framework includes sophisticated event processing with:
- Configurable Batching: Events are batched for efficient processing
- Backpressure Handling: Semaphore-based flow control prevents memory exhaustion
- Timeout Management: Batch timeouts ensure timely processing
- Graceful Degradation: the system continues operating under load
// Batch configuration
fn create_batch_config() -> CollectorConfig {
    CollectorConfig::new()
        .with_max_batch_size(1000)
        .with_batch_timeout(Duration::from_secs(5))
        .with_backpressure_threshold(800)
}
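The framework's internal flow control is not reproduced here, but the semaphore-based idea can be sketched as follows: each queued event carries an owned permit, so producers stall once the in-flight budget is exhausted and resume as the consumer finishes events and drops their permits. The Tracked wrapper and helper below are hypothetical illustrations, not framework APIs:

use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

// Each queued event holds a permit until the consumer drops it after processing.
struct Tracked<T> {
    event: T,
    _permit: OwnedSemaphorePermit,
}

async fn send_with_backpressure<T>(
    tx: &mpsc::Sender<Tracked<T>>,
    limiter: &Arc<Semaphore>,
    event: T,
) -> anyhow::Result<()> {
    // Waits here once the configured number of events is already in flight.
    let permit = limiter.clone().acquire_owned().await?;
    tx.send(Tracked { event, _permit: permit })
        .await
        .map_err(|_| anyhow::anyhow!("event channel closed"))?;
    Ok(())
}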
Configuration System
Hierarchical Configuration
The framework supports hierarchical configuration loading:
- Command-line flags (highest precedence)
- Environment variables
- User configuration files
- System configuration files
- Embedded defaults (lowest precedence)
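The sketch below illustrates that precedence order only; the environment variable name, default value, and CLI plumbing are assumptions for the example, not the framework's actual keys:

use std::time::Duration;
use collector_core::CollectorConfig;

fn load_config(cli_batch_size: Option<usize>) -> CollectorConfig {
    let mut batch_size: usize = 256; // embedded default (assumed value)

    // Environment variable overrides the embedded default (name assumed here).
    if let Ok(raw) = std::env::var("COLLECTOR_MAX_BATCH_SIZE") {
        if let Ok(parsed) = raw.parse() {
            batch_size = parsed;
        }
    }

    // A command-line flag wins over everything else.
    if let Some(v) = cli_batch_size {
        batch_size = v;
    }

    CollectorConfig::new()
        .with_max_batch_size(batch_size)
        .with_batch_timeout(Duration::from_secs(5))
}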
Configuration Structure
pub struct CollectorConfig {
    pub component_name: String,
    pub max_event_sources: usize,
    pub event_buffer_size: usize,
    pub max_batch_size: usize,
    pub batch_timeout: Duration,
    pub shutdown_timeout: Duration,
    pub health_check_interval: Duration,
    pub enable_telemetry: bool,
    pub enable_debug_logging: bool,
    // ... additional configuration options
}
Health Monitoring
Health Check System
The framework provides comprehensive health monitoring:
- Source Health Checks: Individual event source health monitoring
- System Resource Monitoring: CPU, memory, and performance tracking
- Error Rate Monitoring: Automatic error rate calculation and alerting
- Telemetry Collection: Performance metrics and operational statistics
Health Status Types
pub enum HealthStatus {
    Healthy,
    Degraded,
    Unhealthy,
}
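The thresholds that drive these states are internal policy; purely as an illustration, a per-source check might map an observed error rate onto a status like this (the cut-off values are invented for the example):

use collector_core::HealthStatus;

fn classify_error_rate(errors_per_minute: f64) -> HealthStatus {
    match errors_per_minute {
        r if r < 1.0 => HealthStatus::Healthy,    // occasional failures are tolerated
        r if r < 10.0 => HealthStatus::Degraded,  // elevated errors, still operating
        _ => HealthStatus::Unhealthy,             // sustained failure
    }
}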
IPC Integration
Collector IPC Server
The framework includes IPC server capabilities for external communication:
pub struct CollectorIpcServer {
    // IPC server implementation
}

impl CollectorIpcServer {
    pub async fn start(&mut self) -> Result<()>;
    pub async fn handle_request(&self, request: IpcRequest) -> IpcResponse;
    pub fn get_capabilities(&self) -> SourceCaps;
}
Protocol Support
- Unix Domain Sockets (Linux/macOS)
- Named Pipes (Windows)
- Protobuf Serialization for efficient communication
- Automatic Reconnection with exponential backoff
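The reconnection behavior can be pictured as a capped doubling backoff. The initial delay and cap below are illustrative values rather than the framework's actual settings:

use std::time::Duration;

async fn reconnect_with_backoff<F, Fut>(mut connect: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<()>>,
{
    let mut delay = Duration::from_millis(100);
    loop {
        match connect().await {
            Ok(()) => return Ok(()),
            Err(err) => {
                tracing::warn!("IPC connection failed: {err}; retrying in {delay:?}");
                tokio::time::sleep(delay).await;
                // Double the delay after each failure, capped at 30 seconds.
                delay = (delay * 2).min(Duration::from_secs(30));
            }
        }
    }
}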
Event Source Implementation
Process Event Source Example
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::SystemTime;

use async_trait::async_trait;
use collector_core::{CollectionEvent, EventSource, ProcessEvent, SourceCaps};
use tokio::sync::mpsc;
use tracing::warn;

// ProcessSourceConfig, DatabaseManager, and the Mutex wrapper come from the
// surrounding crate and are not shown here.
pub struct ProcessEventSource {
    config: ProcessSourceConfig,
    db_manager: Arc<Mutex<DatabaseManager>>,
}

#[async_trait]
impl EventSource for ProcessEventSource {
    fn name(&self) -> &'static str {
        "process-collector"
    }

    fn capabilities(&self) -> SourceCaps {
        SourceCaps::PROCESS | SourceCaps::REALTIME | SourceCaps::SYSTEM_WIDE
    }

    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()> {
        let mut interval = tokio::time::interval(self.config.collection_interval);

        'collection: while !shutdown_signal.load(Ordering::Relaxed) {
            interval.tick().await;

            // Collect process information
            let processes = self.collect_processes().await?;

            // Send events
            for process in processes {
                let event = CollectionEvent::Process(ProcessEvent {
                    pid: process.pid,
                    name: process.name,
                    timestamp: SystemTime::now(),
                    // ... additional fields
                });

                if tx.send(event).await.is_err() {
                    warn!("Event channel closed, stopping collection");
                    break 'collection; // exit the outer loop, not just this batch
                }
            }
        }

        Ok(())
    }

    async fn stop(&self) -> anyhow::Result<()> {
        // Cleanup resources
        Ok(())
    }

    async fn health_check(&self) -> anyhow::Result<()> {
        // Report healthy as long as collection is able to run
        Ok(())
    }
}
Advanced Features
Analysis Chain Coordination
The framework includes analysis chain coordination for complex workflows:
pub struct AnalysisChainCoordinator {
    // Coordinates multi-stage analysis workflows
}

pub struct AnalysisWorkflowDefinition {
    pub stages: Vec<AnalysisStage>,
    pub dependencies: HashMap<String, Vec<String>>,
    pub timeout: Duration,
}
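As a hedged sketch (assuming these types are exported by collector-core), a two-stage workflow in which a hypothetical "correlate" stage depends on an "enrich" stage could be described as follows; AnalysisStage construction is omitted because its fields are not shown here:

use std::collections::HashMap;
use std::time::Duration;
use collector_core::{AnalysisStage, AnalysisWorkflowDefinition};

fn two_stage_workflow(stages: Vec<AnalysisStage>) -> AnalysisWorkflowDefinition {
    let mut dependencies = HashMap::new();
    // "correlate" may only run after "enrich" has completed.
    dependencies.insert("correlate".to_string(), vec!["enrich".to_string()]);

    AnalysisWorkflowDefinition {
        stages,
        dependencies,
        timeout: Duration::from_secs(60),
    }
}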
Trigger Management
Sophisticated trigger management for event-driven analysis:
pub struct TriggerManager {
    // Manages trigger conditions and priorities
}

pub struct TriggerCondition {
    pub sql_condition: String,
    pub priority: TriggerPriority,
    pub resource_limits: TriggerResourceLimits,
}
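A hedged construction sketch: the SQL text is hypothetical, and the priority and resource limits are taken as parameters because their concrete variants and fields are not documented here:

use collector_core::{TriggerCondition, TriggerPriority, TriggerResourceLimits};

fn orphaned_shell_trigger(
    priority: TriggerPriority,
    resource_limits: TriggerResourceLimits,
) -> TriggerCondition {
    TriggerCondition {
        // Hypothetical condition over collected process records.
        sql_condition: "SELECT pid FROM processes WHERE name = 'sh' AND ppid = 1".to_string(),
        priority,
        resource_limits,
    }
}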
Event Bus System
Optional pub/sub event distribution:
pub struct EventBus {
    // Pub/sub event distribution system
}

pub struct EventSubscription {
    pub filter: EventFilter,
    pub correlation_filter: Option<CorrelationFilter>,
    pub subscriber_id: String,
}
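Constructing a subscription might look like the following sketch; the EventFilter is passed in by the caller because its constructors are not shown here, and the subscriber id is an arbitrary example:

use collector_core::{EventFilter, EventSubscription};

fn rule_engine_subscription(filter: EventFilter) -> EventSubscription {
    EventSubscription {
        filter,
        correlation_filter: None, // no cross-event correlation for this subscriber
        subscriber_id: "rule-engine".to_string(),
    }
}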
Performance Characteristics
Throughput Targets
- Event Processing: >1,000 events/second
- CPU Overhead: <5% sustained usage
- Memory Usage: <100MB resident under normal operation
- Latency: <100ms per processed event
Scalability
- Event Sources: Support for 16+ concurrent event sources
- Event Buffer: Configurable buffer sizes (1,000-10,000 events)
- Batch Processing: Configurable batch sizes (100-1,000 events)
- Backpressure: Automatic flow control and graceful degradation
Testing Strategy
Test Coverage
The framework includes comprehensive testing:
- Unit Tests: Individual component functionality
- Integration Tests: Cross-component interaction
- Performance Tests: Throughput and latency benchmarks
- Property Tests: Generative testing for edge cases
Test Utilities
// Test utilities for event source testing
pub mod test_utils {
pub struct MockEventSource;
pub struct TestCollector;
pub fn create_test_config() -> CollectorConfig;
}
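Beyond the bundled helpers, tests can exercise the public API directly. The self-contained sketch below (independent of test_utils) checks capability aggregation across two stub sources, assuming the Collector and EventSource APIs shown earlier:

use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use async_trait::async_trait;
use collector_core::{CollectionEvent, Collector, CollectorConfig, EventSource, SourceCaps};
use tokio::sync::mpsc;

// A stub source that advertises a fixed capability set and never produces events.
struct StubSource {
    name: &'static str,
    caps: SourceCaps,
}

#[async_trait]
impl EventSource for StubSource {
    fn name(&self) -> &'static str {
        self.name
    }
    fn capabilities(&self) -> SourceCaps {
        self.caps
    }
    async fn start(
        &self,
        _tx: mpsc::Sender<CollectionEvent>,
        _shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()> {
        Ok(())
    }
    async fn stop(&self) -> anyhow::Result<()> {
        Ok(())
    }
    async fn health_check(&self) -> anyhow::Result<()> {
        Ok(())
    }
}

#[test]
fn capabilities_are_aggregated() {
    let mut collector = Collector::new(CollectorConfig::new());
    collector
        .register(Box::new(StubSource { name: "stub-process", caps: SourceCaps::PROCESS }))
        .unwrap();
    collector
        .register(Box::new(StubSource { name: "stub-network", caps: SourceCaps::NETWORK }))
        .unwrap();

    assert!(collector
        .capabilities()
        .contains(SourceCaps::PROCESS | SourceCaps::NETWORK));
}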
Usage Examples
Basic Collector Setup
use collector_core::{Collector, CollectorConfig};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let config = CollectorConfig::new()
        .with_component_name("my-collector".to_string())
        .with_max_event_sources(4)
        .with_event_buffer_size(2000);

    let mut collector = Collector::new(config);

    // Register event sources
    collector.register(Box::new(ProcessEventSource::new()))?;
    collector.register(Box::new(NetworkEventSource::new()))?;

    // Run the collector
    collector.run().await
}
Custom Event Source
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use async_trait::async_trait;
use collector_core::{CollectionEvent, EventSource, SourceCaps};
use tokio::sync::mpsc;

struct CustomEventSource;

#[async_trait]
impl EventSource for CustomEventSource {
    fn name(&self) -> &'static str {
        "custom-source"
    }

    fn capabilities(&self) -> SourceCaps {
        SourceCaps::PERFORMANCE | SourceCaps::REALTIME
    }

    async fn start(
        &self,
        tx: mpsc::Sender<CollectionEvent>,
        shutdown_signal: Arc<AtomicBool>,
    ) -> anyhow::Result<()> {
        // Custom collection logic: build CollectionEvents and send them on `tx`
        // until `shutdown_signal` is set.
        Ok(())
    }

    async fn stop(&self) -> anyhow::Result<()> {
        Ok(())
    }

    async fn health_check(&self) -> anyhow::Result<()> {
        Ok(())
    }
}
Best Practices
Event Source Development
- Implement Graceful Shutdown: Always check the shutdown signal
- Handle Errors Gracefully: Continue operation when possible
- Use Structured Logging: Provide detailed operational information
- Monitor Performance: Track collection rates and resource usage
- Test Thoroughly: Include unit, integration, and performance tests
Configuration Management
- Use Builder Pattern: Provide fluent configuration APIs
- Validate Configuration: Check configuration at startup
- Support Hierarchical Loading: Allow multiple configuration sources
- Document Defaults: Clearly document default values
- Environment Variable Support: Enable container-friendly configuration
Performance Optimization
- Batch Events: Use batching for efficient processing
- Monitor Backpressure: Implement flow control mechanisms
- Optimize Hot Paths: Profile and optimize critical code paths
- Use Async I/O: Leverage Tokio for concurrent operations
- Resource Limits: Implement resource budgets and limits
The collector-core framework provides the foundation for DaemonEye’s extensible monitoring architecture, enabling multiple collection strategies while maintaining shared operational infrastructure.