0rca POD Architecture
What is a POD?
A POD (Personal Object Datastore) in the 0rca ecosystem represents a containerized, sovereign AI agent with its own dedicated infrastructure, identity, and financial management system on Cronos EVM. Each POD is a self-contained unit that can operate independently while participating in the broader 0rca network.
POD Components
1. Container Infrastructure
Each POD runs as a Kubernetes deployment with dedicated resources:
apiVersion: apps/v1
kind: Deployment
metadata:
name: pod-agent-123
namespace: orca-pods
labels:
pod-type: "ai-agent"
pod-id: "agent-123"
owner: "0x..."
spec:
replicas: 2
selector:
matchLabels:
pod-id: "agent-123"
template:
metadata:
labels:
pod-id: "agent-123"
spec:
containers:
- name: agent
image: orca-agent:latest
ports:
- containerPort: 8000
env:
- name: POD_ID
value: "agent-123"
- name: CRONOS_VAULT_ADDRESS
valueFrom:
secretKeyRef:
name: pod-secrets
key: vault-address
resources:
requests:
memory: "512Mi"
cpu: "250m"
limits:
memory: "2Gi"
cpu: "1000m"
2. Sovereign Vault (Financial Layer)
Each POD has its own smart contract on Cronos EVM for financial operations:
contract PODVault {
// POD Identity
string public podId;
address public owner; // POD creator/developer
address public agentWallet; // POD's operational wallet
// Financial State
IERC20 public immutable usdc;
uint256 public totalEarnings;
uint256 public availableBalance;
uint256 public totalTasks;
// Task Management
mapping(bytes32 => Task) public tasks;
mapping(address => bool) public authorizedOperators;
struct Task {
uint256 amount;
address creator;
bool completed;
uint256 timestamp;
string metadata;
}
// Events
event TaskCreated(bytes32 indexed taskId, uint256 amount, address creator);
event TaskCompleted(bytes32 indexed taskId, uint256 amount);
event EarningsWithdrawn(uint256 amount, address recipient);
constructor(
string memory _podId,
address _owner,
address _agentWallet,
address _usdc
) {
podId = _podId;
owner = _owner;
agentWallet = _agentWallet;
usdc = IERC20(_usdc);
}
function createTask(bytes32 taskId, uint256 amount, string memory metadata)
external {
require(tasks[taskId].amount == 0, "Task already exists");
require(usdc.transferFrom(msg.sender, address(this), amount), "Transfer failed");
tasks[taskId] = Task({
amount: amount,
creator: msg.sender,
completed: false,
timestamp: block.timestamp,
metadata: metadata
});
emit TaskCreated(taskId, amount, msg.sender);
}
function completeTask(bytes32 taskId) external {
require(msg.sender == agentWallet || authorizedOperators[msg.sender], "Unauthorized");
Task storage task = tasks[taskId];
require(!task.completed, "Task already completed");
require(task.amount > 0, "Task does not exist");
task.completed = true;
availableBalance += task.amount;
totalEarnings += task.amount;
totalTasks++;
emit TaskCompleted(taskId, task.amount);
}
function withdraw(address recipient, uint256 amount) external {
require(msg.sender == owner, "Only owner can withdraw");
require(amount <= availableBalance, "Insufficient balance");
availableBalance -= amount;
require(usdc.transfer(recipient, amount), "Transfer failed");
emit EarningsWithdrawn(amount, recipient);
}
}
3. Identity & Authentication
Each POD has a unique cryptographic identity:
class PODIdentity:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.private_key = self._load_or_create_key()
self.public_address = Account.from_key(self.private_key).address
self.vault_address = self._deploy_or_load_vault()
def _load_or_create_key(self) -> str:
key_file = f"/secrets/{self.pod_id}_identity.json"
if os.path.exists(key_file):
with open(key_file, 'r') as f:
encrypted_key = json.load(f)
return Account.decrypt(encrypted_key, os.getenv('POD_PASSWORD'))
else:
# Create new identity
account = Account.create()
encrypted_key = account.encrypt(os.getenv('POD_PASSWORD'))
with open(key_file, 'w') as f:
json.dump(encrypted_key, f)
return account.privateKey.hex()
def sign_transaction(self, transaction: dict) -> str:
signed_txn = Account.sign_transaction(transaction, self.private_key)
return signed_txn.rawTransaction.hex()
def verify_ownership(self) -> bool:
vault_contract = self.web3.eth.contract(
address=self.vault_address,
abi=POD_VAULT_ABI
)
return vault_contract.functions.agentWallet().call() == self.public_address
4. Data Storage & State Management
PODs maintain their own persistent storage:
class PODDatastore:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.db_path = f"/data/{pod_id}.db"
self.redis_key_prefix = f"pod:{pod_id}"
self._init_database()
def _init_database(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY,
user_address TEXT,
prompt TEXT,
response TEXT,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
task_id TEXT,
cost_usdc REAL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS pod_metrics (
id INTEGER PRIMARY KEY,
metric_name TEXT,
metric_value REAL,
timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
def store_conversation(self, user_address: str, prompt: str,
response: str, task_id: str, cost: float):
conn = sqlite3.connect(self.db_path)
conn.execute("""
INSERT INTO conversations (user_address, prompt, response, task_id, cost_usdc)
VALUES (?, ?, ?, ?, ?)
""", (user_address, prompt, response, task_id, cost))
conn.commit()
conn.close()
def get_conversation_history(self, user_address: str, limit: int = 10) -> List[dict]:
conn = sqlite3.connect(self.db_path)
cursor = conn.execute("""
SELECT prompt, response, timestamp, cost_usdc
FROM conversations
WHERE user_address = ?
ORDER BY timestamp DESC
LIMIT ?
""", (user_address, limit))
results = [
{
"prompt": row[0],
"response": row[1],
"timestamp": row[2],
"cost": row[3]
}
for row in cursor.fetchall()
]
conn.close()
return results
POD Lifecycle Management
1. POD Creation & Deployment
class PODManager:
def __init__(self, kubernetes_client, cronos_web3):
self.k8s = kubernetes_client
self.web3 = cronos_web3
async def create_pod(self, pod_config: PODConfig) -> PODDeployment:
# 1. Deploy Sovereign Vault on Cronos
vault_address = await self._deploy_vault(pod_config)
# 2. Create Kubernetes resources
deployment = await self._create_k8s_deployment(pod_config, vault_address)
# 3. Setup networking and ingress
service = await self._create_service(pod_config)
ingress = await self._create_ingress(pod_config)
# 4. Initialize POD identity and secrets
identity = await self._setup_pod_identity(pod_config)
# 5. Register in 0rca network
await self._register_pod(pod_config, vault_address, identity.public_address)
return PODDeployment(
pod_id=pod_config.pod_id,
vault_address=vault_address,
endpoint=f"https://{pod_config.subdomain}.0rca.live",
identity_address=identity.public_address
)
async def _deploy_vault(self, config: PODConfig) -> str:
vault_factory = self.web3.eth.contract(
address=VAULT_FACTORY_ADDRESS,
abi=VAULT_FACTORY_ABI
)
tx = vault_factory.functions.createPODVault(
config.pod_id,
config.owner_address,
config.agent_wallet_address,
USDC_ADDRESS
).buildTransaction({
'from': config.owner_address,
'gas': 2000000,
'gasPrice': self.web3.toWei('20', 'gwei')
})
# Use CroGas for gasless deployment
signed_tx = self.web3.eth.account.sign_transaction(tx, config.private_key)
tx_hash = await self.crogas.relay_transaction(signed_tx)
receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
vault_address = receipt.logs[0].address
return vault_address
2. POD Auto-Scaling
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: pod-agent-123-hpa
namespace: orca-pods
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: pod-agent-123
minReplicas: 1
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
- type: Pods
pods:
metric:
name: active_requests
target:
type: AverageValue
averageValue: "5"
behavior:
scaleUp:
stabilizationWindowSeconds: 60
policies:
- type: Percent
value: 100
periodSeconds: 15
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 10
periodSeconds: 60
3. POD Health Monitoring
class PODHealthMonitor:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.metrics_collector = MetricsCollector()
self.alert_manager = AlertManager()
async def health_check(self) -> PODHealthStatus:
# Check container health
container_health = await self._check_container_health()
# Check vault balance and transactions
vault_health = await self._check_vault_health()
# Check AI model availability
ai_health = await self._check_ai_health()
# Check network connectivity
network_health = await self._check_network_health()
overall_status = self._calculate_overall_health([
container_health, vault_health, ai_health, network_health
])
if overall_status.status != "healthy":
await self.alert_manager.send_alert(
f"POD {self.pod_id} health issue: {overall_status.message}"
)
return overall_status
async def _check_vault_health(self) -> HealthStatus:
try:
vault = self.web3.eth.contract(
address=self.vault_address,
abi=POD_VAULT_ABI
)
# Check if vault is responsive
balance = vault.functions.availableBalance().call()
total_tasks = vault.functions.totalTasks().call()
# Check recent activity
recent_blocks = 100
events = vault.events.TaskCompleted.createFilter(
fromBlock=self.web3.eth.block_number - recent_blocks
).get_all_entries()
return HealthStatus(
status="healthy",
message=f"Vault active: {balance} USDC, {total_tasks} tasks, {len(events)} recent completions"
)
except Exception as e:
return HealthStatus(
status="unhealthy",
message=f"Vault check failed: {str(e)}"
)
POD Communication & Discovery
1. POD Registry
class PODRegistry:
def __init__(self, supabase_client):
self.supabase = supabase_client
async def register_pod(self, pod_info: PODInfo):
result = self.supabase.table('pods').insert({
'pod_id': pod_info.pod_id,
'name': pod_info.name,
'description': pod_info.description,
'owner_address': pod_info.owner_address,
'vault_address': pod_info.vault_address,
'endpoint': pod_info.endpoint,
'capabilities': pod_info.capabilities,
'price_per_task': pod_info.price_per_task,
'category': pod_info.category,
'status': 'active',
'created_at': datetime.utcnow().isoformat()
}).execute()
return result.data[0]['id']
async def discover_pods(self, query: PODQuery) -> List[PODInfo]:
query_builder = self.supabase.table('pods').select('*')
if query.category:
query_builder = query_builder.eq('category', query.category)
if query.max_price:
query_builder = query_builder.lte('price_per_task', query.max_price)
if query.capabilities:
for capability in query.capabilities:
query_builder = query_builder.contains('capabilities', [capability])
result = query_builder.eq('status', 'active').execute()
return [PODInfo.from_dict(pod) for pod in result.data]
2. Inter-POD Communication
class InterPODCommunication:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.identity = PODIdentity(pod_id)
async def send_message(self, target_pod_id: str, message: PODMessage) -> PODResponse:
# Discover target POD endpoint
target_pod = await self.registry.get_pod_info(target_pod_id)
# Sign message with POD identity
signed_message = self.identity.sign_message(message.to_dict())
# Send HTTP request to target POD
async with aiohttp.ClientSession() as session:
async with session.post(
f"{target_pod.endpoint}/pod/message",
json={
'message': message.to_dict(),
'signature': signed_message,
'sender_pod_id': self.pod_id
},
headers={'Content-Type': 'application/json'}
) as response:
if response.status == 200:
return PODResponse.from_dict(await response.json())
else:
raise PODCommunicationError(f"Failed to send message: {response.status}")
async def handle_incoming_message(self, message_data: dict) -> PODResponse:
# Verify sender signature
sender_pod_id = message_data['sender_pod_id']
sender_pod = await self.registry.get_pod_info(sender_pod_id)
if not self._verify_signature(message_data, sender_pod.identity_address):
raise PODSecurityError("Invalid message signature")
# Process message based on type
message = PODMessage.from_dict(message_data['message'])
if message.type == "task_request":
return await self._handle_task_request(message)
elif message.type == "capability_query":
return await self._handle_capability_query(message)
else:
raise PODError(f"Unknown message type: {message.type}")
POD Performance & Analytics
1. Performance Metrics
class PODAnalytics:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.prometheus = PrometheusClient()
def track_request(self, request_type: str, duration: float, success: bool):
# Prometheus metrics
self.prometheus.increment_counter(
'pod_requests_total',
labels={'pod_id': self.pod_id, 'type': request_type, 'success': str(success)}
)
self.prometheus.observe_histogram(
'pod_request_duration_seconds',
duration,
labels={'pod_id': self.pod_id, 'type': request_type}
)
def track_earnings(self, amount_usdc: float, task_type: str):
self.prometheus.increment_counter(
'pod_earnings_usdc_total',
amount_usdc,
labels={'pod_id': self.pod_id, 'task_type': task_type}
)
async def get_performance_report(self, time_range: str = "24h") -> PODPerformanceReport:
# Query Prometheus for metrics
queries = {
'total_requests': f'sum(increase(pod_requests_total{{pod_id="{self.pod_id}"}}[{time_range}]))',
'success_rate': f'sum(rate(pod_requests_total{{pod_id="{self.pod_id}",success="true"}}[{time_range}])) / sum(rate(pod_requests_total{{pod_id="{self.pod_id}"}}[{time_range}]))',
'avg_response_time': f'avg(pod_request_duration_seconds{{pod_id="{self.pod_id}"}})',
'total_earnings': f'sum(increase(pod_earnings_usdc_total{{pod_id="{self.pod_id}"}}[{time_range}]))'
}
results = {}
for metric, query in queries.items():
result = await self.prometheus.query(query)
results[metric] = float(result[0]['value'][1]) if result else 0
return PODPerformanceReport(
pod_id=self.pod_id,
time_range=time_range,
total_requests=int(results['total_requests']),
success_rate=results['success_rate'],
avg_response_time=results['avg_response_time'],
total_earnings=results['total_earnings']
)
2. Cost Optimization
class PODCostOptimizer:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.k8s_client = KubernetesClient()
async def optimize_resources(self) -> OptimizationResult:
# Get current resource usage
current_usage = await self._get_resource_usage()
# Analyze usage patterns
usage_analysis = await self._analyze_usage_patterns()
# Calculate optimal resource allocation
optimal_resources = self._calculate_optimal_resources(
current_usage, usage_analysis
)
# Apply optimizations
if optimal_resources.cpu_request != current_usage.cpu_request:
await self._update_cpu_request(optimal_resources.cpu_request)
if optimal_resources.memory_request != current_usage.memory_request:
await self._update_memory_request(optimal_resources.memory_request)
# Update HPA settings if needed
if optimal_resources.max_replicas != current_usage.max_replicas:
await self._update_hpa_max_replicas(optimal_resources.max_replicas)
return OptimizationResult(
cost_savings=self._calculate_cost_savings(current_usage, optimal_resources),
performance_impact=self._estimate_performance_impact(optimal_resources),
recommendations=optimal_resources.recommendations
)
Security & Compliance
1. POD Security Model
class PODSecurity:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.security_policies = self._load_security_policies()
def validate_request(self, request: PODRequest) -> SecurityValidation:
validations = []
# Rate limiting
if not self._check_rate_limit(request.sender_address):
validations.append(SecurityIssue("RATE_LIMIT_EXCEEDED", "Too many requests"))
# Input validation
if not self._validate_input(request.payload):
validations.append(SecurityIssue("INVALID_INPUT", "Malicious input detected"))
# Payment verification
if request.payment_signature:
if not self._verify_payment_signature(request):
validations.append(SecurityIssue("INVALID_PAYMENT", "Payment signature invalid"))
# Reputation check
sender_reputation = self._get_sender_reputation(request.sender_address)
if sender_reputation < self.security_policies.min_reputation:
validations.append(SecurityIssue("LOW_REPUTATION", "Sender reputation too low"))
return SecurityValidation(
is_valid=len(validations) == 0,
issues=validations
)
def _validate_input(self, payload: dict) -> bool:
# Check for common attack patterns
dangerous_patterns = [
r'<script.*?>.*?</script>', # XSS
r'union\s+select', # SQL injection
r'\.\./', # Path traversal
r'eval\s*\(', # Code injection
]
payload_str = json.dumps(payload).lower()
for pattern in dangerous_patterns:
if re.search(pattern, payload_str, re.IGNORECASE):
return False
return True
2. Compliance & Auditing
class PODCompliance:
def __init__(self, pod_id: str):
self.pod_id = pod_id
self.audit_logger = AuditLogger()
def log_transaction(self, transaction: PODTransaction):
audit_entry = {
'pod_id': self.pod_id,
'transaction_type': transaction.type,
'user_address': transaction.user_address,
'amount_usdc': transaction.amount,
'task_id': transaction.task_id,
'timestamp': datetime.utcnow().isoformat(),
'blockchain_tx_hash': transaction.tx_hash,
'compliance_flags': self._check_compliance_flags(transaction)
}
self.audit_logger.log(audit_entry)
def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> ComplianceReport:
# Query audit logs
transactions = self.audit_logger.query_range(
pod_id=self.pod_id,
start_date=start_date,
end_date=end_date
)
# Analyze for compliance issues
compliance_issues = []
total_volume = 0
for tx in transactions:
total_volume += tx['amount_usdc']
# Check for suspicious patterns
if tx['amount_usdc'] > 10000: # Large transaction
compliance_issues.append(f"Large transaction: {tx['task_id']}")
if 'high_risk_country' in tx['compliance_flags']:
compliance_issues.append(f"High-risk jurisdiction: {tx['task_id']}")
return ComplianceReport(
pod_id=self.pod_id,
period=f"{start_date} to {end_date}",
total_transactions=len(transactions),
total_volume_usdc=total_volume,
compliance_issues=compliance_issues,
risk_score=self._calculate_risk_score(transactions)
)
The POD architecture represents the fundamental building block of the 0rca ecosystem, providing a secure, scalable, and sovereign environment for AI agents to operate on Cronos EVM. Each POD is a complete economic actor with its own identity, financial management, and operational capabilities, while seamlessly integrating with the broader 0rca network.