| from __future__ import annotations |
| from copy import deepcopy |
| from typing import Any, Dict |
|
|
| import hydra |
| from langchain.tools import BaseTool |
|
|
| from aiflows.base_flows import AtomicFlow |
| from aiflows.messages import FlowMessage |
|
|
|
|
| class LCToolFlow(AtomicFlow): |
| r""" A flow that runs a tool using langchain. For example, a tool could be to excute a query with the duckduckgo search engine. |
| |
| *Configuration Parameters*: |
| |
| - `name` (str): The name of the flow. Default: "search" |
| - `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
| Default: "useful when you need to look for the answer online, especially for recent events." |
| - `keep_raw_response` (bool): If True, the raw response of the tool is kept. Default: False |
| - `clear_flow_namespase_on_run_end` (bool): If True, the flow namespace is cleared at the end of the run. Default: False |
| - `backend` (Dict[str, Any]): The configuration of the backend. Default: langchain.tools.DuckDuckGoSearchRun |
| - Other parameters are inherited from the default configuration of AtomicFlow (see AtomicFlow) |
| |
| *Input Interface*: |
| |
| - `query` (str): the query to run the tool on |
| |
| *Output Interface*: |
| |
| - `observation` (str): the observation returned by the tool |
| |
| :param backend: The backend of the flow. It is a tool that is run by the flow. (e.g. duckduckgo search engine) |
| :type backend: BaseTool |
| :param \**kwargs: Additional arguments to pass to the flow. See :class:`aiflows.base_flows.AtomicFlow` for more details. |
| """ |
| |
| REQUIRED_KEYS_CONFIG = ["backend"] |
|
|
| SUPPORTS_CACHING: bool = False |
|
|
| backend: BaseTool |
|
|
| def __init__(self, backend: BaseTool, **kwargs) -> None: |
| super().__init__(**kwargs) |
| self.backend = backend |
| |
| @classmethod |
| def _set_up_backend(cls, config: Dict[str, Any]) -> BaseTool: |
| """ This method sets up the backend of the flow. |
| |
| :param config: The configuration of the backend. |
| :type config: Dict[str, Any] |
| :return: The backend of the flow. |
| """ |
| if config["_target_"].startswith("."): |
| |
| |
| |
| cls_parent_module = ".".join(cls.__module__.split(".")[:-1]) |
| config["_target_"] = cls_parent_module + config["_target_"] |
| tool = hydra.utils.instantiate(config, _convert_="partial") |
|
|
| return tool |
|
|
| @classmethod |
| def instantiate_from_config(cls, config: Dict[str, Any]) -> LCToolFlow: |
| """ This method instantiates the flow from a configuration file |
| |
| :param config: The configuration of the flow. |
| :type config: Dict[str, Any] |
| :return: The instantiated flow. |
| :rtype: LCToolFlow |
| """ |
| flow_config = deepcopy(config) |
|
|
| kwargs = {"flow_config": flow_config} |
|
|
| |
| kwargs["backend"] = cls._set_up_backend(config["backend"]) |
|
|
| |
| return cls(**kwargs) |
|
|
| def run(self, input_message: FlowMessage): |
| """ This method runs the flow. It runs the backend on the input data. |
| |
| :param input_message: The input message of the flow. |
| :type input_message: FlowMessage |
| """ |
| input_data = input_message.data |
| observation = self.backend.run(tool_input=input_data) |
| |
| reply = self.package_output_message( |
| input_message=input_message, |
| response = {"observation": observation} |
| ) |
| |
| self.send_message(reply) |
|
|
|
|