Comments (7)

dosubot commented on July 17, 2024

To generate Langfuse subspans within your custom AsyncBaseTool class, you need to integrate the LlamaIndexCallbackHandler into the call and acall methods. Here is an example of how you can modify your QueryPipelineTool class to include the necessary logic for creating and managing Langfuse subspans:

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI
from typing import Any

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = self.pipeline.run(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    @property
    def metadata(self) -> ToolMetadata:
        # Fall back to a default when no custom metadata has been set.
        return self._custom_metadata or ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods are wrapped with the Settings.callback_manager.event context manager, which creates and manages the Langfuse subspans. The payload parameter includes the input and output data for the function call, ensuring that the subspans are correctly generated and recorded.
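The span pattern above can be sketched with a self-contained toy that needs neither llama_index nor Langfuse (the `event` helper and `_Event` class below are stand-ins for `CallbackManager.event` and the object it yields): a context manager opens a span on entry, lets the body attach an end payload via `on_end`, and records the finished span on exit.

```python
from contextlib import contextmanager

spans = []  # recorded (event_type, start_payload, end_payload) tuples

class _Event:
    """Minimal stand-in for the event object yielded by CallbackManager.event."""
    def __init__(self, event_type, payload):
        self.event_type = event_type
        self.start_payload = payload
        self.end_payload = None

    def on_end(self, payload=None):
        self.end_payload = payload

@contextmanager
def event(event_type, payload=None):
    # Open a span, yield it so the body can call on_end, record it on exit.
    ev = _Event(event_type, payload)
    try:
        yield ev
    finally:
        spans.append((ev.event_type, ev.start_payload, ev.end_payload))

with event("FUNCTION_CALL", {"query_str": "what is 2+2?"}) as ev:
    result = "4"
    ev.on_end({"output": result})

print(spans)  # one FUNCTION_CALL span carrying both payloads
```

The `finally` block mirrors why the real context manager is useful: the span is closed and recorded even if the tool body raises.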

To continue talking to Dosu, mention @dosu.

from llama_index.

dosubot commented on July 17, 2024

To ensure that the query pipeline spans are also generated and visible in Langfuse, you need to wrap the relevant parts of the query pipeline execution within the call and acall methods with additional event spans. Here is an updated example that includes spans for both the function call and the query pipeline:

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI
from typing import Any

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = self.pipeline.run(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = await self.pipeline.arun(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    @property
    def metadata(self) -> ToolMetadata:
        # Fall back to a default when no custom metadata has been set.
        return self._custom_metadata or ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods now include nested spans for both the function call and the query pipeline. This should ensure that the query pipeline spans are generated and visible in Langfuse alongside the function call span.

To continue talking to Dosu, mention @dosu.

dosubot commented on July 17, 2024

Hello, @ootkin! I'm here to help you with your bug and answer any questions you may have. I can also assist you in becoming a contributor while we wait for a human maintainer.

The function_call does not generate Langfuse subspans for a custom AsyncBaseTool class because the FunctionTool class lacks the logic to create or manage Langfuse subspans. The call and acall methods in the FunctionTool class do not include any references to Langfuse or subspan creation.

Here is the relevant part of the code showing the absence of Langfuse subspan logic:

def call(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = self._fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )

async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = await self._async_fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )

To generate Langfuse subspans, you need to add the appropriate logic to create and manage these subspans within the call and acall methods [1].
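One generic way to retrofit that logic, sketched here without any llama_index dependency (the `record_span` decorator and the `spans` list are hypothetical, not part of any library), is to wrap the tool's underlying function so every invocation records a span around the call:

```python
import functools

spans = []  # each entry: (event_type, input payload, stringified output)

def record_span(event_type):
    """Hypothetical decorator: record a span for every call to the wrapped fn."""
    def wrap(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            out = fn(*args, **kwargs)
            spans.append((event_type, {"args": args, "kwargs": kwargs}, str(out)))
            return out
        return inner
    return wrap

@record_span("FUNCTION_CALL")
def multiply(a, b):
    return a * b

multiply(6, 7)
print(spans[0][0], spans[0][2])  # FUNCTION_CALL 42
```

In the real integration the recording side would be the callback manager's event context rather than a list append, but the wrapping shape is the same.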

To continue talking to Dosu, mention @dosu.

dosubot commented on July 17, 2024

To correct the implementation and improve visibility over your pipeline and the average costs of your requests, you should ensure that the CallbackManager is properly utilized to log and trace the events. The provided code already includes some callback management, but it might be missing detailed logging for each step in the pipeline.

Here is a revised version of the QueryPipelineTool class with enhanced logging and tracing:

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolOutput, ToolMetadata
from llama_index.llms.openai import OpenAI
from typing import Any

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])

class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = self._run_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = await self._arun_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result
        )

    def _run_pipeline(self, input: Any) -> Any:
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = self.pipeline.run(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    async def _arun_pipeline(self, input: Any) -> Any:
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    @property
    def metadata(self) -> ToolMetadata:
        # Fall back to a default when no custom metadata has been set.
        return self._custom_metadata or ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None

tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)

agent.stream_chat(query)

In this example, the call and acall methods now include nested spans for both the function call and the query pipeline. The EventPayload.RESPONSE key is used to log the pipeline result, ensuring that the query pipeline spans are generated and visible in Langfuse alongside the function call span [1].
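Payload key names can also be checked defensively before use. The sketch below uses a hypothetical stand-in enum (the real `EventPayload` lives in `llama_index.core.callbacks`, and the member names here only mirror the ones used in this thread) to show how to verify that a member exists:

```python
from enum import Enum

# Hypothetical stand-in for llama_index's EventPayload enum; only the
# members discussed in this thread are included.
class EventPayload(str, Enum):
    QUERY_STR = "query_str"
    RESPONSE = "response"
    FUNCTION_CALL = "function_call"
    FUNCTION_OUTPUT = "function_output"

def has_member(name: str) -> bool:
    # Guard against typos such as QUERY_RESULT before wiring up a payload.
    return name in EventPayload.__members__

print(has_member("RESPONSE"), has_member("QUERY_RESULT"))  # True False
```

Running the same membership check against the installed library's enum would catch a nonexistent key at startup rather than mid-trace.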

@logan-markewich, this one's a bit out of my wheelhouse, could you step in and provide your guidance?

To continue talking to Dosu, mention @dosu.

ootkin commented on July 17, 2024

@dosu could you please provide an example?

ootkin commented on July 17, 2024

@dosu in this way I don't see the query pipeline spans, just the FUNCTION_CALL span.

ootkin commented on July 17, 2024

@dosu EventPayload.QUERY_RESULT does not exist.

And the code that you provided does not work.

Can anyone help? We are losing visibility over our pipeline and the average costs of our requests.
