A Coding Guide to Building a Scalable Multi-Agent Communication System Using Agent Communication Protocol (ACP)

This post first appeared on MarkTechPost.

May 31, 2025 - 11:00

In this tutorial, we implement the Agent Communication Protocol (ACP) by building a flexible, ACP-compliant messaging system in Python that uses Google’s Gemini API for natural language processing. After installing and configuring the google-generativeai library, the tutorial introduces the core abstractions: message types, performatives, and the ACPMessage data class, which standardizes inter-agent communication. It then defines the ACPAgent and ACPMessageBroker classes to show how structured messages are created, sent, routed, and processed among multiple autonomous agents. Through code examples, readers learn to implement querying, action requests, and information broadcasts while maintaining conversation threads, acknowledgments, and error handling.

import google.generativeai as genai
import json
import time
import uuid
from enum import Enum
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, asdict


GEMINI_API_KEY = "Use Your Gemini API Key"
genai.configure(api_key=GEMINI_API_KEY)

We import the Python modules the implementation relies on: JSON handling, timing, unique-identifier generation, enumerations, type annotations, and data classes. We then assign a placeholder for the Gemini API key, which you should replace with your own key, and configure the google-generativeai client for subsequent calls to the Gemini language model.
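Hard-coding the key is fine for a quick notebook run, but a shared script would more commonly read it from the environment. The following is a minimal sketch of that idea. The environment variable name GEMINI_API_KEY and the load_api_key helper are assumptions made for illustration and are not part of the tutorial's code.

```python
import os


def load_api_key(env_var: str = "GEMINI_API_KEY") -> str:
    """Return the API key stored in env_var, or raise if it is missing or blank.

    Both the helper and the default variable name are hypothetical.
    """
    key = os.environ.get(env_var, "").strip()
    if not key:
        raise RuntimeError(f"Set the {env_var} environment variable before running")
    return key


# Usage, with the library imported as in the tutorial:
#   genai.configure(api_key=load_api_key())
```

Failing early with a clear error is usually easier to debug than a rejected request later on.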

class ACPMessageType(Enum):
    """Standard ACP message types"""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"

The ACPMessageType enumeration defines the core message categories used in the protocol: requests, responses, informational broadcasts, queries, subscription management (subscribe and unsubscribe), error signaling, and acknowledgments. Because every category is named in one place, agents and the broker can compare against enum members instead of free-form strings, which keeps handling and routing consistent across the system.
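One practical consequence of using an Enum is that a string arriving over the wire can be validated by looking it up by value. The sketch below repeats the enumeration so that it runs on its own. The parse_message_type helper is a hypothetical addition, and the decision to normalize case and whitespace is an assumption rather than something the tutorial specifies.

```python
from enum import Enum
from typing import Optional


class ACPMessageType(Enum):
    """Same members as the tutorial's enumeration."""
    REQUEST = "request"
    RESPONSE = "response"
    INFORM = "inform"
    QUERY = "query"
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    ERROR = "error"
    ACK = "acknowledge"


def parse_message_type(raw: str) -> Optional[ACPMessageType]:
    """Map a wire string to its enum member; return None for unknown values."""
    try:
        # Enum lookup by value raises ValueError when no member matches.
        return ACPMessageType(raw.strip().lower())
    except ValueError:
        return None


print(parse_message_type("Request"))  # ACPMessageType.REQUEST
print(parse_message_type("ping"))     # None
```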

class ACPPerformative(Enum):
    """ACP speech acts (performatives)"""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"

The ACPPerformative enumeration lists the speech acts agents can use under the ACP framework. It maps high-level intentions, such as stating a fact (tell), posing a question (ask), requesting an action (request-action), or negotiating (propose, accept, reject, agree, refuse), onto standardized labels. With this taxonomy, a receiving agent can decide how to respond from the performative alone, before inspecting the payload.
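To make the idea of a contextually appropriate response concrete, the sketch below pairs each opening performative with the answers a stricter system might accept. The EXPECTED_REPLIES table and the is_valid_reply helper are hypothetical. The tutorial's own agent answers every query and request with reply and carries agreed or refused in the payload, so this is one possible refinement, not the author's method.

```python
from enum import Enum
from typing import Dict, FrozenSet


class ACPPerformative(Enum):
    """Same members as the tutorial's enumeration."""
    TELL = "tell"
    ASK = "ask"
    REPLY = "reply"
    REQUEST_ACTION = "request-action"
    AGREE = "agree"
    REFUSE = "refuse"
    PROPOSE = "propose"
    ACCEPT = "accept"
    REJECT = "reject"


# Hypothetical pairing of an opening speech act with its acceptable answers.
EXPECTED_REPLIES: Dict[ACPPerformative, FrozenSet[ACPPerformative]] = {
    ACPPerformative.ASK: frozenset({ACPPerformative.REPLY}),
    ACPPerformative.REQUEST_ACTION: frozenset(
        {ACPPerformative.AGREE, ACPPerformative.REFUSE}
    ),
    ACPPerformative.PROPOSE: frozenset(
        {ACPPerformative.ACCEPT, ACPPerformative.REJECT}
    ),
}


def is_valid_reply(opening: str, reply: str) -> bool:
    """Return True if reply is an allowed answer to opening under the table above."""
    try:
        allowed = EXPECTED_REPLIES.get(ACPPerformative(opening), frozenset())
        return ACPPerformative(reply) in allowed
    except ValueError:
        # Either string is not a known performative.
        return False


print(is_valid_reply("request-action", "refuse"))  # True
print(is_valid_reply("propose", "reply"))          # False
```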

@dataclass
class ACPMessage:
    """Agent Communication Protocol Message Structure"""
    message_id: str
    sender: str
    receiver: str
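    performative: str
    content: Dict[str, Any]
    protocol: str = "ACP-1.0"
    conversation_id: Optional[str] = None  # filled in by __post_init__ when omitted
    reply_to: Optional[str] = None
    language: str = "english"
    encoding: str = "json"
    timestamp: Optional[float] = None  # filled in by __post_init__ when omitted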
   
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = time.time()
        if self.conversation_id is None:
            self.conversation_id = str(uuid.uuid4())
   
    def to_acp_format(self) -> str:
        """Convert to standard ACP message format"""
        acp_msg = {
            "message-id": self.message_id,
            "sender": self.sender,
            "receiver": self.receiver,
            "performative": self.performative,
            "content": self.content,
            "protocol": self.protocol,
            "conversation-id": self.conversation_id,
            "reply-to": self.reply_to,
            "language": self.language,
            "encoding": self.encoding,
            "timestamp": self.timestamp
        }
        return json.dumps(acp_msg, indent=2)
   
    @classmethod
    def from_acp_format(cls, acp_string: str) -> 'ACPMessage':
        """Parse ACP message from string format"""
        data = json.loads(acp_string)
        return cls(
            message_id=data["message-id"],
            sender=data["sender"],
            receiver=data["receiver"],
            performative=data["performative"],
            content=data["content"],
            protocol=data.get("protocol", "ACP-1.0"),
            conversation_id=data.get("conversation-id"),
            reply_to=data.get("reply-to"),
            language=data.get("language", "english"),
            encoding=data.get("encoding", "json"),
            timestamp=data.get("timestamp", time.time())
        )

The ACPMessage data class encapsulates all the fields required for a structured ACP exchange, including identifiers, participants, performative, payload, and metadata such as protocol version, language, and timestamp. Its __post_init__ method fills in a missing timestamp with the current time and a missing conversation_id with a fresh UUID, so every message is time-stamped and belongs to a conversation thread. The utility methods to_acp_format and from_acp_format serialize to and parse from the JSON representation, which uses hyphenated keys such as message-id and reply-to.
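Note that from_acp_format reads five keys with data[...] and the rest with data.get(...), so a payload missing any of those five raises a KeyError. A small pre-check can report the problem more clearly. This is a minimal sketch: the missing_fields helper and the REQUIRED_FIELDS name are illustrative additions, while the key names themselves come from the class above.

```python
import json
from typing import Any, Dict, List

# Keys that from_acp_format reads with data[...] and therefore must be present.
REQUIRED_FIELDS = ("message-id", "sender", "receiver", "performative", "content")


def missing_fields(acp_string: str) -> List[str]:
    """Return the required keys absent from a JSON-encoded ACP message."""
    data: Dict[str, Any] = json.loads(acp_string)
    return [field for field in REQUIRED_FIELDS if field not in data]


wire = json.dumps({
    "message-id": "m-1",
    "sender": "agent-a",
    "receiver": "agent-b",
    "performative": "ask",
    "content": {"question": "Is the cache warm?"},
})

print(missing_fields(wire))                     # []
print(missing_fields('{"sender": "agent-a"}'))  # ['message-id', 'receiver', 'performative', 'content']
```

Optional keys such as conversation-id and timestamp are deliberately not checked, because the data class supplies defaults for them.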

class ACPAgent:
    """Agent implementing Agent Communication Protocol"""
   
    def __init__(self, agent_id: str, name: str, capabilities: List[str]):
        self.agent_id = agent_id
        self.name = name
        self.capabilities = capabilities
        self.model = genai.GenerativeModel("gemini-1.5-flash")
        self.message_queue: List[ACPMessage] = []
        self.subscriptions: Dict[str, List[str]] = {}  
        self.conversations: Dict[str, List[ACPMessage]] = {}
   
    def create_message(self, receiver: str, performative: str,
                      content: Dict[str, Any], conversation_id: Optional[str] = None,
                      reply_to: Optional[str] = None) -> ACPMessage:
        """Create a new ACP-compliant message"""
        return ACPMessage(
            message_id=str(uuid.uuid4()),
            sender=self.agent_id,
            receiver=receiver,
            performative=performative,
            content=content,
            conversation_id=conversation_id,
            reply_to=reply_to
        )
   
    def send_inform(self, receiver: str, fact: str, data: Any = None) -> ACPMessage:
        """Send an INFORM message (telling someone a fact)"""
        content = {"fact": fact, "data": data}
        return self.create_message(receiver, ACPPerformative.TELL.value, content)
   
    def send_query(self, receiver: str, question: str, query_type: str = "yes-no") -> ACPMessage:
        """Send a QUERY message (asking for information)"""
        content = {"question": question, "query-type": query_type}
        return self.create_message(receiver, ACPPerformative.ASK.value, content)
   
    def send_request(self, receiver: str, action: str, parameters: Optional[Dict[str, Any]] = None) -> ACPMessage:
        """Send a REQUEST message (asking someone to perform an action)"""
        content = {"action": action, "parameters": parameters or {}}
        return self.create_message(receiver, ACPPerformative.REQUEST_ACTION.value, content)
   
    def send_reply(self, original_msg: ACPMessage, response_data: Any) -> ACPMessage:
        """Send a REPLY message in response to another message"""
        content = {"response": response_data, "original-question": original_msg.content}
        return self.create_message(
            original_msg.sender,
            ACPPerformative.REPLY.value,
            content,
            conversation_id=original_msg.conversation_id,
            reply_to=original_msg.message_id
        )
   
    def process_message(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Process incoming ACP message and generate appropriate response"""
        self.message_queue.append(message)
       
        conv_id = message.conversation_id
        if conv_id not in self.conversations:
            self.conversations[conv_id] = []
        self.conversations[conv_id].append(message)
       
        if message.performative == ACPPerformative.ASK.value:
            return self._handle_query(message)
        elif message.performative == ACPPerformative.REQUEST_ACTION.value:
            return self._handle_request(message)
        elif message.performative == ACPPerformative.TELL.value:
            return self._handle_inform(message)
       
        return None
   
    def _handle_query(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming query messages"""
        question = message.content.get("question", "")
       
        prompt = f"As agent {self.name} with capabilities {self.capabilities}, answer: {question}"
        try:
            response = self.model.generate_content(prompt)
            answer = response.text.strip()
        except Exception:  # model or network failure: fall back to a fixed answer
            answer = "Unable to process query at this time"
       
        return self.send_reply(message, {"answer": answer, "confidence": 0.8})
   
    def _handle_request(self, message: ACPMessage) -> ACPMessage:
        """Handle incoming action requests"""
        action = message.content.get("action", "")
        parameters = message.content.get("parameters", {})
       
        # Compare case-insensitively so a capability such as "Weather" still matches
        if any(capability.lower() in action.lower() for capability in self.capabilities):
            result = f"Executing {action} with parameters {parameters}"
            status = "agreed"
        else:
            result = f"Cannot perform {action} - not in my capabilities"
            status = "refused"
       
        return self.send_reply(message, {"status": status, "result": result})
   
    def _handle_inform(self, message: ACPMessage) -> Optional[ACPMessage]:
        """Handle incoming information messages"""
        fact = message.content.get("fact", "")
        print(f"[{self.name}] Received information: {fact}")
       
        ack_content = {"status": "received", "fact": fact}
        return self.create_message(message.sender, ACPMessageType.ACK.value, ack_content,
                                 conversation_id=message.conversation_id)

The ACPAgent class encapsulates an autonomous entity capable of sending, receiving, and processing ACP-compliant messages using Gemini’s language model. It keeps its own message queue, conversation history, and subscription table, and provides helper methods (send_inform, send_query, send_request, send_reply) to construct correctly formatted ACPMessage instances. Incoming messages pass through process_message, which records them and delegates to specialized handlers for queries (answered via Gemini), action requests (checked against the agent’s capabilities), and informational messages (acknowledged); any other performative returns None.
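The delegation step can be tried without an API key. The sketch below is a simplified stand-in, not the tutorial's class: plain dictionaries replace ACPMessage, the Gemini call is replaced by an echo, and a lookup table replaces the if/elif chain. All function names here are hypothetical. It shows how a reply stays in the sender's conversation thread and how an unhandled performative yields None.

```python
from typing import Any, Callable, Dict, Optional

Message = Dict[str, Any]
Handler = Callable[[Message], Optional[Message]]


def make_reply(original: Message, performative: str, content: Dict[str, Any]) -> Message:
    """Build an answer addressed to the original sender, in the same conversation."""
    return {
        "sender": original["receiver"],
        "receiver": original["sender"],
        "performative": performative,
        "content": content,
        "conversation-id": original["conversation-id"],
        "reply-to": original["message-id"],
    }


def handle_ask(msg: Message) -> Message:
    # Stand-in for the model call: echo the question instead of answering it.
    return make_reply(msg, "reply", {"answer": f"echo: {msg['content']['question']}"})


def handle_tell(msg: Message) -> Message:
    return make_reply(msg, "acknowledge", {"status": "received"})


# Performative string -> handler, mirroring the branches in process_message.
HANDLERS: Dict[str, Handler] = {"ask": handle_ask, "tell": handle_tell}


def process(msg: Message) -> Optional[Message]:
    """Look up the handler for the performative; unknown performatives yield None."""
    handler = HANDLERS.get(msg["performative"])
    return handler(msg) if handler else None


question = {
    "message-id": "m-1",
    "sender": "agent-a",
    "receiver": "agent-b",
    "performative": "ask",
    "content": {"question": "ready?"},
    "conversation-id": "c-1",
}
answer = process(question)
print(answer["conversation-id"], answer["reply-to"])  # c-1 m-1
```

A table of handlers makes it easy to register support for another performative without editing the dispatch function.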

class ACPMessageBroker:
    """Message broker implementing ACP routing and delivery"""
   
    def __init__(self):
        self.agents: Dict[str, ACPAgent] = {}
        self.message_log: List[ACPMessage] = []
        self.routing_table: Dict[str, str] = {}  
   
    def register_agent(self, agent: ACPAgent):
        """Register an agent with the message broker"""
        self.agents[agent.agent_id] = agent
        self.routing_table[agent.agent_id] = "local"
        print(f"✓ Registered agent: {agent.name} ({agent.agent_id})")
   
    def route_message(self, message: ACPMessage) -> bool:
        """Route ACP message to appropriate recipient"""
        if message.receiver not in self.agents:
            print(f"✗ Receiver {message.receiver} not found")
            return False
       
        ...  # the rest of route_message is cut off in the source