Comments (17)

TomSteenbergen commented on August 26, 2024

Just ran the materialization with the same feature set, this time with DEFAULT_BATCH_SIZE changed from 10,000 to 1,000,000 and the batch size of the Postgres online store kept at 5000, but it doesn't change anything: the maximum batch size returned by pyarrow.Table.to_batches that I observed is 2890, still well below 10,000.

tokoko commented on August 26, 2024

Performance comparison sounds good. Both use the COPY command under the hood, so I wouldn't expect too much of a difference, but stranger things have happened :). adbc might have the upper hand in user-friendliness, since the input is just Arrow tables; I'm not sure how easy it will be to prepare the input for the psycopg COPY command.

job-almekinders commented on August 26, 2024

Thanks for your input! We will probably give it a spin sometime soon, so let's see what the results are :)

TomSteenbergen commented on August 26, 2024

So unfortunately COPY (even using adbc) is not faster than the current execute_values of psycopg2.

COPY excels when you are inserting a large amount of data at once. However, the materialize method calls the _materialize_one method of the LocalMaterializationJob class, which loops over many smaller batches (source) using for batch in table.to_batches(DEFAULT_BATCH_SIZE):. DEFAULT_BATCH_SIZE is set to 10,000, but the to_batches method on a pyarrow.Table only uses that as a maximum chunk size. The docs note the following:

Maximum number of rows for each RecordBatch chunk. Individual chunks may be smaller depending on the chunk layout of individual columns.

This is indeed what happens (at least with my dataset and Postgres database). The chunks are oftentimes a lot smaller than 10,000.
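A quick way to see this behavior (a minimal sketch; the chunk sizes are made up to mirror the 2890-row batches observed above):

```python
import pyarrow as pa

# Mimic a table whose columns consist of many small chunks, as often
# happens with query results coming out of an offline store.
chunks = [pa.table({"x": list(range(2_890))}) for _ in range(10)]
table = pa.concat_tables(chunks)

# max_chunksize is only an upper bound: to_batches splits chunks that
# are too large but never merges small ones, so every RecordBatch here
# stays at 2,890 rows even though we asked for up to 10,000.
for batch in table.to_batches(max_chunksize=10_000):
    print(batch.num_rows)  # prints 2890 ten times
```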

So the online_write_batch method on the PostgreSQLOnlineStore class is called many times during a single materialization. Changing to COPY here is therefore not beneficial, especially since COPY doesn't support an ON CONFLICT clause like INSERT does, which is required to properly update feature rows that are already present in the online store. So you need to COPY into some staging table first and then still run an INSERT with an ON CONFLICT clause.
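For illustration, a minimal sketch of that two-step workaround via ADBC (the connection string, table, and column names are made up, not Feast's actual schema; arrow_table is assumed to be the pyarrow.Table to materialize):

```python
import adbc_driver_postgresql.dbapi as adbc

with adbc.connect("postgresql://localhost/feast") as conn:
    with conn.cursor() as cur:
        # adbc_ingest issues a COPY under the hood.
        cur.adbc_ingest("features_staging", arrow_table, mode="create_append")
        # COPY has no ON CONFLICT support, so a second statement merges
        # the staging table into the live one.
        cur.execute("""
            INSERT INTO features (entity_key, feature_name, value, event_ts)
            SELECT entity_key, feature_name, value, event_ts
            FROM features_staging
            ON CONFLICT (entity_key, feature_name)
            DO UPDATE SET value = EXCLUDED.value, event_ts = EXCLUDED.event_ts
        """)
    conn.commit()
```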

So how best to proceed?

  • Changing the LocalMaterializationJob._materialize_one method to not batch seems difficult as that will impact ALL materialization methods for all online stores.
  • Yet, it seems like we cannot speed up the PostgreSQLOnlineStore.online_write_batch method much more as it is called so often with such small batches.
  • Increasing the batch size that the pyarrow.Table method returns also does not seem possible according to the docs.

Anyone have any suggestions?

(P.S. I also had a look at the SnowflakeMaterializationEngine.materialize_to_external_online_store method, but that also calls online_write_batch with many smaller batches whose size is not controllable: for i, df in enumerate(cursor.fetch_pandas_batches()): (source).)

TomSteenbergen commented on August 26, 2024

Yet, it seems like we cannot speed up the PostgreSQLOnlineStore.online_write_batch method much more as it is called so often with such small batches.

There is one small improvement possible: getting rid of the batching within PostgreSQLOnlineStore.online_write_batch, where we batch again but now with batch size 5000 (source). This does result in some speed-up when the batches passed by LocalMaterializationJob._materialize_one exceed 5000 rows.
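A rough sketch of what that change boils down to, assuming psycopg2 and illustrative table/column names (execute_values still pages internally via page_size):

```python
from psycopg2.extras import execute_values

def write_rows(cur, table_name, rows):
    # rows: list of (entity_key, feature_name, value, event_ts) tuples.
    # One execute_values call over all rows instead of slicing them into
    # 5,000-row chunks first.
    execute_values(
        cur,
        f"""
        INSERT INTO {table_name} (entity_key, feature_name, value, event_ts)
        VALUES %s
        ON CONFLICT (entity_key, feature_name)
        DO UPDATE SET value = EXCLUDED.value, event_ts = EXCLUDED.event_ts
        """,
        rows,
        page_size=max(len(rows), 1),
    )
```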

tokoko commented on August 26, 2024

@TomSteenbergen Can you share some numbers as well? How much of an improvement are we talking about if we do away with batching in the local materialization engine?

TomSteenbergen commented on August 26, 2024

@tokoko Testing with a feature view of 100,000 entities and 6 features (so 600,000 table rows):

  • Current batched implementation with execute_values from psycopg2: 192.6 seconds
  • Single COPY using adbc_ingest: 142.2 seconds
    • 2.6 seconds spent on the COPY query
    • 133.6 seconds spent on the INSERT ON CONFLICT query (!!!)
      • With larger feature sets, this actually takes so long that I had to kill the query
  • Changing the batch size of the Postgres online store (current implementation uses 5000):
    • 1000: 215.6 seconds
    • 2000: 203.9 seconds
    • 10,000: 198.6 seconds
    • 20,000: 202.1 seconds
    • 100,000: 179.5 seconds

tokoko commented on August 26, 2024

@TomSteenbergen thanks, are the last numbers with materialization engine batching left intact or disabled? Can't you also set DEFAULT_BATCH_SIZE to a very large number and run workloads like that?

TomSteenbergen commented on August 26, 2024

@TomSteenbergen thanks, are the last numbers with materialization engine batching left intact or disabled?

Intact. I did not alter DEFAULT_BATCH_SIZE for the last tests under "Changing the batch size of the Postgres online store"; I only changed the batch size of 5000 in the PostgresOnlineStore.

Can't you also set DEFAULT_BATCH_SIZE to a very large number and run workloads like that?

I can, but changing that number will affect materialization for all online stores. Note that many of the batches returned by pyarrow.Table.to_batches are actually smaller than DEFAULT_BATCH_SIZE. See the comment here.

tokoko commented on August 26, 2024

I can, but changing that number will affect materialization for all online stores.

Yeah, I got that. It might be worth messing with other online stores if it proves to be important for performance here (or we could make the materialization engine ask the online store for the value instead of using a default one).

Note that many of the batches returned by pyarrow.Table.to_batches are actually smaller than DEFAULT_BATCH_SIZE. See the comment here.

I'm thinking maybe that's the case because pyarrow is following some heuristics and keeping those numbers below DEFAULT_BATCH_SIZE. Maybe with a high enough batch size, the actual batch sizes will also increase... just speculating.

Leaving all that aside, this seems to be much more of an upsert problem than an insert problem. We might have more luck optionally switching to a "batch upsert" mode when the dataset to materialize is large enough (write to a staging table, lock the table, then run the update and insert statements in a transaction). That would mean quite a few design changes in both the materialization engines and the online store interfaces, though. We might also need to distinguish between full table materializations and small inserts from the /push endpoint. I wonder how valid these concerns are for other online stores.

tokoko commented on August 26, 2024

@TomSteenbergen Another request, if it's not too much trouble: can you get rid of the to_batches call altogether and call ingest on the whole pyarrow table?

TomSteenbergen commented on August 26, 2024

@TomSteenbergen Another request, if it's not too much trouble: can you get rid of the to_batches call altogether and call ingest on the whole pyarrow table?

With ingest you mean the second approach, using COPY through adbc_ingest? That is exactly what I did here: a single COPY using adbc_ingest on the whole table, without any batching by the LocalMaterializationEngine or by the PostgresOnlineStore.

TomSteenbergen commented on August 26, 2024

this seems to be much more of an upsert problem than an insert problem.

Agreed. Inserting data into a staging table with a COPY command and simply replacing the feature table with the staging table is quick. But if we need to keep any older rows in the table that are not updated by the materialization call, then we need to upsert using INSERT ON CONFLICT, which is rather slow.

Perhaps we should, in addition to materialize, introduce a different method that doesn't upsert but simply replaces the feature table in the online store with the latest feature values observed in the offline store, thus getting rid of any old data. Not sure what to name this new method; perhaps synchronize.

(This might be a nice method to have in any case, e.g. when an earlier materialization run materialized some erroneous data to the online store.)
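For what it's worth, a hypothetical sketch of such a synchronize for Postgres (table names are illustrative; it assumes the fresh feature set was already loaded into features_new, e.g. via COPY):

```python
import psycopg

def synchronize(conn: psycopg.Connection) -> None:
    # Swap the freshly loaded table for the live one in a single
    # transaction, so readers never observe a partially written state.
    with conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS features_old")
        cur.execute("ALTER TABLE features RENAME TO features_old")
        cur.execute("ALTER TABLE features_new RENAME TO features")
    conn.commit()
```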

tokoko commented on August 26, 2024

Agreed. Inserting data into a staging table with a COPY command and simply replacing the feature table with the staging table is quick. But if we need to keep any older rows in the table that are not updated by the materialization call, then we need to upsert using INSERT ON CONFLICT, which is rather slow.

Not necessarily: instead of INSERT ON CONFLICT, you can lock the table and run two statements, one for updates (with an inner join) and another for the remaining inserts (whatever's left, via an anti-join). That should be a lot faster. The main problem here is that other materialization engines (for example Spark) run these method calls in parallel. We'd have to somehow change the whole thing to run in two phases (first insert in parallel, then run the merge after all inserts are done).
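Something along these lines (a sketch only, with illustrative table and column names; LOCK TABLE must run inside the same transaction as the two statements):

```python
def merge_staging(cur):
    # Assumes the new rows were already COPY-ed into features_staging
    # and that the cursor is inside an open transaction.
    cur.execute("LOCK TABLE features IN EXCLUSIVE MODE")
    # 1) Update rows that already exist (inner join on the key).
    cur.execute("""
        UPDATE features f
        SET value = s.value, event_ts = s.event_ts
        FROM features_staging s
        WHERE f.entity_key = s.entity_key
          AND f.feature_name = s.feature_name
    """)
    # 2) Insert whatever is left (anti-join on the key).
    cur.execute("""
        INSERT INTO features (entity_key, feature_name, value, event_ts)
        SELECT s.entity_key, s.feature_name, s.value, s.event_ts
        FROM features_staging s
        LEFT JOIN features f
          ON f.entity_key = s.entity_key
         AND f.feature_name = s.feature_name
        WHERE f.entity_key IS NULL
    """)
```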

Perhaps we should, in addition to materialize, introduce a different method that doesn't upsert but simply replaces the feature table in the online store with the latest feature values observed in the offline store, thus getting rid of any old data. Not sure what to name this new method; perhaps synchronize.

Maybe, but if we solve the materialize problem above, this could just be (remove + materialize). Plus, this is a lot harder to solve for other online stores: Redis colocates data with entities rather than feature views, so it's not easy to truncate the existing data.

TomSteenbergen commented on August 26, 2024

Not necessarily: instead of INSERT ON CONFLICT, you can lock the table and run two statements, one for updates (with an inner join) and another for the remaining inserts (whatever's left, via an anti-join). That should be a lot faster.

@tokoko That's an option indeed, but why would that be a lot faster? A quick Google search seems to indicate that INSERT ON CONFLICT is the most performant method for upserting data in Postgres. 😞

Plus, this is a lot harder to solve for other online stores: Redis colocates data with entities rather than feature views, so it's not easy to truncate the existing data.

Fair point, indeed not the easiest route.

As a short-term improvement, I can get rid of the batching in the PostgreSQLOnlineStore, as that doesn't seem to speed things up (see this comment). While there, I can also fix the tqdm progress bar, which is currently not initialized correctly (with the number of entities instead of the number of entities * the number of features in the feature view).

WDYT @tokoko?

tokoko commented on August 26, 2024

That's an option indeed, but why would that be a lot faster? A quick Google search seems to indicate that INSERT ON CONFLICT is the most performant method for upserting data in Postgres. 😞

hmm.. after a Google search of my own, sure, I may be wrong. I still feel like there should be a more performant workaround, though... something like doing a merge with a simple join, writing the results out to a (second 😄) staging table, then truncating the existing table and moving the data from staging to main. idk, maybe, maybe not.

As a short-term improvement, I can get rid of the batching in the PostgreSQLOnlineStore, as that doesn't seem to speed things up (see this comment). While there, I can also fix the tqdm progress bar, which is currently not initialized correctly (with the number of entities instead of the number of entities * the number of features in the feature view).

Sure, that certainly can't hurt.

TomSteenbergen commented on August 26, 2024

@tokoko Created a PR here: #4331

Now that we are using psycopg, we use executemany. I ran the same performance comparison and found that a single executemany call in online_write_batch performs best:

  • (Current:) Batched executemany for every 5000 rows: 188 seconds
  • Single executemany: 180 seconds
  • Loop and call execute for each row within a pipeline: 197 seconds
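For reference, a minimal sketch of the single-executemany and pipelined variants with psycopg 3 (table and column names are illustrative; rows is assumed to be the full list of value tuples to write):

```python
import psycopg

UPSERT = """
    INSERT INTO features (entity_key, feature_name, value, event_ts)
    VALUES (%s, %s, %s, %s)
    ON CONFLICT (entity_key, feature_name)
    DO UPDATE SET value = EXCLUDED.value, event_ts = EXCLUDED.event_ts
"""

with psycopg.connect("postgresql://localhost/feast") as conn:
    # Single executemany over all rows (the fastest variant above).
    with conn.cursor() as cur:
        cur.executemany(UPSERT, rows)

    # Per-row execute inside a pipeline (the slowest variant above).
    with conn.pipeline(), conn.cursor() as cur:
        for row in rows:
            cur.execute(UPSERT, row)
```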
