Comments (7)
To generate Langfuse subspans within your custom `AsyncBaseTool` class, you need to integrate the `LlamaIndexCallbackHandler` into the `call` and `acall` methods. Here is an example of how you can modify your `QueryPipelineTool` class to include the necessary logic for creating and managing Langfuse subspans:
```python
from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolMetadata, ToolOutput
from llama_index.llms.openai import OpenAI

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])


class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        # Open a FUNCTION_CALL event so attached handlers (Langfuse)
        # record the tool invocation as a subspan.
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = self.pipeline.run(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None


tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)
agent.stream_chat(query)  # `query` is the user's question string
```
In this example, the `call` and `acall` methods are wrapped with the `Settings.callback_manager.event` context manager, which creates and manages the Langfuse subspans. The `payload` parameter includes the input and output data for the function call, ensuring that the subspans are correctly generated and recorded.
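If the tool spans show up detached from the rest of the agent run, one thing worth trying (an assumption on my part, not something the integration requires) is grouping the whole interaction under a single named trace with `CallbackManager.as_trace`:

```python
# Group everything triggered by this chat (the agent's LLM calls and the
# tool's FUNCTION_CALL subspan) under one trace named "agent_chat".
with Settings.callback_manager.as_trace("agent_chat"):
    agent.stream_chat(query)
```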
To continue talking to Dosu, mention @dosu.
To ensure that the query pipeline spans are also generated and visible in Langfuse, you need to wrap the relevant parts of the query pipeline execution within the `call` and `acall` methods with additional event spans. Here is an updated example that includes spans for both the function call and the query pipeline:
```python
from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolMetadata, ToolOutput
from llama_index.llms.openai import OpenAI

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])


class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            # Nested QUERY event for the pipeline run itself.
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = self.pipeline.run(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            with Settings.callback_manager.event(
                CBEventType.QUERY,
                payload={EventPayload.QUERY: {"query_str": input}},
            ) as query_event:
                pipeline_result = await self.pipeline.arun(query_str=input)
                query_event.on_end(payload={EventPayload.QUERY_RESULT: str(pipeline_result)})
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    @property
    def metadata(self) -> ToolMetadata:
        return ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None


tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)
agent.stream_chat(query)  # `query` is the user's question string
```
In this example, the `call` and `acall` methods now include nested spans for both the function call and the query pipeline. This should ensure that the query pipeline spans are generated and visible in Langfuse alongside the function call span.
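If you want to sanity-check the nesting locally before looking at Langfuse, one option (my suggestion, not part of the Langfuse integration) is to attach llama-index's built-in `LlamaDebugHandler` and replay the two nested events:

```python
from llama_index.core import Settings
from llama_index.core.callbacks import (
    CallbackManager,
    CBEventType,
    EventPayload,
    LlamaDebugHandler,
)

# The debug handler records every event and can print the trace on end.
debug_handler = LlamaDebugHandler(print_trace_on_end=True)
Settings.callback_manager = CallbackManager([debug_handler])

# Replays the nesting used in the tool above: the QUERY event starts and
# ends while the FUNCTION_CALL event is still open.
with Settings.callback_manager.event(CBEventType.FUNCTION_CALL) as outer:
    with Settings.callback_manager.event(CBEventType.QUERY) as inner:
        inner.on_end(payload={EventPayload.RESPONSE: "dummy result"})
    outer.on_end(payload={EventPayload.FUNCTION_OUTPUT: "dummy result"})

# Each entry pairs the start and end callbacks for one recorded event.
print(debug_handler.get_event_pairs(CBEventType.QUERY))
```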
To continue talking to Dosu, mention @dosu.
Hello, @ootkin! I'm here to help you with your bug and answer any questions you may have. I can also assist you in becoming a contributor while we wait for a human maintainer.
The `function_call` does not generate Langfuse subspans for a custom `AsyncBaseTool` class because the `FunctionTool` class lacks the logic to create or manage Langfuse subspans. The `call` and `acall` methods in the `FunctionTool` class do not include any references to Langfuse or subspan creation.
Here is the relevant part of the code showing the absence of Langfuse subspan logic:
```python
def call(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = self._fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )

async def acall(self, *args: Any, **kwargs: Any) -> ToolOutput:
    """Call."""
    tool_output = await self._async_fn(*args, **kwargs)
    return ToolOutput(
        content=str(tool_output),
        tool_name=self.metadata.name,
        raw_input={"args": args, "kwargs": kwargs},
        raw_output=tool_output,
    )
```
To generate Langfuse subspans, you need to add the appropriate logic to create and manage these subspans within the `call` and `acall` methods [1].
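As a rough sketch of what that added logic might look like in `FunctionTool`-style code (the `traced_tool_call` helper and the `echo` function below are hypothetical names, made up for illustration):

```python
from typing import Any, Callable

from llama_index.core import Settings
from llama_index.core.callbacks import CBEventType, EventPayload


def traced_tool_call(fn: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    # Hypothetical helper: open a FUNCTION_CALL event around any tool
    # function so attached handlers (such as Langfuse) see it as a subspan.
    with Settings.callback_manager.event(
        CBEventType.FUNCTION_CALL,
        payload={EventPayload.FUNCTION_CALL: {"args": args, "kwargs": kwargs}},
    ) as event:
        result = fn(*args, **kwargs)
        event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(result)})
    return result


def echo(text: str) -> str:
    # Hypothetical stand-in for a real tool function.
    return text


print(traced_tool_call(echo, "hello"))
```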
To continue talking to Dosu, mention @dosu.
To correct the implementation and improve visibility over your pipeline and the average costs of your requests, you should ensure that the `CallbackManager` is properly utilized to log and trace the events. The provided code already includes some callback management, but it might be missing detailed logging for each step in the pipeline.
Here is a revised version of the `QueryPipelineTool` class with enhanced logging and tracing:
```python
from typing import Any

from langfuse.llama_index import LlamaIndexCallbackHandler
from llama_index.agent.openai import OpenAIAgent
from llama_index.core import Settings
from llama_index.core.callbacks import CallbackManager, CBEventType, EventPayload
from llama_index.core.query_pipeline import QueryPipeline
from llama_index.core.tools import AsyncBaseTool, ToolMetadata, ToolOutput
from llama_index.llms.openai import OpenAI

langfuse_callback_handler = LlamaIndexCallbackHandler()
Settings.callback_manager = CallbackManager([langfuse_callback_handler])


class QueryPipelineTool(AsyncBaseTool):
    pipeline: QueryPipeline

    def call(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = self._run_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    async def acall(self, input: Any) -> ToolOutput:
        with Settings.callback_manager.event(
            CBEventType.FUNCTION_CALL,
            payload={EventPayload.FUNCTION_CALL: {"query_str": input}},
        ) as function_event:
            pipeline_result = await self._arun_pipeline(input)
            function_event.on_end(payload={EventPayload.FUNCTION_OUTPUT: str(pipeline_result)})
        return ToolOutput(
            content=str(pipeline_result),
            tool_name="query_pipeline_tool",
            raw_input={"query_str": input},
            raw_output=pipeline_result,
        )

    def _run_pipeline(self, input: Any) -> Any:
        # Nested QUERY event so the pipeline run shows up as its own subspan.
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = self.pipeline.run(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    async def _arun_pipeline(self, input: Any) -> Any:
        with Settings.callback_manager.event(
            CBEventType.QUERY,
            payload={EventPayload.QUERY_STR: input},
        ) as query_event:
            pipeline_result = await self.pipeline.arun(query_str=input)
            query_event.on_end(payload={EventPayload.RESPONSE: str(pipeline_result)})
        return pipeline_result

    @property
    def metadata(self) -> ToolMetadata:
        # Prefer metadata assigned via the setter, falling back to the default.
        return self._custom_metadata or ToolMetadata(
            name="query_pipeline_tool",
            description="tool description",
        )

    @metadata.setter
    def metadata(self, value: ToolMetadata):
        self._custom_metadata = value

    def __init__(self, pipeline: QueryPipeline):
        self.pipeline = pipeline
        self._custom_metadata = None


tool = QueryPipelineTool(get_pipeline())  # this is a normal QueryPipeline from llamaindex
agent = OpenAIAgent.from_tools(
    tools=[tool],
    llm=OpenAI(model='gpt-4o', temperature=0),
)
agent.stream_chat(query)  # `query` is the user's question string
```
In this example, the `call` and `acall` methods now include nested spans for both the function call and the query pipeline. The `EventPayload.RESPONSE` key is used to log the pipeline result, ensuring that the query pipeline spans are generated and visible in Langfuse alongside the function call span [1].
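One practical note: the Langfuse handler batches events in the background, so spans may not appear immediately, especially from short-lived scripts. A minimal sketch, assuming the `langfuse_callback_handler` created above (`set_trace_params` and `flush` come from the `langfuse` package's LlamaIndex integration):

```python
# Optionally attach identifying metadata to the trace before running the
# agent; the IDs below are placeholders for illustration.
langfuse_callback_handler.set_trace_params(
    user_id="user-123",
    session_id="session-abc",
)

agent.stream_chat(query)

# Force-send any buffered events so the spans show up in the Langfuse UI.
langfuse_callback_handler.flush()
```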
@logan-markewich, this one's a bit out of my wheelhouse, could you step in and provide your guidance?
To continue talking to Dosu, mention @dosu.
@dosu could you please provide an example?
@dosu in this way I don't see the query pipeline spans, just the FUNCTION_CALL span.
@dosu `EventPayload.QUERY_RESULT` does not exist, and the code that you provided does not work.
Can anyone help? We are losing visibility over our pipeline and the average costs of our requests.