Interfaces - Api Handler

La interacción entre los Listeners y la API de administración de Malon se hace de manera desacoplada a través de una interfaz REST

Clases abstractas

El componente principal a la hora de definir las acciones a tomar ante un mensaje de un agente es el Handler:

class Handler(ABC):
    @abstractmethod
    def handle_msg(self, msg: bytes):
        """
        En cada una de las requests que realiza el agente, se invoca este metodo.
        Es necesario en este lugar decidir el tipo de mensaje que se esta recibiendo
        para definir si es necesario consultar por tareas nuevas que deba realizar el agente
        o registrar el resultado
        """
        pass

Debido a que múltiples agentes pueden realizar conexiones en simultaneo con un listener, definimos un método abstracto para poder encapsular la logica que se encarga de discriminarlos.

class AuthenticatedHandler(ABC):
    @abstractmethod
    def handle_auth_msg(self, msg: bytes, client_id: str):
        """
        Posee la  misma logica correspondiente al manejo de mensajes, pero con la 
        funcionalidad adicional de poder identificar al agente que se encuentra
        interactuando con el listener
        """
        pass

Implementación de API Handler

Utilizaremos el siguiente API Handler para interactuar con la API Rest, la cual requiere identificar a que agente pertenece la interacción (client_id)

class ApiHandler(AuthenticatedHandler):
    def __init__(self, base_url: str):
        self._base_url = base_url
    
    def _report_agent(self, client_id: str):
        requests.post('{}/agents/{}/report'.format(self._base_url, client_id))

    def _handle_get_tasks_msg(self, client_id: str, _msg) -> bytes:
        response = requests.get('{}/agents/{}/tasks/unread/'.format(self._base_url, client_id))
        tasks = response.json() if response.ok else []
        return json.dumps({
            'task_list_msg': {
                'tasks': tasks
            }
        }).encode('utf-8')

    def _handle_task_results_msg(self, client_id: str, msg) -> bytes:
        for result in msg['results']:
            task_id = result['task_id']
            response = requests.post('{}/agents/{}/tasks/{}/result'.format(
                self._base_url,
                client_id,
                task_id
            ), json=result)
        return json.dumps({
            'status_msg': {
                'success': True
            }
        }).encode('utf-8')

    def handle_auth_msg(self, msg: bytes, client_id: str) -> bytes:
        self._report_agent(client_id)
        agent_msg = json.loads(msg.decode('utf-8'))
        if agent_msg.get('get_tasks_msg') is not None:
            return self._handle_get_tasks_msg(client_id, agent_msg['get_tasks_msg'])
        elif agent_msg.get('task_results_msg') is not None:
            return self._handle_task_results_msg(client_id, agent_msg['task_results_msg'])
        else:
            raise RuntimeError('Unexpected message type')

Para la instanciaciónde Listeners es recomendada la utilización de Handler de manera similar a como se muestra a continuación

class CreativeListener(Listener):
    def __init__(self, api_url: str, host: str, port: int):
        handler = ApiHandler(api_url)
        handler = NewHandler(handler)

Última actualización