
Turbine

Turbine fetches company financial statement data filed with the SEC via the EDGAR API, as well as historical price data from Yahoo Finance.

Features

  • Configurable: Turbine is fully customizable and configurable. The name and the poller/parser/sender implementations can be configured via a .ini file.
  • Logging: Built-in logging that can be configured via a .ini file. Any exceptions are automatically caught and properly logged.
  • Multithreading: For maximum performance you can define how many pollers, parsers, and senders Turbine should spawn. It automatically optimizes based on the available CPUs.
  • Transaction Handler: Want to move successfully processed files to an archive directory and failed-to-process files to an error folder? No problem. Just implement a custom Transaction Handler to get notified when a poll finishes or fails processing, and handle it as you see fit.
  • Monitoring: Turbine can be configured to listen on a specified port. Using the built-in client, a user can remotely connect to it and run basic queries or stop Turbine.

Architecture

The main workers of the framework are the Poller, Parser and the Sender.

DirectoryWatcher: The DirectoryWatcher watches the input directory for any incoming request.json files. These files contain information about what data Turbine should fetch. A sample request.json file is listed below:

{
    "topic": "concept",
    "resources": [
        {
            "tickers": [
                "AAPL",
                "MSFT"
            ],
            "concepts": [
                {
                    "name": "AccountsPayableCurrent",
                    "year": 2014
                }
            ]
        },
        {
            "tickers": [
                "BIIB"
            ],
            "concepts": [
                {
                    "name": "LongTermDebt",
                    "year": 2014
                }
            ]
        }
    ]
}
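A request like the one above fans out into one poll task per topic, ticker, and concept combination. A minimal sketch of that expansion (function and task shape are illustrative, not Turbine's actual internals):

```python
import json

# Illustrative sketch only: expand a request.json into one poll task per
# (topic, ticker, concept, year) combination. Field names mirror the
# sample request above.
def expand_request(request: dict) -> list[tuple]:
    tasks = []
    for resource in request["resources"]:
        for ticker in resource["tickers"]:
            for concept in resource["concepts"]:
                tasks.append((request["topic"], ticker,
                              concept["name"], concept["year"]))
    return tasks

sample = json.loads("""
{
  "topic": "concept",
  "resources": [
    {"tickers": ["AAPL", "MSFT"],
     "concepts": [{"name": "AccountsPayableCurrent", "year": 2014}]},
    {"tickers": ["BIIB"],
     "concepts": [{"name": "LongTermDebt", "year": 2014}]}
  ]
}
""")

tasks = expand_request(sample)  # one task each for AAPL, MSFT, and BIIB
```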

Poller: Poller threads connect to the source and poll for data. The source can be a database, file system, Web API, etc. The data is cached on disk as a JSON or CSV file and then passed on to the Parser for further processing. Pollers check whether the requested data is already cached before fetching it from the original source. To force the Pollers to re-fetch the data from the source, erase the cache directory.

Parser: Parsers receive the data files polled by the Pollers. They extract the data and make it available for persistence. To do so, they dynamically load extractors from disk that handle the different data formats. The results are then passed on to the Senders.

Sender: Sender threads are responsible for persisting data into an SQLite database.
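The hand-off between the three worker types can be sketched with standard queues. This single parser/sender pair only illustrates the flow; the real framework spawns configurable thread pools:

```python
import queue
import threading

# Sketch only: one parser and one sender thread connected by FIFO queues.
raw_q = queue.Queue()      # filled by pollers with fetched files
parsed_q = queue.Queue()   # filled by parsers with extracted records
results = []               # stand-in for the SQLite database

def parser_worker():
    while (item := raw_q.get()) is not None:
        parsed_q.put({"payload": item.upper()})  # pretend parsing
    parsed_q.put(None)                           # forward the shutdown sentinel

def sender_worker():
    while (item := parsed_q.get()) is not None:
        results.append(item)                     # pretend persisting

threads = [threading.Thread(target=parser_worker),
           threading.Thread(target=sender_worker)]
for t in threads:
    t.start()
for polled_file in ["aapl.csv", "msft.csv"]:     # the poller side
    raw_q.put(polled_file)
raw_q.put(None)                                  # sentinel: no more work
for t in threads:
    t.join()
```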

System Landscape

Setup IDE

Here is an example of how to set up VSCode for use with this project. You can use any IDE you like.

$ git clone https://github.com/lucaslouca/turbine
$ cd turbine

# Create and activate virtual environment
$ python3 -m venv env
$ source env/bin/activate

Install the required libraries:

(env) $ pip install -r requirements.txt 

Finally open the project in VSCode:

(env) $ code .

Make sure VSCode also opens the project in the correct virtual environment.

Create a new launch configuration. Below is a run configuration to launch Turbine within VSCode:

{
    "version": "0.2.0",
    "configurations": [
        {
            "name": "Turbine",
            "type": "python",
            "request": "launch",
            "program": "${workspaceRoot}/main.py",
            "console": "integratedTerminal",
            "args": [
                "-c",
                "config/connector.ini"
            ]
        }
    ]
}

Configuration

Below is a sample configuration:

[CONNECTOR]
Name=Turbine
Host=127.0.0.1
Port=35813

[DIRECTORY_WATCHER]
Directory=in

[POLLER_concepts]
Class=implementation.poller_concept.PollerConcept
Topic=concept
Args={}
MinThreads=2
MaxThreads=2

[POLLER_price]
Class=implementation.poller_price.PollerPrice
Topic=price
Args={}
MinThreads=2
MaxThreads=2

[PARSER]
Class=implementation.parser.Parser
Args={}
MinThreads=2
MaxThreads=2

[SENDER]
Class=implementation.sender.Sender
Args={"db":"database.db"}
MinThreads=2
MaxThreads=2

# Optional
[TRANSACTION_HANDLER]
Class=implementation.file_transaction_handler.FileTransactionHandler
Args={"error_dir":"out/error"}
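A sketch of how a section like [SENDER] could be read with Python's standard configparser; the JSON-encoded Args and the dotted class path follow the sample above, but the parsing code itself is an assumption:

```python
import configparser
import json

# Illustrative: read one section of the sample configuration.
SAMPLE = """
[SENDER]
Class=implementation.sender.Sender
Args={"db":"database.db"}
MinThreads=2
MaxThreads=2
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE)

section = config["SENDER"]
module_path, _, class_name = section["Class"].rpartition(".")
args = json.loads(section["Args"])           # keyword arguments for the class
max_threads = section.getint("MaxThreads")   # size of the thread pool
# importlib.import_module(module_path) could then resolve class_name
# and instantiate it with **args.
```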

How to run

$ source env/bin/activate
(env) $ python main.py

CONNECTORS
-------------------------------------------------------------
[0] - Turbine
Connector to start> 0
...

To start Turbine in the background, run it using screen:

(env) $ screen -S "turbine" -dm python main.py

You can see that the framework automatically picks up any configuration files found in the config directory and lists them as run options.

Extractors

You can easily create custom extractors by subclassing the AbstractExtractor class and placing the file in the extractors folder. Turbine will pick them up automatically on the next restart.

Important: Your extractor class name must be the same as the file name of the Python file that contains it. For example, if your custom extractor is called YourCustomExtractor, then it must be placed in a Python file called YourCustomExtractor.py under the extractors directory.

Here is an example of an extractor that processes CSV files:

class PriceExtractor(AbstractExtractor):
    @overrides(AbstractExtractor)
    def supports_input(self, request: DataExtractionRequest):
        return request.file_extension and request.file_extension.upper() == ".CSV"

    ...

    @overrides(AbstractExtractor)
    def extract(self, request: DataExtractionRequest):
        self.log(f"Processing '{request.file}'")
        result = []
        rows = self._read_rows_from_file(file=request.file)
        for row in rows:
            try:
                result.append(
                    Price(
                        ticker=Ticker(symbol=request.ticker),
                        date=dt.datetime.strptime(row['Date'], "%Y-%m-%d").date(),
                        open=float(row['Open']),
                        high=float(row['High']),
                        low=float(row['Low']),
                        close=float(row['Close']),
                        adj_close=float(row['Adj Close']),
                        volume=float(row['Volume'])
                    )
                )
            except Exception as e:
                self.log_exception(exception=e)

        return result

In the above example the method supports_input(...) simply tells the framework that this extractor can only be used for files with the .csv extension.

The extract(...) method should return a list of model objects that hold the extracted values.

Notice also that the extractor comes with logging capabilities via self.log(...).
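Taken together, the contract a custom extractor has to fulfil could look roughly like this (a hypothetical sketch; the real AbstractExtractor also provides the logging helpers shown above):

```python
from abc import ABC, abstractmethod

# Hypothetical sketch of the extractor contract, not Turbine's actual code.
class AbstractExtractorSketch(ABC):
    @abstractmethod
    def supports_input(self, request) -> bool:
        """Return True if this extractor can handle the given file."""

    @abstractmethod
    def extract(self, request) -> list:
        """Return a list of model objects holding the extracted values."""

    def log(self, message: str) -> None:
        print(message)  # stand-in for the framework's configured logger
```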

Client

The framework also comes with a monitoring client. You can use it to connect to your running connector and view its status or kill it.

(env) $ python3 client.py 127.0.0.1 -p 35813
=============================================================
Turbine
-------------------------------------------------------------
   Started: 2021-08-28 11:21:24.364306+02:00
    Poller: PollerConcept, PollerPrice - Total threads: (4)
    Parser: Parser - Total threads: (2)
    Sender: Sender - Total threads: (2)
-------------------------------------------------------------
Connector commands:
-------------------------------------------------------------
      stop: stop connector
     stats: show statistics
      head: show the first items in the queues
=============================================================
Client commands:
-------------------------------------------------------------
      quit: quit client
=============================================================

command>stats
         Polled: 8
         Parsed: 8
      Completed: 8
         Errors: 0

command>stop
(env) $ 

Contributors

lucaslouca
