langstream / langstream

363.0 10.0 26.0 120.75 MB

LangStream. Event-Driven Developer Platform for Building and Running LLM AI Apps. Powered by Kubernetes and Kafka.

Home Page: https://langstream.ai

License: Apache License 2.0

Java 91.90% Shell 0.93% Python 6.59% Dockerfile 0.13% HTML 0.45%
genai gpt-4 kafka kafka-connect kubernetes llm vectordb huggingface langchain llamaindex

langstream's Introduction

LangStream

Check out our website.

Have a question? Join our community on Slack or Linen!

For the complete documentation, go here.

Get the LangStream VS Code extension here.

CLI

Warning: The CLI requires Java 11+ to be installed on your machine.

Installation

There are multiple ways to install the CLI.

  • MacOS:

    • Homebrew
    brew install LangStream/langstream/langstream
    
    • Binary with curl
    curl -Ls "https://raw.githubusercontent.com/LangStream/langstream/main/bin/get-cli.sh" | bash
    
  • Unix:

    • Binary with curl
    curl -Ls "https://raw.githubusercontent.com/LangStream/langstream/main/bin/get-cli.sh" | bash
    

Verify the binary is available:

langstream -V

Refer to the CLI documentation to learn more.

Try the sample application

Run the sample Chat Completions application on-the-fly:

export OPEN_AI_ACCESS_KEY=your-key-here
langstream docker run test \
   -app https://github.com/LangStream/langstream/blob/main/examples/applications/openai-completions \
   -s https://github.com/LangStream/langstream/blob/main/examples/secrets/secrets.yaml

In a different terminal window:

langstream gateway chat test -cg consume-output -pg produce-input -p sessionId=$(uuidgen)

See more sample applications in the examples folder.

Create your own application

To create your own application, refer to the developer documentation.

Run LangStream on Kubernetes

LangStream is production-ready, and deploying it on a Kubernetes cluster is highly recommended. The following Kubernetes distributions are supported:

  • Amazon EKS
  • Azure AKS
  • Google GKE
  • Minikube

To run a LangStream cluster, you need the following external components:

  • Apache Kafka or Apache Pulsar cluster
  • S3 API-compatible storage or Azure Blob Storage (Amazon S3, Google Cloud Storage, Azure Blob Storage, MinIO)

Production-ready deployment

To install LangStream, you can use the langstream Helm chart:

helm repo add langstream https://langstream.ai/charts
helm repo update

Then create the values file. At this point you already need the storage service to be up and running.

In case you're using S3, you can use the following values:

codeStorage:
  type: s3
  configuration:
    access-key: <aws-access-key>
    secret-key: <aws-secret-key>

For Azure:

codeStorage:
  type: azure
  configuration:
    endpoint: https://<storage-account>.blob.core.windows.net
    container: langstream
    storage-account-name: <storage-account>
    storage-account-key: <storage-account-key>

Now install LangStream with it:

helm install -n langstream --create-namespace langstream langstream/langstream --values values.yaml
kubectl wait -n langstream deployment/langstream-control-plane --for condition=available --timeout=300s

Local deployment

To create a local LangStream cluster, it's recommended to use minikube. mini-langstream helps you install and manage your local cluster.

To install mini-langstream:

  • MacOS:
brew install LangStream/langstream/mini-langstream
  • Unix:
curl -Ls "https://raw.githubusercontent.com/LangStream/langstream/main/mini-langstream/get-mini-langstream.sh" | bash

Then start up the cluster:

mini-langstream start

Deploy an application:

export OPEN_AI_ACCESS_KEY=<your-openai-api-key>
mini-langstream cli apps deploy my-app -app https://github.com/LangStream/langstream/tree/main/examples/applications/openai-completions -s https://github.com/LangStream/langstream/blob/main/examples/secrets/secrets.yaml

To stop the cluster:

mini-langstream delete

Refer to the mini-langstream documentation to learn more.

Development

Requirements for building the project:

  • Docker
  • Java 17
  • Git
  • Python 3.11+ and PIP

If you want to test local code changes, you can use mini-langstream.

mini-langstream dev start

This command will build the images in the minikube context and install all the LangStream services with the snapshot image.

Once the cluster is running, if you want to build and load a new version of a specific service, you can run:

mini-langstream dev build <service>

Or, for all the services:

mini-langstream dev build

langstream's People

Contributors

cbornet, cdbartholomew, dave2wave, dependabot[bot], devinbost, dlg99, dobosz, eolivelli, gagravarr, mark878, mendonk, mfortman11, nicoloboschi

langstream's Issues

Webcrawler agent enhancements

Currently, the web crawler agent uses Jsoup to:

  • Connect to page URL
  • Get all href's on the page and add to a crawl queue
  • Get the current page's HTML (document.html())
  • Create a document from that HTML

Ideally, the agent could offer more features like:

  • Scrape the screen by rendering the page's HTML and create a Document from the text().
    • As a screen scrape it is assumed that no media (img, video, etc) will be included, just simple text.
  • Offer a filter by node id or node class
  • Offer the option to provide an abstract Jsoup processing function
  • Add select page head meta values to the Document header

Example screen scrape starting at a certain node id and including a few head meta values
(so only the document's page content is retrieved, not the navigation hrefs that repeat on every page):

  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: "https://docs.langstream.ai/"
      allowed-domains: "https://docs.langstream.ai"
      ...
      asHtml: false
      asText: true
      parent-tag-id: "document-main"
      include-head-meta-names: "description, og:title, article:author, og:image"
  • asHtml could be the default
  • If parent-tag-id is not provided then grab the page .body()
  • Each value will be added as name/value header with the Document

Example custom function:

  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: "https://docs.langstream.ai/"
      allowed-domains: "https://docs.langstream.ai"
      ...
      page-render-function: |-
          private String renderPage(org.jsoup.nodes.Document doc){
              Element ele = doc.body();

              // could do all kinds of weird page navigation

              return ele.text();
          }

S3 code storage: keys authentication

Implement accessKeyID and accessKeySecret in the s3 code storage.
It would be nice to also be able to load the accessKeySecret from an existing secret.

Make secrets optional when updating application

Sometimes you want to update an existing application but don't have the original secret values. Currently, an update that requires all manifests uses the PUT verb. Could the UPDATE verb be used to represent an update to certain parts of an application?

To specify that the secrets be removed using the UPDATE verb, provide a blank secrets file. Then all existing secrets would be removed and no new secrets would be created.

To specify that the secrets should remain unchanged while all other areas (pipeline, configuration, instance) are updated, don't provide any secrets file.

With this, you could create a separation of roles, where the platform operator creates the tenant, a security person "inits" an example application with all the needed secrets, and a developer updates it with the intended function. The security person could provide a schema of secrets for the developer to consume (without ever seeing the actual values).
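
The proposed rules can be sketched as a small decision function. This is only an illustration of the request, not LangStream code: the class and method names are hypothetical, and the third case (a non-empty secrets file replaces the stored secrets) is an assumption that the issue does not spell out.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the proposed secrets handling on update. */
final class SecretsUpdateSketch {

    /**
     * @param existing secrets currently stored for the application
     * @param provided null when no secrets file was sent, an empty map for a
     *                 blank secrets file, otherwise the new secrets
     * @return the secrets that should be stored after the update
     */
    static Map<String, String> resolve(Map<String, String> existing,
                                       Map<String, String> provided) {
        if (provided == null) {
            // No secrets file: leave the stored secrets untouched.
            return new HashMap<>(existing);
        }
        if (provided.isEmpty()) {
            // Blank secrets file: remove everything and create nothing.
            return Collections.emptyMap();
        }
        // Assumption: a non-empty secrets file replaces the stored secrets.
        return new HashMap<>(provided);
    }
}
```

Using null to mean "no file provided" keeps the sketch short; a real API would more likely model the three cases explicitly.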

Operator: cannot delete an app that has problems during deployment

I have an application that cannot be "deployed": the "sga-runtime-deployer-test-c427f" pod reports an authentication error to Pulsar.

I have deleted the application, but the "sga-runtime-deployer-cleanup-test-6rf92" pod doesn't work because of this error:

Events:
  Type     Reason       Age                From               Message
  ----     ------       ----               ----               -------
  Normal   Scheduled    95s                default-scheduler  Successfully assigned sga-test/sga-runtime-deployer-cleanup-test-6rf92 to minikube
  Warning  FailedMount  32s (x8 over 96s)  kubelet            MountVolume.SetUp failed for volume "app-secrets" : secret "test" not found

Control plane/CLI: get apps secrets

One concern is that we should probably introduce some kind of roles. One common use case is that an admin prepares the empty app with the secrets, and another person with lower permissions uses those secrets without knowing the actual values.
Note that the app logs could show the secret values as well.

Packaging of the app occurs before downloading dependencies

I created a pipeline that requires the C* sink, like this:

  - name: "Write to AstraDB"
    type: "sink"
    input: "text-chunks"
    configuration:
      connector.class: com.datastax.oss.kafka.sink.CassandraSinkConnector
      key.converter: org.apache.kafka.connect.storage.StringConverter
      value.converter: org.apache.kafka.connect.storage.StringConverter
      cloud.secureConnectBundle: "{{{ secrets.cassandra.secure-connect-bundle }}}"
      auth.username: "{{{ secrets.cassandra.username }}}"
      auth.password: "{{{ secrets.cassandra.password }}}"
      topic.chunks-topic.documents.documents.mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"
      name: cassandra-sink

And this:

configuration:
  resources:
  - type: "open-ai-configuration"
    name: "OpenAI Azure configuration"
    configuration:
      url: "{{ secrets.open-ai.url }}"
      access-key: "{{ secrets.open-ai.access-key }}"
      provider: "azure"
  dependencies:
    - name: "Kafka Connect Sink for Apache Cassandra from DataStax"
      url: "https://github.com/datastax/kafka-sink/releases/download/1.5.0/kafka-connect-cassandra-sink-1.5.0.jar"
      sha512sum: "242bf60363d36bd426232451cac836a24ae8d510857372a128f601503ad77aa9eabf14c4f484ca0830b6a68d9e8664e3820739ad8dd3deee2c58e49a94a20a3c"
      type: "java-library"

The first time I deployed the app, the sink went into a crash loop with a ClassNotFound error. I shelled into the pod and noticed that the java/lib directory that should contain the jar for the C* sink was not in the /app-code-download directory.

I deleted the app and redeployed it. The log messages look like this:

chrisbartholomew@cbartholomew-rmbp16 ~/git/streaming-gen-ai -  (main) $  ./bin/sga-cli --conf=conf/cli.yaml apps deploy i-love-ai -app examples/applications/i-love-ai/ -i examples/instances/astra.yaml -s examples/applications/i-love-ai/secrets.yaml
packaging app: /Users/chrisbartholomew/git/streaming-gen-ai/examples/applications/i-love-ai
app packaged
using instance: /Users/chrisbartholomew/git/streaming-gen-ai/examples/instances/astra.yaml
using secrets: /Users/chrisbartholomew/git/streaming-gen-ai/examples/applications/i-love-ai/secrets.yaml
deploying application: i-love-ai (28633 KB)
Dependency: examples/applications/i-love-ai/java/lib/kafka-connect-cassandra-sink-1.5.0.jar at /Users/chrisbartholomew/git/streaming-gen-ai/examples/applications/i-love-ai/java/lib/kafka-connect-cassandra-sink-1.5.0.jar
application i-love-ai deployed

From the logs, it looks like "packaging app" is happening before checking the dependency and downloading it.

The second time I deployed the app, the C* sink worked and the java/lib directory was present.

Split application zip into three: application, instance and secrets

Currently, the expected zip for an application must contain the following structure:

  • instance.yaml
  • secrets.yaml (optional)
  • app files

The logic for building this zip is in the CLI.
It would be better to move this logic into the control plane endpoint so that plain HTTP calls are handled correctly.

The result will be that the applications deploy endpoint will accept 3 different zips:

  1. application code/config zip
  2. instance.yaml zip
  3. secrets.yaml zip

Kafka Connect provides some Netty libraries, not imported into the Cassandra Sink

The DataStax Kafka Connector relies on the presence of some Netty version on the Kafka Connect classpath.

Kafka Connect 3.4.0 brings these Netty classes:

libs/netty-buffer-4.1.78.Final.jar libs/netty-handler-4.1.78.Final.jar libs/netty-transport-classes-epoll-4.1.78.Final.jar
libs/netty-codec-4.1.78.Final.jar libs/netty-resolver-4.1.78.Final.jar libs/netty-transport-native-epoll-4.1.78.Final.jar
libs/netty-common-4.1.78.Final.jar libs/netty-transport-4.1.78.Final.jar libs/netty-transport-native-unix-common-4.1.78.Final.jar



Unfortunately Kafka Connect doesn't provide classpath isolation and so connectors are free to use whatever the environment provides.

As a first step we should make everything work, so we should add the same dependencies on the classpath when we run Kafka Connect stuff.

We should relocate all the internal dependencies of our runtime somehow, but not all (for instance the Kafka Client?)


This is the error I see on the Kafka Connect Sink:

Suppressed: java.lang.NoClassDefFoundError: io/netty/handler/codec/ByteToMessageDecoder
    at com.datastax.oss.driver.internal.core.channel.ChannelFactory$ChannelFactoryInitializer.initChannel(ChannelFactory.java:352)
    at io.netty.channel.ChannelInitializer.initChannel(ChannelInitializer.java:129)
    at io.netty.channel.ChannelInitializer.handlerAdded(ChannelInitializer.java:112)
    at io.netty.channel.AbstractChannelHandlerContext.callHandlerAdded(AbstractChannelHandlerContext.java:1114)
    at io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(DefaultChannelPipeline.java:609)
    at io.netty.channel.DefaultChannelPipeline.access$100(DefaultChannelPipeline.java:46)
    at io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute(DefaultChannelPipeline.java:1463)
    at io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers(DefaultChannelPipeline.java:1115)
    at io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded(DefaultChannelPipeline.java:650)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.register0(AbstractChannel.java:514)
    at io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(AbstractChannel.java:429)
    at io.netty.channel.AbstractChannel$AbstractUnsafe$1.run(AbstractChannel.java:486)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Unknown Source)


[cli] App lists should not show an empty "secret"

When I run
sga apps list

I see this output

% ./bin/sga-cli apps list
{
 "test" : {
  "name" : "test",
  "instance" : {
   "resources" : { },
   "modules" : {
    "module-1" : {
     "id" : "module-1",
     "pipelines" : {
      "pipeline-1" : {
       "id" : "pipeline-1",
       "module" : "module-1",
       "name" : "Generate data on Pulsar",
       "agents" : [ {
        "connectableType" : "agent",
        "connectableType" : "agent",
        "id" : "source-1-id",
        "name" : "source1",
        "type" : "source",
        "input" : null,
        "output" : {
         "endpoint" : {
          "connectableType" : "topic",
          "connectableType" : "topic",
          "name" : "output-topic",
          "keySchema" : null,
          "valueSchema" : null,
          "partitions" : 0,
          "creation-mode" : "create-if-not-exists"
         }
        },
        "configuration" : {
         "sourceType" : "data-generator"
        }
       } ]
      }
     },
     "topics" : {
      "output-topic" : {
       "connectableType" : "topic",
       "connectableType" : "topic",
       "name" : "output-topic",
       "keySchema" : null,
       "valueSchema" : null,
       "partitions" : 0,
       "creation-mode" : "create-if-not-exists"
      }
     }
    }
   },
   "instance" : {
    "streamingCluster" : {
     "type" : "pulsar",
     "configuration" : {
      "admin" : {
       "serviceUrl" : "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com/",
       "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationToken",
       "authParams" : "token: {{ secrets.astra-token.token }}"
      },
      "defaultTenant" : "{{ secrets.astra-token.tenant }}",
      "defaultNamespace" : "{{ secrets.astra-token.namespace }}"
     }
    },
    "computeCluster" : {
     "type" : "pulsar",
     "configuration" : {
      "admin" : {
       "serviceUrl" : "https://pulsar-gcp-useast4.api.dev.streaming.datastax.com/",
       "authPluginClassName" : "org.apache.pulsar.client.impl.auth.AuthenticationToken",
       "authParams" : "token: {{ secrets.astra-token.token }}"
      },
      "defaultTenant" : "{{ secrets.astra-token.tenant }}",
      "defaultNamespace" : "{{ secrets.astra-token.namespace }}"
     }
    },
    "globals" : null
   },
   "secrets" : null
  },
  "status" : null
 }
}

Please note that the application currently has a problem: the deploy job fails with an authentication error.

The point of this issue is that "secrets" shouldn't be shown, or better, we should list only the "names" of the secrets that are present in the application (this will help the UI and debugging).

We need a dedicated API to deal with secrets

Better order the sga helm chart to deploy the pods in the best order

I noticed that when deploying with the helm chart to minikube many of the pods restarted several times. This was due to two reasons.

  1. Minikube was doing too much at once and the health checks failed.
  2. Some pods are dependent on other pods being functional.

The sga-cli and sga-control-plane should be first.

Add "apps init" option to CLI, to stub out a development environment

Add the command sga-cli apps init "my-cool-app-name" that initializes a best practice app structure.

  • mkdir "applications"
  • touch "applications/pipeline.yaml"
  • touch "applications/configuration.yaml"
  • touch "secrets.yaml"
  • touch "instance.yaml"

Obviously, we could go quite far with initialization, such as a questionnaire to build a more meaningful yaml. Initially, the yaml could simply include all required fields with a value of "null".

How to Extract Text Page by Page with PDFBox

I asked ChatGPT:
To extract text from a PDF page by page in Java, you can use the Apache PDFBox library. PDFBox is an open-source Java library that provides various functionalities to work with PDF files. You can use it to extract text from individual pages of a PDF document. Here's a step-by-step guide on how to achieve this:

Step 1: Set up your Java project
Make sure you have a Java development environment set up and create a new Java project. You can use your preferred IDE or a simple text editor like Notepad++.

Step 2: Add Apache PDFBox dependency
Download the latest version of Apache PDFBox JAR from the Apache PDFBox website (https://pdfbox.apache.org/). Add the PDFBox JAR file to your project's classpath.

Step 3: Write Java code to extract text page by page

import java.io.File;
import java.io.IOException;

import org.apache.pdfbox.pdmodel.PDDocument;
import org.apache.pdfbox.text.PDFTextStripper;

public class PDFTextExtractor {

    public static void main(String[] args) {
        String pdfFilePath = "path/to/your/pdf/file.pdf"; // Replace with the actual path to your PDF file

        // PDDocument.load(File) is the PDFBox 2.x API; PDFBox 3.x uses Loader.loadPDF(File) instead
        try (PDDocument document = PDDocument.load(new File(pdfFilePath))) {
            int totalPages = document.getNumberOfPages();
            PDFTextStripper pdfTextStripper = new PDFTextStripper();

            for (int page = 1; page <= totalPages; page++) {
                pdfTextStripper.setStartPage(page);
                pdfTextStripper.setEndPage(page);

                String pageText = pdfTextStripper.getText(document);
                System.out.println("Page " + page + " text:\n" + pageText);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Step 4: Run the code
Replace "path/to/your/pdf/file.pdf" in the pdfFilePath variable with the actual path to the PDF file you want to extract text from. Then, run the Java code, and it will extract the text page by page from the PDF and print it to the console.

Remember to handle exceptions properly in a production environment. In this example, we've used a try-with-resources statement to automatically close the PDDocument after processing, but you can add appropriate exception handling for more robust applications.

Make sure you comply with the licensing terms of the Apache PDFBox library and respect any copyright or usage restrictions for the PDF files you're working with.

Applications have a name length limitation because pod names are at most 63 characters

From @ddieruf

Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Failure executing: POST at: 
https://10.96.0.1:443/apis/batch/v1/namespaces/sga-my-tenant/jobs. Message: Job.batch 
"sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat" is invalid: 
spec.template.labels: Invalid value: "sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat": 
must be no more than 63 characters. Received status: Status(apiVersion=v1, code=422, details=StatusDetails(causes=
[StatusCause(field=spec.template.labels, message=Invalid value: 
"sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat": must be no more than 63 characters, reason=FieldValueInvalid, additionalProperties={}), StatusCause(field=spec.template.labels, 
message=Invalid value: "sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat": must be no more than 63 characters, reason=FieldValueInvalid, additionalProperties={})], group=batch, 
kind=Job, name=sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat, retryAfterSeconds=null, uid=null, additionalProperties={}), kind=Status, message=Job.batch "sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat" is invalid: spec.template.labels: Invalid value: "sga-runtime-deployer-flatten-transformation-using-astra-streaming-example-applicat": must be no more than 63 characters, metadata=ListMeta(_continue=null, remainingItemCount=null, resourceVersion=null, selfLink=null, additionalProperties={}), reason=Invalid, status=Failure, additionalProperties={}).
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:671)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.requestFailure(OperationSupport.java:651)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.assertResponseCode(OperationSupport.java:600)
    at io.fabric8.kubernetes.client.dsl.internal.OperationSupport.lambda$handleResponse$0(OperationSupport.java:560)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at io.fabric8.kubernetes.client.http.StandardHttpClient.lambda$completeOrCancel$10(StandardHttpClient.java:140)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at io.fabric8.kubernetes.client.http.ByteArrayBodyHandler.onBodyDone(ByteArrayBodyHandler.java:52)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
    at io.fabric8.kubernetes.client.vertx.VertxHttpRequest.lambda$null$1(VertxHttpRequest.java:122)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:264)
    at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:246)
    at io.vertx.core.http.impl.HttpEventHandler.handleEnd(HttpEventHandler.java:76)
    at io.vertx.core.http.impl.HttpClientResponseImpl.handleEnd(HttpClientResponseImpl.java:250)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.lambda$new$0(Http1xClientConnection.java:444)
    at io.vertx.core.streams.impl.InboundBuffer.handleEvent(InboundBuffer.java:255)
    at io.vertx.core.streams.impl.InboundBuffer.write(InboundBuffer.java:134)
    at io.vertx.core.http.impl.Http1xClientConnection$StreamImpl.handleEnd(Http1xClientConnection.java:708)
    at io.vertx.core.impl.EventLoopContext.lambda$execute$2(EventLoopContext.java:78)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
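
One way to surface this earlier would be to check the application name before the job is created. The sketch below is a hypothetical helper, not LangStream code. It assumes the job name is the prefix "sga-runtime-deployer-" (taken from the error above) followed by the application name, and relies on the 63-character limit that Kubernetes applies to label values.

```java
/** Hypothetical pre-flight check for application names. */
final class JobNameCheck {

    // Kubernetes rejects label values longer than 63 characters.
    static final int MAX_LABEL_LENGTH = 63;

    // Prefix seen in the failing job name above; assumed to be fixed.
    static final String JOB_PREFIX = "sga-runtime-deployer-";

    /** Longest application name that still fits once the prefix is added. */
    static int maxApplicationNameLength() {
        return MAX_LABEL_LENGTH - JOB_PREFIX.length();
    }

    /** True when the derived job name fits within the label limit. */
    static boolean isValidApplicationName(String appName) {
        return !appName.isEmpty()
                && (JOB_PREFIX + appName).length() <= MAX_LABEL_LENGTH;
    }
}
```

Other generated names with a longer prefix (for example the cleanup job) would lower the usable length further, so a real check should use the longest prefix the platform generates.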

How to Use Lucene Analyzers

Lucene provides a robust set of tools to build search indexes and then find documents. In fact, Jonathan used Lucene's vector similarity as the basis for VectorSearch.

For our use cases we can take advantage of Lucene's rich set of Analyzers and Filters - https://lucene.apache.org/core/9_7_0/core/org/apache/lucene/analysis/package-summary.html

Solr makes use of Lucene and here is Solr's description on how it uses these features: https://solr.apache.org/guide/solr/latest/indexing-guide/document-analysis.html

This page ties back Solr configuration to Lucene and presents the pattern that is similar to what we are doing in SGA. https://solr.apache.org/guide/solr/latest/indexing-guide/analyzers.html

Rate limit S3 Source

Currently, the S3 Source issues HTTP requests to S3 continuously.
We should be able to decrease the frequency of the requests.
We should also handle all files from the list response, not only the first one, to get better throughput.
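
A minimal sketch of the two requested behaviors, with no S3 dependency: the `lister` supplier stands in for a "list objects" call and `idleMillis` is a hypothetical setting for the pause between requests when nothing was found. All names here are illustrative and not part of the existing source.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical polling loop body for a bucket-backed source. */
final class PollingSourceSketch {

    private final Supplier<List<String>> lister; // stands in for an S3 list call
    private final Consumer<String> handler;      // processes one object key
    private final long idleMillis;               // pause when the listing is empty

    PollingSourceSketch(Supplier<List<String>> lister,
                        Consumer<String> handler,
                        long idleMillis) {
        this.lister = lister;
        this.handler = handler;
        this.idleMillis = idleMillis;
    }

    /**
     * Handles every key of one list response, not only the first one.
     *
     * @return how long the caller should wait before the next list request
     */
    long pollOnce() {
        List<String> keys = lister.get();
        for (String key : keys) {
            handler.accept(key);
        }
        // Only back off when there was nothing to do.
        return keys.isEmpty() ? idleMillis : 0L;
    }
}
```

The caller would sleep for the returned duration between calls, which keeps the request rate low on an idle bucket while draining a busy one without delay.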

Step 2 of Deploy Your First Application needs Google Client-id

Step 2 of Deploy Your First Application doesn't work without google client-id because it has an optional gateway step.

./bin/langstream apps deploy openai-completions -app examples/applications/openai-completions -i examples/instances/kafka-kubernetes.yaml -s /tmp/secrets.yaml
./bin/langstream apps get openai-completions
packaging app: /Users/mendon.kissling/Documents/GitHub/langstream/examples/applications/openai-completions
app packaged
deploying application: openai-completions (2 KB)

{
  "type" : "about:blank",
  "title" : "Internal Server Error",
  "status" : 500,
  "detail" : "Missing context for compound variable 'secrets.google.client-id' on line 1. 'google' was not found.",
  "instance" : "/api/applications/default/openai-completions"
}
Request failed: 500

Create a default tenant

To get started with SGA quickly, it would be useful to provide a default tenant out of the box. We can call it "default", "public", or "sga".

Control plane and Api gateway take up to 1 minute to startup

These two services take a long time to bootstrap. This causes the readiness probe to restart the pods very often during upgrades.

Both are Spring Boot apps and both initialize the k8s client during startup.

We should investigate the reason and potentially move some components to lazy initialization, where reasonable.

Handle headers/properties in GenAIToolKitAgent

Currently the TransformContext handles properties as a Map<String, String>. But Record has headers as Collection<(String, Object)>.
So we need to handle:

  • The fact that it's a Collection:
    • we can have multiple values for a given key
    • the order matters
  • The fact that the value is an Object and not a String
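
The constraints above could be met by a container like the following. It is a hypothetical sketch, not the existing TransformContext or Record API: it keeps headers as an ordered list of key/value pairs, allows repeated keys and non-String values, and shows what is lost when flattening to a Map<String, String>.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical ordered, multi-valued header collection. */
final class HeadersSketch {

    private final List<Map.Entry<String, Object>> entries = new ArrayList<>();

    /** Appends a header; the same key may be added more than once. */
    HeadersSketch add(String key, Object value) {
        entries.add(new AbstractMap.SimpleImmutableEntry<>(key, value));
        return this;
    }

    /** All values recorded for a key, in insertion order. */
    List<Object> values(String key) {
        List<Object> result = new ArrayList<>();
        for (Map.Entry<String, Object> entry : entries) {
            if (entry.getKey().equals(key)) {
                result.add(entry.getValue());
            }
        }
        return result;
    }

    /**
     * Lossy view as Map<String, String>: values are stringified and, for a
     * repeated key, only the last value survives.
     */
    Map<String, String> toProperties() {
        Map<String, String> properties = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : entries) {
            properties.put(entry.getKey(), String.valueOf(entry.getValue()));
        }
        return properties;
    }
}
```

The `toProperties` view makes the mismatch concrete: two values for the same key collapse into one and the original value types are gone, which is why a round trip through the map cannot rebuild the original headers.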

Query agent: allow to echo results

When debugging a step function flow, it would be helpful to be able to echo the results of outputs from prior steps to ensure they appear as expected (and have the expected types) at runtime.

CLI usability improvements

### Tasks
- [ ] Generate autocompletion
- [ ] Add enum values in the help
- [ ] Introduce profile
- [ ] Save a common configuration in the user home (~/.sgai/config)
- [ ] Create brew distribution

Support application versioning and rollbacks

The idea is to support application "revisions". For each deploy, a new revision is generated. The user can then roll back to a previous revision.
The behavior is very similar to helm releases.
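
A minimal in-memory sketch of that behavior follows. Names are hypothetical and the application definition is reduced to a String. As with helm, the rollback in this sketch does not rewind history: it redeploys the old definition as a new revision.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical revision history for one application. */
final class RevisionHistorySketch {

    // Index i holds the definition deployed as revision i + 1.
    private final List<String> revisions = new ArrayList<>();

    /** Records a deploy and returns the new revision number (starting at 1). */
    int deploy(String definition) {
        revisions.add(definition);
        return revisions.size();
    }

    /** Redeploys an earlier definition as a new revision. */
    int rollback(int revision) {
        if (revision < 1 || revision > revisions.size()) {
            throw new IllegalArgumentException("Unknown revision: " + revision);
        }
        return deploy(revisions.get(revision - 1));
    }

    /** Definition of the latest revision. */
    String currentDefinition() {
        if (revisions.isEmpty()) {
            throw new IllegalStateException("Nothing deployed yet");
        }
        return revisions.get(revisions.size() - 1);
    }
}
```

Keeping rollbacks as new revisions means the history stays append-only, so a rollback can itself be rolled back.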

Improve code download security

The runtime pod has the S3 credentials loaded in its environment, so it can access the artifacts of other tenants very easily.
I have two possible solutions in mind:

  • use a init container to download the code in a shared volume. The user code runs in another container in the same pod and the code is already there. The user could still have the chance to read the pod definition if it gets access to the pods in that namespace.
  • the pod runtime calls an intermediate service just passing the code artifact id and this service (control plane?) proxy the download to the pod. In this way the s3 credentials never leaves the sga namespace. The problem is that the pod has to make an authenticated call and the authz frameowrk will have to check if that tenant can access that code artifact id

the second one is more secure but very hard to implement now.
the first one is way more secure than the current implementation and it’s quick to implement

Check for bad text-splitter agent config

It would have been nice to be notified about a bad agent config before attempting deployment. The error I got was:

Got a larger chunk overlap (100) than chunk size (3), should be smaller.

I attempted this configuration:

  - name: "Split into chunks"
    type: "text-splitter"
    output: "output-topic" # optional
    configuration:
      splitter_type: "RecursiveCharacterTextSplitter"
      chunk_size: 3
      separators: ["\n\n", "\n", " ", ""]
      keep_separator: false
      chunk_overlap: 100
      length_function: "cl100k_base"
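A pre-deployment check for this case could look roughly like the Python sketch below. `validate_text_splitter` is a hypothetical helper, not an existing LangStream function, and it only covers the two fields involved in the error above. It assumes that an overlap equal to the chunk size is still accepted, since the error message only complains about a larger overlap.

```python
from typing import Any, Dict, List


def validate_text_splitter(configuration: Dict[str, Any]) -> List[str]:
    """Return a list of human-readable problems; an empty list means the config looks sane."""
    errors: List[str] = []
    chunk_size = configuration.get("chunk_size")
    chunk_overlap = configuration.get("chunk_overlap")

    if chunk_size is not None and chunk_size <= 0:
        errors.append(f"chunk_size must be positive, got {chunk_size}")
    if chunk_overlap is not None and chunk_overlap < 0:
        errors.append(f"chunk_overlap must not be negative, got {chunk_overlap}")
    # Only compare the two values when both are set explicitly.
    if chunk_size is not None and chunk_overlap is not None and chunk_overlap > chunk_size:
        errors.append(
            f"chunk_overlap ({chunk_overlap}) is larger than chunk_size ({chunk_size})"
        )
    return errors


# The configuration from the report above, reduced to the relevant fields.
bad_config = {"chunk_size": 3, "chunk_overlap": 100}
for problem in validate_text_splitter(bad_config):
    print(problem)
```

Running such a check in the CLI or the control plane, before any pod is created, would surface the problem at deploy time instead of at runtime.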

Improve deployer jobs timing

After the deploy is completed, the pod still hangs for about half a minute because the k8s client is not closed.

Skip download of dependency if already exists

It appears that the download of a dependency is always done even if the file is already present. Since we have a checksum for the download, we should check that the file exists and it matches the checksum. If it matches, then the download can be skipped. This can save time when iterating on a new pipeline. Having to download a large Kafka connector can be slow.
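The check described above can be sketched in Python as follows. `needs_download` and `file_digest` are hypothetical helpers, and the default of SHA-512 is an assumption; the real implementation should use whatever algorithm the dependency declaration specifies.

```python
import hashlib
from pathlib import Path


def file_digest(path: Path, algorithm: str = "sha512") -> str:
    """Hash the file in 1 MiB blocks so large connectors are not loaded into memory."""
    digest = hashlib.new(algorithm)
    with path.open("rb") as stream:
        for block in iter(lambda: stream.read(1024 * 1024), b""):
            digest.update(block)
    return digest.hexdigest()


def needs_download(path: Path, expected_checksum: str, algorithm: str = "sha512") -> bool:
    """Return True when the file is missing or its checksum does not match."""
    if not path.is_file():
        return True
    return file_digest(path, algorithm) != expected_checksum.lower()
```

A caller would skip the download only when `needs_download(...)` returns `False`. A file that was truncated by an interrupted download fails the checksum comparison and is fetched again.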

Split runtime images in Java and Python

Each Java agent runs in a Docker image that includes all the Python dependencies, which are useless to it at runtime. The main driver for this change is the image size.
The Python dependencies alone take more than 1 GB out of a total of about 2 GB.

Application upsert/apply

Currently:
deploy -> only create
update -> only update

We could add an apply mode which is basically an upsert

deploy -> create or update
create -> only create
update -> only update

The advantage of having the “apply” command is that you don’t have to care whether your app already exists.
The advantage of having the “create/update” commands is that you know exactly what you’re doing.

Helm has "install", but it won't update an existing release. It has "upgrade" with the "-i" flag if you want to install when the release does not exist.
Kubectl has "apply" because you declare the state you want.
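The difference between the three modes can be sketched in Python with an in-memory dictionary standing in for the control plane. The function names and the returned strings are hypothetical and only illustrate the semantics discussed above, not the CLI's actual behavior.

```python
from typing import Any, Dict

Apps = Dict[str, Dict[str, Any]]


def create(apps: Apps, name: str, spec: Dict[str, Any]) -> None:
    """Only create: fail if the application already exists."""
    if name in apps:
        raise ValueError(f"application {name} already exists")
    apps[name] = spec


def update(apps: Apps, name: str, spec: Dict[str, Any]) -> None:
    """Only update: fail if the application does not exist."""
    if name not in apps:
        raise ValueError(f"application {name} not found")
    apps[name] = spec


def apply(apps: Apps, name: str, spec: Dict[str, Any]) -> str:
    """Upsert: create when missing, update otherwise, and report which one happened."""
    if name in apps:
        update(apps, name, spec)
        return "updated"
    create(apps, name, spec)
    return "created"


apps: Apps = {}
print(apply(apps, "openai-completions", {"version": 1}))
print(apply(apps, "openai-completions", {"version": 2}))
```

Reporting whether the call created or updated the application keeps some of the explicitness of the separate commands while still letting scripts run the same command repeatedly.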

Add an info endpoint on control plane

Can the control plane have an additional endpoint maybe at /info where it provides details about the installation?

  • Versions of the services
  • Number of replicas
  • Status

Better error when no app is found

I had configured the cli to use tenant-1 but was working on an app in tenant-2. When I ran the following:

./bin/sga-cli gateway produce text-processing simple-produce --value "Hi there"

I got:

SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.
The HTTP response from the server [500] did not permit the HTTP upgrade to WebSocket

When I looked at the gateway logs I discovered that:

java.lang.IllegalArgumentException: application text-processing not found

This happened because I had the wrong tenant set. It would have helped to receive a clearer message from the CLI.
