Skip to content

advanced-features

About 943 wordsAbout 3 min

2025-09-08

This guide covers advanced features and capabilities of RWKV Agent Kit.

Multi-Agent Systems

Agent Orchestration

Create complex workflows with multiple specialized agents:

use rwkv_agent_kit::{RwkvAgentKit, AgentOrchestrator, WorkflowBuilder};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let kit = RwkvAgentKit::new("config.toml").await?;
    
    // Create specialized agents
    let researcher = kit.create_agent(
        AgentConfig::new()
            .with_name("researcher")
            .with_system_prompt("You are a research specialist.")
    ).await?;
    
    let writer = kit.create_agent(
        AgentConfig::new()
            .with_name("writer")
            .with_system_prompt("You are a content writer.")
    ).await?;
    
    let reviewer = kit.create_agent(
        AgentConfig::new()
            .with_name("reviewer")
            .with_system_prompt("You are a content reviewer.")
    ).await?;
    
    // Create workflow
    let workflow = WorkflowBuilder::new()
        .add_step("research", researcher)
        .add_step("write", writer)
        .add_step("review", reviewer)
        .build();
    
    // Execute workflow
    let result = workflow.execute("Write an article about AI").await?;
    println!("Final result: {}", result);
    
    Ok(())
}

Agent Communication

// Agents can communicate with each other
let message = agent1.send_message_to(&agent2, "Hello from agent1").await?;
let response = agent2.receive_message(message).await?;

Advanced Memory Management

Custom Memory Backends

use rwkv_agent_kit::memory::{MemoryBackend, VectorMemory, GraphMemory};

// Use vector-based memory for semantic search
let vector_memory = VectorMemory::new()
    .with_embedding_model("sentence-transformers/all-MiniLM-L6-v2")
    .with_dimension(384)
    .build();

let agent = kit.create_agent(
    AgentConfig::new()
        .with_memory_backend(vector_memory)
).await?;

// Use graph-based memory for relationship tracking
let graph_memory = GraphMemory::new()
    .with_max_nodes(1000)
    .with_relationship_types(vec!["knows", "likes", "works_with"])
    .build();

Memory Querying

// Semantic search in memory
let relevant_memories = agent.search_memories(
    "programming languages",
    SearchOptions::new()
        .with_limit(10)
        .with_similarity_threshold(0.7)
).await?;

// Temporal queries
let recent_memories = agent.get_memories_since(
    chrono::Utc::now() - chrono::Duration::hours(24)
).await?;

// Graph traversal
let connected_memories = agent.get_connected_memories(
    memory_id,
    TraversalOptions::new()
        .with_max_depth(3)
        .with_relationship_types(vec!["related_to"])
).await?;

Custom Tools Development

Creating Custom Tools

use rwkv_agent_kit::tools::{Tool, ToolResult, ToolError};
use async_trait::async_trait;
use serde_json::Value;

#[derive(Debug)]
struct WeatherTool {
    api_key: String,
}

#[async_trait]
impl Tool for WeatherTool {
    fn name(&self) -> &str {
        "get_weather"
    }
    
    fn description(&self) -> &str {
        "Get current weather information for a location"
    }
    
    fn parameters(&self) -> Value {
        serde_json::json!({
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "The city and country"
                }
            },
            "required": ["location"]
        })
    }
    
    async fn execute(&self, params: Value) -> Result<ToolResult, ToolError> {
        let location = params["location"].as_str()
            .ok_or_else(|| ToolError::InvalidParameters("Missing location".to_string()))?;
        
        // Make API call to weather service
        let weather_data = self.fetch_weather(location).await?;
        
        Ok(ToolResult::success(weather_data))
    }
}

impl WeatherTool {
    async fn fetch_weather(&self, location: &str) -> Result<Value, ToolError> {
        // Implementation details...
        Ok(serde_json::json!({
            "location": location,
            "temperature": "22°C",
            "condition": "Sunny"
        }))
    }
}

Tool Composition

// Combine multiple tools
let composite_tool = CompositeToolBuilder::new()
    .add_tool(WeatherTool::new(api_key))
    .add_tool(Calculator::new())
    .add_tool(WebSearch::new(search_api_key))
    .with_execution_strategy(ExecutionStrategy::Sequential)
    .build();

Performance Optimization

Model Optimization

// Use quantized models for better performance
let config = RwkvConfig::new()
    .with_quantization(QuantizationType::Int8)
    .with_batch_size(4)
    .with_sequence_length(2048);

let kit = RwkvAgentKit::with_config(config).await?;

Caching

// Enable response caching
let agent = kit.create_agent(
    AgentConfig::new()
        .with_cache_enabled(true)
        .with_cache_ttl(Duration::from_secs(3600))
).await?;

// Custom cache backends
let redis_cache = RedisCache::new("redis://localhost:6379").await?;
let agent = kit.create_agent(
    AgentConfig::new()
        .with_cache_backend(redis_cache)
).await?;

Parallel Processing

use tokio::task::JoinSet;

// Process multiple requests in parallel
let mut join_set = JoinSet::new();

for query in queries {
    let agent_clone = agent.clone();
    join_set.spawn(async move {
        agent_clone.chat(&query).await
    });
}

while let Some(result) = join_set.join_next().await {
    match result? {
        Ok(response) => println!("Response: {}", response),
        Err(e) => eprintln!("Error: {}", e),
    }
}

Integration Patterns

Web Service Integration

use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
use serde::{Deserialize, Serialize};

#[derive(Deserialize)]
struct ChatRequest {
    message: String,
    agent_id: Option<String>,
}

#[derive(Serialize)]
struct ChatResponse {
    response: String,
    agent_id: String,
}

async fn chat_handler(
    State(kit): State<RwkvAgentKit>,
    Json(request): Json<ChatRequest>,
) -> Result<Json<ChatResponse>, StatusCode> {
    let agent = match request.agent_id {
        Some(id) => kit.get_agent(&id).await
            .map_err(|_| StatusCode::NOT_FOUND)?,
        None => kit.create_agent(AgentConfig::default()).await
            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
    };
    
    let response = agent.chat(&request.message).await
        .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
    
    Ok(Json(ChatResponse {
        response,
        agent_id: agent.id().to_string(),
    }))
}

#[tokio::main]
async fn main() {
    let kit = RwkvAgentKit::new("config.toml").await.unwrap();
    
    let app = Router::new()
        .route("/chat", post(chat_handler))
        .with_state(kit);
    
    axum::Server::bind(&"0.0.0.0:3000".parse().unwrap())
        .serve(app.into_make_service())
        .await
        .unwrap();
}

Database Integration

use sqlx::{PgPool, Row};

// Store conversation history in database
struct ConversationStore {
    pool: PgPool,
}

impl ConversationStore {
    async fn save_message(&self, agent_id: &str, message: &str, response: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "INSERT INTO conversations (agent_id, user_message, agent_response, created_at) VALUES ($1, $2, $3, NOW())",
            agent_id, message, response
        )
        .execute(&self.pool)
        .await?;
        
        Ok(())
    }
    
    async fn get_conversation_history(&self, agent_id: &str, limit: i32) -> Result<Vec<(String, String)>, sqlx::Error> {
        let rows = sqlx::query!(
            "SELECT user_message, agent_response FROM conversations WHERE agent_id = $1 ORDER BY created_at DESC LIMIT $2",
            agent_id, limit
        )
        .fetch_all(&self.pool)
        .await?;
        
        Ok(rows.into_iter()
            .map(|row| (row.user_message, row.agent_response))
            .collect())
    }
}

Monitoring and Debugging

Logging

use tracing::{info, warn, error};
use tracing_subscriber;

// Initialize logging
tracing_subscriber::fmt::init();

// Log agent interactions
let response = agent.chat("Hello").await?;
info!("Agent response: {}", response);

// Enable debug logging for the kit
let kit = RwkvAgentKit::new("config.toml")
    .with_log_level(tracing::Level::DEBUG)
    .await?;

Metrics

use prometheus::{Counter, Histogram, register_counter, register_histogram};

// Define metrics
let request_counter = register_counter!("agent_requests_total", "Total number of agent requests")?;
let response_time = register_histogram!("agent_response_time_seconds", "Agent response time in seconds")?;

// Instrument your code
let start = std::time::Instant::now();
let response = agent.chat("Hello").await?;
let duration = start.elapsed();

request_counter.inc();
response_time.observe(duration.as_secs_f64());

Security Considerations

Input Sanitization

use regex::Regex;

fn sanitize_input(input: &str) -> String {
    let re = Regex::new(r"[<>\"'&]").unwrap();
    re.replace_all(input, "").to_string()
}

// Use sanitized input
let sanitized = sanitize_input(&user_input);
let response = agent.chat(&sanitized).await?;

Rate Limiting

use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;

// Create rate limiter
let quota = Quota::per_second(NonZeroU32::new(10).unwrap());
let limiter = RateLimiter::direct(quota);

// Check rate limit before processing
if limiter.check().is_ok() {
    let response = agent.chat(&message).await?;
    // Process response
} else {
    // Handle rate limit exceeded
}