Advanced Features
About 943 words · About 3 min
2025-09-08
This guide covers the advanced features and capabilities of the RWKV Agent Kit.
Multi-Agent Systems
Agent Orchestration
Create complex workflows with multiple specialized agents:
use rwkv_agent_kit::{AgentConfig, AgentOrchestrator, RwkvAgentKit, WorkflowBuilder};

/// Builds three specialized agents, registers them as named workflow steps,
/// and executes the workflow on a single task prompt.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Load the kit from a TOML configuration file.
    let kit = RwkvAgentKit::new("config.toml").await?;

    // Create specialized agents; each gets its own name and system prompt.
    let researcher = kit
        .create_agent(
            AgentConfig::new()
                .with_name("researcher")
                .with_system_prompt("You are a research specialist."),
        )
        .await?;
    let writer = kit
        .create_agent(
            AgentConfig::new()
                .with_name("writer")
                .with_system_prompt("You are a content writer."),
        )
        .await?;
    let reviewer = kit
        .create_agent(
            AgentConfig::new()
                .with_name("reviewer")
                .with_system_prompt("You are a content reviewer."),
        )
        .await?;

    // Create workflow: register each agent under a step name.
    let workflow = WorkflowBuilder::new()
        .add_step("research", researcher)
        .add_step("write", writer)
        .add_step("review", reviewer)
        .build();

    // Execute workflow on one task prompt and print what it returns.
    let result = workflow.execute("Write an article about AI").await?;
    println!("Final result: {}", result);
    Ok(())
}
Agent Communication
// Agents can communicate with each other: `send_message_to` produces a
// message addressed to `agent2`, which is then moved into `agent2`'s
// `receive_message` call.
let message = agent1.send_message_to(&agent2, "Hello from agent1").await?;
let response = agent2.receive_message(message).await?;
Advanced Memory Management
Custom Memory Backends
use rwkv_agent_kit::memory::{MemoryBackend, VectorMemory, GraphMemory};
// Use vector-based memory for semantic search.
// all-MiniLM-L6-v2 produces 384-dimensional embeddings, so the vector
// dimension is set to match the model.
let vector_memory = VectorMemory::new()
    .with_embedding_model("sentence-transformers/all-MiniLM-L6-v2")
    .with_dimension(384)
    .build();
// Attach the memory backend when creating the agent.
let agent = kit.create_agent(
    AgentConfig::new()
        .with_memory_backend(vector_memory)
).await?;
// Use graph-based memory for relationship tracking: a node limit of 1000 and
// the relationship labels "knows", "likes" and "works_with".
// Presumably attached the same way via `with_memory_backend` (not shown).
let graph_memory = GraphMemory::new()
    .with_max_nodes(1000)
    .with_relationship_types(vec!["knows", "likes", "works_with"])
    .build();
Memory Querying
// Semantic search in memory: a free-text query plus options
// (result limit 10, similarity threshold 0.7).
let relevant_memories = agent.search_memories(
    "programming languages",
    SearchOptions::new()
        .with_limit(10)
        .with_similarity_threshold(0.7)
).await?;
// Temporal queries: memories since a cutoff 24 hours before now (UTC).
let recent_memories = agent.get_memories_since(
    chrono::Utc::now() - chrono::Duration::hours(24)
).await?;
// Graph traversal: start from `memory_id` (an existing memory's id, obtained
// elsewhere) and follow only "related_to" relationships, up to depth 3.
let connected_memories = agent.get_connected_memories(
    memory_id,
    TraversalOptions::new()
        .with_max_depth(3)
        .with_relationship_types(vec!["related_to"])
).await?;
Custom Tools Development
Creating Custom Tools
use rwkv_agent_kit::tools::{Tool, ToolResult, ToolError};
use async_trait::async_trait;
use serde_json::Value;
/// A tool that looks up current weather for a location.
///
/// `Debug` is implemented by hand rather than derived so the API key is never
/// printed in logs, panics, or error messages.
struct WeatherTool {
    /// Credential for the weather service; treat it as a secret.
    api_key: String,
}

impl std::fmt::Debug for WeatherTool {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A derived `Debug` would print the key verbatim; redact it instead.
        f.debug_struct("WeatherTool")
            .field("api_key", &"<redacted>")
            .finish()
    }
}
#[async_trait]
impl Tool for WeatherTool {
    /// Returns the tool's name, `get_weather`.
    fn name(&self) -> &str {
        "get_weather"
    }

    /// Returns a one-line, human-readable summary of the tool.
    fn description(&self) -> &str {
        "Get current weather information for a location"
    }

    /// Returns the JSON Schema for the arguments: an object with one required
    /// string property, `location`.
    fn parameters(&self) -> Value {
        let location_schema = serde_json::json!({
            "type": "string",
            "description": "The city and country"
        });
        serde_json::json!({
            "type": "object",
            "properties": { "location": location_schema },
            "required": ["location"]
        })
    }

    /// Looks up the weather for `params["location"]`.
    ///
    /// Fails with `ToolError::InvalidParameters` when `location` is absent or
    /// is not a JSON string; errors from `fetch_weather` are propagated.
    async fn execute(&self, params: Value) -> Result<ToolResult, ToolError> {
        let Some(location) = params["location"].as_str() else {
            return Err(ToolError::InvalidParameters("Missing location".to_string()));
        };

        // Make API call to weather service
        let report = self.fetch_weather(location).await?;
        Ok(ToolResult::success(report))
    }
}
impl WeatherTool {
    /// Creates a weather tool holding `api_key`.
    ///
    /// Accepts anything convertible into a `String` (e.g. `&str` or `String`),
    /// so callers can write `WeatherTool::new(api_key)`.
    fn new(api_key: impl Into<String>) -> Self {
        Self {
            api_key: api_key.into(),
        }
    }

    /// Fetches current weather for `location`.
    ///
    /// Placeholder: returns fixed sample data echoing `location`. A real
    /// implementation would call the weather service using `self.api_key`.
    async fn fetch_weather(&self, location: &str) -> Result<Value, ToolError> {
        // Implementation details...
        Ok(serde_json::json!({
            "location": location,
            "temperature": "22°C",
            "condition": "Sunny"
        }))
    }
}
Tool Composition
// Combine multiple tools into a single composite tool.
// `Calculator` and `WebSearch` stand for other tool types (not defined in this
// guide); `api_key` / `search_api_key` are the corresponding service credentials.
// `ExecutionStrategy::Sequential` presumably runs the tools one after another
// in the order added — confirm against the `CompositeToolBuilder` docs.
let composite_tool = CompositeToolBuilder::new()
    .add_tool(WeatherTool::new(api_key))
    .add_tool(Calculator::new())
    .add_tool(WebSearch::new(search_api_key))
    .with_execution_strategy(ExecutionStrategy::Sequential)
    .build();
Performance Optimization
Model Optimization
// Use quantized models for better performance: Int8 quantization, batch size 4,
// sequence length 2048. These are example values; tune them for your hardware
// and workload.
let config = RwkvConfig::new()
    .with_quantization(QuantizationType::Int8)
    .with_batch_size(4)
    .with_sequence_length(2048);
// Build the kit from this in-code config instead of a config file.
let kit = RwkvAgentKit::with_config(config).await?;
Caching
// Enable response caching with a time-to-live of 3600 seconds (one hour).
// `Duration` is presumably `std::time::Duration`; bring it into scope.
let agent = kit.create_agent(
    AgentConfig::new()
        .with_cache_enabled(true)
        .with_cache_ttl(Duration::from_secs(3600))
).await?;
// Custom cache backends: create a Redis-backed cache for the given URL and pass
// it to the agent's config. (This `agent` shadows the one created above.)
let redis_cache = RedisCache::new("redis://localhost:6379").await?;
let agent = kit.create_agent(
    AgentConfig::new()
        .with_cache_backend(redis_cache)
).await?;
Parallel Processing
use tokio::task::JoinSet;
// Process multiple requests in parallel: spawn one Tokio task per query.
// Each task gets its own clone of `agent`, because spawned tasks must own the
// data they capture (`async move`).
let mut join_set = JoinSet::new();
for query in queries {
    let agent_clone = agent.clone();
    join_set.spawn(async move {
        agent_clone.chat(&query).await
    });
}
// `join_next` yields results in completion order, not submission order.
// The outer `?` propagates a `JoinError` (task panicked or was cancelled);
// the inner `Result` is the chat call's own outcome.
while let Some(result) = join_set.join_next().await {
    match result? {
        Ok(response) => println!("Response: {}", response),
        Err(e) => eprintln!("Error: {}", e),
    }
}
Integration Patterns
Web Service Integration
use axum::{extract::State, http::StatusCode, response::Json, routing::post, Router};
use serde::{Deserialize, Serialize};
/// JSON body accepted by `POST /chat`.
#[derive(Debug, Deserialize)]
struct ChatRequest {
    /// The user's message for the agent.
    message: String,
    /// Existing agent to talk to; omit it (or send `null`) to have a new agent
    /// created for this request.
    agent_id: Option<String>,
}

/// JSON body returned by `POST /chat`.
#[derive(Debug, Serialize)]
struct ChatResponse {
    /// The agent's reply.
    response: String,
    /// Id of the agent that answered; send it back as `agent_id` to keep
    /// talking to the same agent.
    agent_id: String,
}
async fn chat_handler(
State(kit): State<RwkvAgentKit>,
Json(request): Json<ChatRequest>,
) -> Result<Json<ChatResponse>, StatusCode> {
let agent = match request.agent_id {
Some(id) => kit.get_agent(&id).await
.map_err(|_| StatusCode::NOT_FOUND)?,
None => kit.create_agent(AgentConfig::default()).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?,
};
let response = agent.chat(&request.message).await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(ChatResponse {
response,
agent_id: agent.id().to_string(),
}))
}
/// Starts an HTTP server on port 3000 exposing `POST /chat`.
#[tokio::main]
async fn main() {
    // Startup failures are fatal for this example server; `expect` says what failed.
    let kit = RwkvAgentKit::new("config.toml")
        .await
        .expect("failed to load config.toml");

    // axum clones the state for each request, so `RwkvAgentKit` must be `Clone`.
    let app = Router::new()
        .route("/chat", post(chat_handler))
        .with_state(kit);

    // axum 0.7+ no longer provides `axum::Server`: bind a Tokio TCP listener
    // and hand it to `axum::serve`.
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000")
        .await
        .expect("failed to bind 0.0.0.0:3000");
    axum::serve(listener, app)
        .await
        .expect("server error");
}
Database Integration
use sqlx::{PgPool, Row};
// Store conversation history in database
/// Persists agent conversations in a PostgreSQL `conversations` table.
struct ConversationStore {
    /// Connection pool used for every query.
    pool: PgPool,
}

impl ConversationStore {
    /// Inserts one user-message / agent-response pair for `agent_id`,
    /// timestamped by the database with `NOW()`.
    async fn save_message(&self, agent_id: &str, message: &str, response: &str) -> Result<(), sqlx::Error> {
        sqlx::query!(
            "INSERT INTO conversations (agent_id, user_message, agent_response, created_at) VALUES ($1, $2, $3, NOW())",
            agent_id,
            message,
            response
        )
        .execute(&self.pool)
        .await?;
        Ok(())
    }

    /// Returns up to `limit` `(user_message, agent_response)` pairs for
    /// `agent_id`, newest first (`ORDER BY created_at DESC`). Reverse the
    /// result for chronological order.
    ///
    /// PostgreSQL rejects a negative `limit` with an error.
    async fn get_conversation_history(&self, agent_id: &str, limit: i32) -> Result<Vec<(String, String)>, sqlx::Error> {
        // PostgreSQL types the `LIMIT` parameter as BIGINT, which `query!`
        // checks against `i64` at compile time; widen here so the method can
        // keep accepting `i32`.
        let limit = i64::from(limit);
        let rows = sqlx::query!(
            "SELECT user_message, agent_response FROM conversations WHERE agent_id = $1 ORDER BY created_at DESC LIMIT $2",
            agent_id,
            limit
        )
        .fetch_all(&self.pool)
        .await?;
        // NOTE: assumes both columns are declared NOT NULL; if they are nullable,
        // `query!` yields `Option<String>` fields and this mapping must change.
        Ok(rows
            .into_iter()
            .map(|row| (row.user_message, row.agent_response))
            .collect())
    }
}
Monitoring and Debugging
Logging
use tracing::{info, warn, error};
use tracing_subscriber;
// Initialize logging once at startup. `fmt()` builds the default formatting
// subscriber, and `with_max_level(DEBUG)` lets debug-level events through (the
// kit's as well, if it logs via `tracing`). `init()` panics if a global
// subscriber is already installed, so call it only once.
tracing_subscriber::fmt()
    .with_max_level(tracing::Level::DEBUG)
    .init();
// Create the kit as usual; output verbosity is decided by the subscriber above.
let kit = RwkvAgentKit::new("config.toml").await?;
// Log agent interactions
let response = agent.chat("Hello").await?;
info!("Agent response: {}", response);
Metrics
use prometheus::{Counter, Histogram, register_counter, register_histogram};
// Define metrics. `register_counter!` / `register_histogram!` create the metric
// and register it with the default Prometheus registry. They return a `Result`,
// which fails e.g. if a metric with the same name is already registered.
let request_counter = register_counter!("agent_requests_total", "Total number of agent requests")?;
let response_time = register_histogram!("agent_response_time_seconds", "Agent response time in seconds")?;
// Instrument your code: time one chat call, then increment the counter and
// record the elapsed time (in seconds) in the histogram. Because of `?`, a
// failed `chat` returns early and is neither counted nor timed.
let start = std::time::Instant::now();
let response = agent.chat("Hello").await?;
let duration = start.elapsed();
request_counter.inc();
response_time.observe(duration.as_secs_f64());
Security Considerations
Input Sanitization
/// Removes every `<`, `>`, `"`, `'` and `&` character from `input`.
///
/// This strips characters that are significant in HTML and other markup. It is
/// a coarse filter: it does not replace context-aware escaping where the text
/// is later rendered, and it does not protect against prompt injection.
fn sanitize_input(input: &str) -> String {
    // A plain character filter does the job: no regex compiled on every call,
    // and no raw-string quoting to get wrong (a `r"..."` literal cannot
    // contain `"` without `#` delimiters).
    input
        .chars()
        .filter(|c| !matches!(c, '<' | '>' | '"' | '\'' | '&'))
        .collect()
}
// Use sanitized input: pass the filtered text, not the raw `user_input`
// (presumably the text received from the user), to the agent.
let sanitized = sanitize_input(&user_input);
let response = agent.chat(&sanitized).await?;
Rate Limiting
use governor::{Quota, RateLimiter};
use std::num::NonZeroU32;
// Create rate limiter: a quota of 10 requests per second (10 is non-zero, so
// the `unwrap` cannot fail). Create the limiter once and share it across
// requests; a limiter built per request starts with a full quota every time
// and never throttles anything.
let quota = Quota::per_second(NonZeroU32::new(10).unwrap());
let limiter = RateLimiter::direct(quota);
// Check rate limit before processing: `check()` returns `Ok` while the quota
// allows another request and `Err` once it is exhausted (it does not wait).
if limiter.check().is_ok() {
    let response = agent.chat(&message).await?;
    // Process response
} else {
    // Handle rate limit exceeded
}