POD Architecture

Understanding the 0rca POD (Personal Object Datastore) Architecture on Cronos EVM.

0rca POD Architecture

What is a POD?

A POD (Personal Object Datastore) in the 0rca ecosystem represents a containerized, sovereign AI agent with its own dedicated infrastructure, identity, and financial management system on Cronos EVM. Each POD is a self-contained unit that can operate independently while participating in the broader 0rca network.

POD Components

1. Container Infrastructure

Each POD runs as a Kubernetes deployment with dedicated resources:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: pod-agent-123
  namespace: orca-pods
  labels:
    pod-type: "ai-agent"
    pod-id: "agent-123"
    owner: "0x..."
spec:
  replicas: 2
  selector:
    matchLabels:
      pod-id: "agent-123"
  template:
    metadata:
      labels:
        pod-id: "agent-123"
    spec:
      containers:
      - name: agent
        image: orca-agent:latest
        ports:
        - containerPort: 8000
        env:
        - name: POD_ID
          value: "agent-123"
        - name: CRONOS_VAULT_ADDRESS
          valueFrom:
            secretKeyRef:
              name: pod-secrets
              key: vault-address
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "2Gi"
            cpu: "1000m"

2. Sovereign Vault (Financial Layer)

Each POD has its own smart contract on Cronos EVM for financial operations:

contract PODVault {
    // POD Identity
    string public podId;
    address public owner;           // POD creator/developer
    address public agentWallet;    // POD's operational wallet
    
    // Financial State
    IERC20 public immutable usdc;
    uint256 public totalEarnings;
    uint256 public availableBalance;
    uint256 public totalTasks;
    
    // Task Management
    mapping(bytes32 => Task) public tasks;
    mapping(address => bool) public authorizedOperators;
    
    struct Task {
        uint256 amount;
        address creator;
        bool completed;
        uint256 timestamp;
        string metadata;
    }
    
    // Events
    event TaskCreated(bytes32 indexed taskId, uint256 amount, address creator);
    event TaskCompleted(bytes32 indexed taskId, uint256 amount);
    event EarningsWithdrawn(uint256 amount, address recipient);
    
    constructor(
        string memory _podId,
        address _owner,
        address _agentWallet,
        address _usdc
    ) {
        podId = _podId;
        owner = _owner;
        agentWallet = _agentWallet;
        usdc = IERC20(_usdc);
    }
    
    function createTask(bytes32 taskId, uint256 amount, string memory metadata) 
        external {
        require(tasks[taskId].amount == 0, "Task already exists");
        require(usdc.transferFrom(msg.sender, address(this), amount), "Transfer failed");
        
        tasks[taskId] = Task({
            amount: amount,
            creator: msg.sender,
            completed: false,
            timestamp: block.timestamp,
            metadata: metadata
        });
        
        emit TaskCreated(taskId, amount, msg.sender);
    }
    
    function completeTask(bytes32 taskId) external {
        require(msg.sender == agentWallet || authorizedOperators[msg.sender], "Unauthorized");
        
        Task storage task = tasks[taskId];
        require(!task.completed, "Task already completed");
        require(task.amount > 0, "Task does not exist");
        
        task.completed = true;
        availableBalance += task.amount;
        totalEarnings += task.amount;
        totalTasks++;
        
        emit TaskCompleted(taskId, task.amount);
    }
    
    function withdraw(address recipient, uint256 amount) external {
        require(msg.sender == owner, "Only owner can withdraw");
        require(amount <= availableBalance, "Insufficient balance");
        
        availableBalance -= amount;
        require(usdc.transfer(recipient, amount), "Transfer failed");
        
        emit EarningsWithdrawn(amount, recipient);
    }
}

3. Identity & Authentication

Each POD has a unique cryptographic identity:

class PODIdentity:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.private_key = self._load_or_create_key()
        self.public_address = Account.from_key(self.private_key).address
        self.vault_address = self._deploy_or_load_vault()
    
    def _load_or_create_key(self) -> str:
        key_file = f"/secrets/{self.pod_id}_identity.json"
        if os.path.exists(key_file):
            with open(key_file, 'r') as f:
                encrypted_key = json.load(f)
                return Account.decrypt(encrypted_key, os.getenv('POD_PASSWORD'))
        else:
            # Create new identity
            account = Account.create()
            encrypted_key = account.encrypt(os.getenv('POD_PASSWORD'))
            with open(key_file, 'w') as f:
                json.dump(encrypted_key, f)
            return account.privateKey.hex()
    
    def sign_transaction(self, transaction: dict) -> str:
        signed_txn = Account.sign_transaction(transaction, self.private_key)
        return signed_txn.rawTransaction.hex()
    
    def verify_ownership(self) -> bool:
        vault_contract = self.web3.eth.contract(
            address=self.vault_address, 
            abi=POD_VAULT_ABI
        )
        return vault_contract.functions.agentWallet().call() == self.public_address

4. Data Storage & State Management

PODs maintain their own persistent storage:

class PODDatastore:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.db_path = f"/data/{pod_id}.db"
        self.redis_key_prefix = f"pod:{pod_id}"
        self._init_database()
    
    def _init_database(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS conversations (
                id INTEGER PRIMARY KEY,
                user_address TEXT,
                prompt TEXT,
                response TEXT,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP,
                task_id TEXT,
                cost_usdc REAL
            )
        """)
        
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pod_metrics (
                id INTEGER PRIMARY KEY,
                metric_name TEXT,
                metric_value REAL,
                timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()
    
    def store_conversation(self, user_address: str, prompt: str, 
                          response: str, task_id: str, cost: float):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            INSERT INTO conversations (user_address, prompt, response, task_id, cost_usdc)
            VALUES (?, ?, ?, ?, ?)
        """, (user_address, prompt, response, task_id, cost))
        conn.commit()
        conn.close()
    
    def get_conversation_history(self, user_address: str, limit: int = 10) -> List[dict]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute("""
            SELECT prompt, response, timestamp, cost_usdc 
            FROM conversations 
            WHERE user_address = ? 
            ORDER BY timestamp DESC 
            LIMIT ?
        """, (user_address, limit))
        
        results = [
            {
                "prompt": row[0],
                "response": row[1], 
                "timestamp": row[2],
                "cost": row[3]
            }
            for row in cursor.fetchall()
        ]
        conn.close()
        return results

POD Lifecycle Management

1. POD Creation & Deployment

class PODManager:
    def __init__(self, kubernetes_client, cronos_web3):
        self.k8s = kubernetes_client
        self.web3 = cronos_web3
    
    async def create_pod(self, pod_config: PODConfig) -> PODDeployment:
        # 1. Deploy Sovereign Vault on Cronos
        vault_address = await self._deploy_vault(pod_config)
        
        # 2. Create Kubernetes resources
        deployment = await self._create_k8s_deployment(pod_config, vault_address)
        
        # 3. Setup networking and ingress
        service = await self._create_service(pod_config)
        ingress = await self._create_ingress(pod_config)
        
        # 4. Initialize POD identity and secrets
        identity = await self._setup_pod_identity(pod_config)
        
        # 5. Register in 0rca network
        await self._register_pod(pod_config, vault_address, identity.public_address)
        
        return PODDeployment(
            pod_id=pod_config.pod_id,
            vault_address=vault_address,
            endpoint=f"https://{pod_config.subdomain}.0rca.live",
            identity_address=identity.public_address
        )
    
    async def _deploy_vault(self, config: PODConfig) -> str:
        vault_factory = self.web3.eth.contract(
            address=VAULT_FACTORY_ADDRESS,
            abi=VAULT_FACTORY_ABI
        )
        
        tx = vault_factory.functions.createPODVault(
            config.pod_id,
            config.owner_address,
            config.agent_wallet_address,
            USDC_ADDRESS
        ).buildTransaction({
            'from': config.owner_address,
            'gas': 2000000,
            'gasPrice': self.web3.toWei('20', 'gwei')
        })
        
        # Use CroGas for gasless deployment
        signed_tx = self.web3.eth.account.sign_transaction(tx, config.private_key)
        tx_hash = await self.crogas.relay_transaction(signed_tx)
        
        receipt = self.web3.eth.wait_for_transaction_receipt(tx_hash)
        vault_address = receipt.logs[0].address
        
        return vault_address

2. POD Auto-Scaling

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: pod-agent-123-hpa
  namespace: orca-pods
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: pod-agent-123
  minReplicas: 1
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Pods
    pods:
      metric:
        name: active_requests
      target:
        type: AverageValue
        averageValue: "5"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 10
        periodSeconds: 60

3. POD Health Monitoring

class PODHealthMonitor:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.metrics_collector = MetricsCollector()
        self.alert_manager = AlertManager()
    
    async def health_check(self) -> PODHealthStatus:
        # Check container health
        container_health = await self._check_container_health()
        
        # Check vault balance and transactions
        vault_health = await self._check_vault_health()
        
        # Check AI model availability
        ai_health = await self._check_ai_health()
        
        # Check network connectivity
        network_health = await self._check_network_health()
        
        overall_status = self._calculate_overall_health([
            container_health, vault_health, ai_health, network_health
        ])
        
        if overall_status.status != "healthy":
            await self.alert_manager.send_alert(
                f"POD {self.pod_id} health issue: {overall_status.message}"
            )
        
        return overall_status
    
    async def _check_vault_health(self) -> HealthStatus:
        try:
            vault = self.web3.eth.contract(
                address=self.vault_address,
                abi=POD_VAULT_ABI
            )
            
            # Check if vault is responsive
            balance = vault.functions.availableBalance().call()
            total_tasks = vault.functions.totalTasks().call()
            
            # Check recent activity
            recent_blocks = 100
            events = vault.events.TaskCompleted.createFilter(
                fromBlock=self.web3.eth.block_number - recent_blocks
            ).get_all_entries()
            
            return HealthStatus(
                status="healthy",
                message=f"Vault active: {balance} USDC, {total_tasks} tasks, {len(events)} recent completions"
            )
            
        except Exception as e:
            return HealthStatus(
                status="unhealthy",
                message=f"Vault check failed: {str(e)}"
            )

POD Communication & Discovery

1. POD Registry

class PODRegistry:
    def __init__(self, supabase_client):
        self.supabase = supabase_client
    
    async def register_pod(self, pod_info: PODInfo):
        result = self.supabase.table('pods').insert({
            'pod_id': pod_info.pod_id,
            'name': pod_info.name,
            'description': pod_info.description,
            'owner_address': pod_info.owner_address,
            'vault_address': pod_info.vault_address,
            'endpoint': pod_info.endpoint,
            'capabilities': pod_info.capabilities,
            'price_per_task': pod_info.price_per_task,
            'category': pod_info.category,
            'status': 'active',
            'created_at': datetime.utcnow().isoformat()
        }).execute()
        
        return result.data[0]['id']
    
    async def discover_pods(self, query: PODQuery) -> List[PODInfo]:
        query_builder = self.supabase.table('pods').select('*')
        
        if query.category:
            query_builder = query_builder.eq('category', query.category)
        
        if query.max_price:
            query_builder = query_builder.lte('price_per_task', query.max_price)
        
        if query.capabilities:
            for capability in query.capabilities:
                query_builder = query_builder.contains('capabilities', [capability])
        
        result = query_builder.eq('status', 'active').execute()
        
        return [PODInfo.from_dict(pod) for pod in result.data]

2. Inter-POD Communication

class InterPODCommunication:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.identity = PODIdentity(pod_id)
    
    async def send_message(self, target_pod_id: str, message: PODMessage) -> PODResponse:
        # Discover target POD endpoint
        target_pod = await self.registry.get_pod_info(target_pod_id)
        
        # Sign message with POD identity
        signed_message = self.identity.sign_message(message.to_dict())
        
        # Send HTTP request to target POD
        async with aiohttp.ClientSession() as session:
            async with session.post(
                f"{target_pod.endpoint}/pod/message",
                json={
                    'message': message.to_dict(),
                    'signature': signed_message,
                    'sender_pod_id': self.pod_id
                },
                headers={'Content-Type': 'application/json'}
            ) as response:
                if response.status == 200:
                    return PODResponse.from_dict(await response.json())
                else:
                    raise PODCommunicationError(f"Failed to send message: {response.status}")
    
    async def handle_incoming_message(self, message_data: dict) -> PODResponse:
        # Verify sender signature
        sender_pod_id = message_data['sender_pod_id']
        sender_pod = await self.registry.get_pod_info(sender_pod_id)
        
        if not self._verify_signature(message_data, sender_pod.identity_address):
            raise PODSecurityError("Invalid message signature")
        
        # Process message based on type
        message = PODMessage.from_dict(message_data['message'])
        
        if message.type == "task_request":
            return await self._handle_task_request(message)
        elif message.type == "capability_query":
            return await self._handle_capability_query(message)
        else:
            raise PODError(f"Unknown message type: {message.type}")

POD Performance & Analytics

1. Performance Metrics

class PODAnalytics:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.prometheus = PrometheusClient()
    
    def track_request(self, request_type: str, duration: float, success: bool):
        # Prometheus metrics
        self.prometheus.increment_counter(
            'pod_requests_total',
            labels={'pod_id': self.pod_id, 'type': request_type, 'success': str(success)}
        )
        
        self.prometheus.observe_histogram(
            'pod_request_duration_seconds',
            duration,
            labels={'pod_id': self.pod_id, 'type': request_type}
        )
    
    def track_earnings(self, amount_usdc: float, task_type: str):
        self.prometheus.increment_counter(
            'pod_earnings_usdc_total',
            amount_usdc,
            labels={'pod_id': self.pod_id, 'task_type': task_type}
        )
    
    async def get_performance_report(self, time_range: str = "24h") -> PODPerformanceReport:
        # Query Prometheus for metrics
        queries = {
            'total_requests': f'sum(increase(pod_requests_total{{pod_id="{self.pod_id}"}}[{time_range}]))',
            'success_rate': f'sum(rate(pod_requests_total{{pod_id="{self.pod_id}",success="true"}}[{time_range}])) / sum(rate(pod_requests_total{{pod_id="{self.pod_id}"}}[{time_range}]))',
            'avg_response_time': f'avg(pod_request_duration_seconds{{pod_id="{self.pod_id}"}})',
            'total_earnings': f'sum(increase(pod_earnings_usdc_total{{pod_id="{self.pod_id}"}}[{time_range}]))'
        }
        
        results = {}
        for metric, query in queries.items():
            result = await self.prometheus.query(query)
            results[metric] = float(result[0]['value'][1]) if result else 0
        
        return PODPerformanceReport(
            pod_id=self.pod_id,
            time_range=time_range,
            total_requests=int(results['total_requests']),
            success_rate=results['success_rate'],
            avg_response_time=results['avg_response_time'],
            total_earnings=results['total_earnings']
        )

2. Cost Optimization

class PODCostOptimizer:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.k8s_client = KubernetesClient()
    
    async def optimize_resources(self) -> OptimizationResult:
        # Get current resource usage
        current_usage = await self._get_resource_usage()
        
        # Analyze usage patterns
        usage_analysis = await self._analyze_usage_patterns()
        
        # Calculate optimal resource allocation
        optimal_resources = self._calculate_optimal_resources(
            current_usage, usage_analysis
        )
        
        # Apply optimizations
        if optimal_resources.cpu_request != current_usage.cpu_request:
            await self._update_cpu_request(optimal_resources.cpu_request)
        
        if optimal_resources.memory_request != current_usage.memory_request:
            await self._update_memory_request(optimal_resources.memory_request)
        
        # Update HPA settings if needed
        if optimal_resources.max_replicas != current_usage.max_replicas:
            await self._update_hpa_max_replicas(optimal_resources.max_replicas)
        
        return OptimizationResult(
            cost_savings=self._calculate_cost_savings(current_usage, optimal_resources),
            performance_impact=self._estimate_performance_impact(optimal_resources),
            recommendations=optimal_resources.recommendations
        )

Security & Compliance

1. POD Security Model

class PODSecurity:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.security_policies = self._load_security_policies()
    
    def validate_request(self, request: PODRequest) -> SecurityValidation:
        validations = []
        
        # Rate limiting
        if not self._check_rate_limit(request.sender_address):
            validations.append(SecurityIssue("RATE_LIMIT_EXCEEDED", "Too many requests"))
        
        # Input validation
        if not self._validate_input(request.payload):
            validations.append(SecurityIssue("INVALID_INPUT", "Malicious input detected"))
        
        # Payment verification
        if request.payment_signature:
            if not self._verify_payment_signature(request):
                validations.append(SecurityIssue("INVALID_PAYMENT", "Payment signature invalid"))
        
        # Reputation check
        sender_reputation = self._get_sender_reputation(request.sender_address)
        if sender_reputation < self.security_policies.min_reputation:
            validations.append(SecurityIssue("LOW_REPUTATION", "Sender reputation too low"))
        
        return SecurityValidation(
            is_valid=len(validations) == 0,
            issues=validations
        )
    
    def _validate_input(self, payload: dict) -> bool:
        # Check for common attack patterns
        dangerous_patterns = [
            r'<script.*?>.*?</script>',  # XSS
            r'union\s+select',           # SQL injection
            r'\.\./',                    # Path traversal
            r'eval\s*\(',               # Code injection
        ]
        
        payload_str = json.dumps(payload).lower()
        
        for pattern in dangerous_patterns:
            if re.search(pattern, payload_str, re.IGNORECASE):
                return False
        
        return True

2. Compliance & Auditing

class PODCompliance:
    def __init__(self, pod_id: str):
        self.pod_id = pod_id
        self.audit_logger = AuditLogger()
    
    def log_transaction(self, transaction: PODTransaction):
        audit_entry = {
            'pod_id': self.pod_id,
            'transaction_type': transaction.type,
            'user_address': transaction.user_address,
            'amount_usdc': transaction.amount,
            'task_id': transaction.task_id,
            'timestamp': datetime.utcnow().isoformat(),
            'blockchain_tx_hash': transaction.tx_hash,
            'compliance_flags': self._check_compliance_flags(transaction)
        }
        
        self.audit_logger.log(audit_entry)
    
    def generate_compliance_report(self, start_date: datetime, end_date: datetime) -> ComplianceReport:
        # Query audit logs
        transactions = self.audit_logger.query_range(
            pod_id=self.pod_id,
            start_date=start_date,
            end_date=end_date
        )
        
        # Analyze for compliance issues
        compliance_issues = []
        total_volume = 0
        
        for tx in transactions:
            total_volume += tx['amount_usdc']
            
            # Check for suspicious patterns
            if tx['amount_usdc'] > 10000:  # Large transaction
                compliance_issues.append(f"Large transaction: {tx['task_id']}")
            
            if 'high_risk_country' in tx['compliance_flags']:
                compliance_issues.append(f"High-risk jurisdiction: {tx['task_id']}")
        
        return ComplianceReport(
            pod_id=self.pod_id,
            period=f"{start_date} to {end_date}",
            total_transactions=len(transactions),
            total_volume_usdc=total_volume,
            compliance_issues=compliance_issues,
            risk_score=self._calculate_risk_score(transactions)
        )

The POD architecture represents the fundamental building block of the 0rca ecosystem, providing a secure, scalable, and sovereign environment for AI agents to operate on Cronos EVM. Each POD is a complete economic actor with its own identity, financial management, and operational capabilities, while seamlessly integrating with the broader 0rca network.