blockchain-etl / ethereum-etl-airflow

Airflow DAGs for exporting, loading, and parsing Ethereum blockchain data. How to get any Ethereum smart contract into BigQuery: https://towardsdatascience.com/how-to-get-any-ethereum-smart-contract-into-bigquery-in-8-mins-bab5db1fdeee

License: MIT License

Python 99.48% Shell 0.17% Dockerfile 0.35%
ethereum blockchain-analytics data-engineering cryptocurrency web3 apache-airflow crypto data-analytics etl gcp

ethereum-etl-airflow's Introduction

Ethereum ETL Airflow

Read this article: https://cloud.google.com/blog/products/data-analytics/ethereum-bigquery-how-we-built-dataset

Setting up Airflow DAGs using Google Cloud Composer

Create BigQuery Datasets

Create Google Cloud Storage bucket

Create Google Cloud Composer (version 2) environment

Create a new Cloud Composer environment:

export ENVIRONMENT_NAME=ethereum-etl-0

AIRFLOW_CONFIGS_ARR=(
    "celery-worker_concurrency=8"
    "scheduler-dag_dir_list_interval=300"
    "scheduler-min_file_process_interval=120"
)
export AIRFLOW_CONFIGS=$(IFS=, ; echo "${AIRFLOW_CONFIGS_ARR[*]}")

gcloud composer environments create \
    $ENVIRONMENT_NAME \
    --location=us-central1 \
    --image-version=composer-2.1.14-airflow-2.5.1 \
    --environment-size=medium \
    --scheduler-cpu=2 \
    --scheduler-memory=13 \
    --scheduler-storage=1 \
    --scheduler-count=1 \
    --web-server-cpu=1 \
    --web-server-memory=2 \
    --web-server-storage=512MB \
    --worker-cpu=2 \
    --worker-memory=13 \
    --worker-storage=10 \
    --min-workers=1 \
    --max-workers=8 \
    --airflow-configs=$AIRFLOW_CONFIGS

gcloud composer environments update \
    $ENVIRONMENT_NAME \
    --location=us-central1 \
    --update-pypi-packages-from-file=requirements_airflow.txt

Create variables in Airflow (Admin > Variables in the UI):

Variable                                    Description
ethereum_output_bucket                      GCS bucket to store exported files
ethereum_provider_uris                      Comma-separated URIs of Ethereum nodes
ethereum_destination_dataset_project_id     Project ID of BigQuery datasets
notification_emails                         Email address(es) for notifications

Check other variables in dags/ethereumetl_airflow/variables.py.
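For reference, DAGs read these values through Airflow's standard Variable API. A minimal sketch (the variable names match the table above; see dags/ethereumetl_airflow/variables.py for the helpers the DAGs actually use):

from airflow.models import Variable

# Read the variables configured in Admin > Variables
output_bucket = Variable.get("ethereum_output_bucket")
provider_uris = Variable.get("ethereum_provider_uris").split(",")
notification_emails = Variable.get("notification_emails", default_var=None)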

Updating package requirements

Suggested package requirements for Composer are stored in requirements_airflow.txt.

You can update the Composer environment using the following script:

ENVIRONMENT_NAME="ethereum-etl-0"
LOCAL_REQUIREMENTS_PATH="$(mktemp)"

# grep pattern removes comments and whitespace:
cat "./requirements_airflow.txt" | grep -o '^[^#| ]*' > "$LOCAL_REQUIREMENTS_PATH"

gcloud composer environments update \
  "$ENVIRONMENT_NAME" \
  --location="us-central1" \
  --update-pypi-packages-from-file="$LOCAL_REQUIREMENTS_PATH"

Note: Composer can be very pedantic about conflicts in additional packages. You may have to fix dependency conflicts where you had no issues testing locally (when updating dependencies, Composer does something "cleverer" than just pip install -r requirements.txt). This is why eth-hash is currently pinned in requirements_airflow.txt. Typically we have found that pinning eth-hash and/or eth-rlp may make things work, though Your Mileage May Vary.

See this issue for further ideas on how to unblock problems you may encounter.

Upload DAGs

> ./upload_dags.sh <airflow_bucket>

Running Tests

pip install \
    -r requirements_test.txt \
    -r requirements_local.txt \
    -r requirements_airflow.txt
pytest -vv -s

Running locally

A docker compose definition has been provided to easily spin up a local Airflow instance.

To build the required image:

docker compose build

To start Airflow:

docker compose up airflow

The instance requires the CLOUDSDK_CORE_PROJECT environment variable to be set in most cases. Airflow Variables can be defined in variables.json.

Creating Table Definition Files for Parsing Events and Function Calls

Read this article: https://medium.com/@medvedev1088/query-ens-and-0x-events-with-sql-in-google-bigquery-4d197206e644

More Information

You can follow the instructions for Polygon DAGs here: https://github.com/blockchain-etl/polygon-etl. The architecture there is very similar to Ethereum, so in most cases substituting polygon for ethereum will work. Contributions to this README file for porting documentation from Polygon to Ethereum are welcome.

ethereum-etl-airflow's People

Contributors

alifier, amiloski, araa47, askeluv, ayadigithub, charlielewisme, cheungringo, controlcrepeat, daishuai323, ege77er, htkao, imrankhan37, iter-io, ivigamberdiev, jasonbxu, kome12, marcinja, markusbkoch, medvedev1088, moranmalik, ninjascant, nothingnix, onurerkin, prateekeng, qshao-gauntlet, rstevens2022, saurabhbikram, timnooren, yazzyyaz, zigzag2002


ethereum-etl-airflow's Issues

Rewrite export_dag.py to use PythonOperator instead of BashOperator and update README

The scope of this task is:

  • Rewrite export_dag.py to use PythonOperator instead of BashOperator (see Notes for context). The easiest way is to use functions from the ethereumetl.cli package, e.g. to export blocks and transactions call ethereumetl.cli.export_blocks_and_transactions(...) (a rough sketch is shown after this list).
  • Update environment variables if necessary, e.g. ETHEREUMETL_REPO_BRANCH is not needed etc.
  • Add logging (with BashOperator, set -o xtrace && set -o pipefail ensured all steps were logged; with PythonOperator this has to be done explicitly).
  • Test export_dag.py in Cloud Composer. Test load_dag.py in Cloud Composer (it can break due to Python version upgrade).
  • Update README with instructions on how to configure and deploy the dags. The instructions should include high-level steps for deployment.
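A rough sketch of the PythonOperator approach described above, not the final implementation. It assumes the click commands in ethereumetl.cli expose their underlying functions via .callback, uses Airflow 2-style imports, and the exact parameter names should be checked against the installed ethereum-etl version:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from ethereumetl.cli import export_blocks_and_transactions


def export_blocks_and_transactions_task(start_block, end_block, provider_uri, **kwargs):
    # Invoke the click command's underlying function directly so output goes
    # through Python logging instead of a Bash subprocess.
    export_blocks_and_transactions.callback(
        start_block=start_block,
        end_block=end_block,
        batch_size=100,
        provider_uri=provider_uri,
        max_workers=5,
        blocks_output="/tmp/blocks.csv",
        transactions_output="/tmp/transactions.csv",
    )


with DAG(
    "ethereum_export_dag_sketch",
    schedule_interval="0 1 * * *",
    start_date=datetime(2015, 7, 30),
    catchup=False,
) as dag:
    export_task = PythonOperator(
        task_id="export_blocks_and_transactions",
        python_callable=export_blocks_and_transactions_task,
        op_kwargs={
            "start_block": 0,
            "end_block": 99,
            "provider_uri": "https://mainnet.infura.io/v3/<your-project-id>",
        },
    )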

Notes:

Streaming smart contract events and topics (ABI processing in python instead of BQ)

Right now we have ~500 events that we parse. Every day ~500MB of log data is generated, and each parse query scans it, which sums to ~250GB scanned in BigQuery daily, or ~7.5 TB per month. That comes to roughly $37 per month.

With 1500 events we'll spend ~$100 per month.

An alternative to parsing logs in BigQuery is to export the logs as JSON to GCS, download them locally in Airflow, filter all events for a dataset at once, then load the results back into BigQuery (loading is free). There is a PoC for parsing logs in Python here: blockchain-etl/ethereum-etl@6710e6b.
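A hedged sketch of that alternative: filter newline-delimited JSON log records by (contract address, topic0) before loading them back into BigQuery. The file layout and field names are assumptions based on the ethereum-etl export format, and the tracked pairs would come from the table definition files (the pair below is the StoreLaunch example used elsewhere in this document):

import json

# (contract address, topic0) pairs collected from the table definition files
TRACKED_EVENTS = {
    ("0x0e6541374e9d7dee2c53c15a1a00fbe41c7b7198",
     "0x0ad944db4a8d4a4eac124e381a6c461e1019e3f3e9b4a6b861b6552ca9947d9b"),
}


def filter_logs(logs_json_path, output_path):
    # Keep only the log records a parse table actually needs, so BigQuery
    # only has to load the result (free) instead of scanning the full logs table.
    with open(logs_json_path) as src, open(output_path, "w") as dst:
        for line in src:
            log = json.loads(line)
            topic0 = log["topics"][0] if log.get("topics") else None
            if (log["address"], topic0) in TRACKED_EVENTS:
                dst.write(line)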

Use traces.csv instead of receipts.csv for exporting contracts

Right now contract addresses are exported from the receipts.csv file (https://github.com/blockchain-etl/ethereum-etl-airflow/blob/master/dags/export_dag.py#L76). Because of this we are missing contracts created in internal transactions.

Both contract addresses and their bytecode can be retrieved from traces.csv, where trace_type=create. Bytecode is stored in the output column (https://github.com/blockchain-etl/ethereum-etl#tracescsv). This might require improving extract_csv_column.py so that it allows extracting multiple columns. An alternative solution is simply to feed the traces.csv file to extract_contracts.py.
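A rough sketch of that second option, feeding traces to contract extraction. Column names follow the ethereum-etl traces schema; treating to_address as the created contract's address, and the exact trace_type value, are assumptions to verify against the exported data:

import csv


def extract_contracts_from_traces(traces_csv_path, contracts_csv_path):
    with open(traces_csv_path, newline="") as src, \
         open(contracts_csv_path, "w", newline="") as dst:
        reader = csv.DictReader(src)
        writer = csv.writer(dst)
        writer.writerow(["address", "bytecode"])
        for row in reader:
            # Only create traces carry deployed bytecode (in the output column);
            # the exact value name may differ by traces schema version.
            if row["trace_type"] in ("create", "created"):
                writer.writerow([row["to_address"], row["output"]])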

BigQuery Aggregator_ETH_USD_event_AnswerUpdated event not functioning

I'm interested in the USD-ETH feed and would like to query the Aggregator_ETH_USD_event_AnswerUpdated event database to get updated ETH prices. When querying in BigQuery's console, the database doesn't appear to be reporting results. See the following basic query: https://console.cloud.google.com/bigquery?sq=299458138959:7e6e08cd92ea4d21a2f70fa00291e46d

The BTC-USD chainlink price-feed dataset (Aggregator_BTC_USD_event_AnswerUpdated) in BigQuery does appear to be functional, as shown in this very basic query on BigQuery's console:
https://console.cloud.google.com/bigquery?sq=299458138959:a47ba9d480c44e18ab04be11ae68782a

I can't pinpoint why the BTC query seems to report results while the ETH query does not, so there might be an issue within this data pipeline on Airflow.

Add contract parsers for SushiSwap

Context

We add "table definition" for contract events so that we can regularly parse these events from Ethereum logs. One benefit of this is that the events become available in public BigQuery tables, under the blockchain-etl project.

For more details on this, please read How to get any Ethereum smart contract into BigQuery.

In this ticket we are looking to add datasets for SushiSwap.

Proposed solution

Add table definition files for SushiSwap by following the checklist below.

Checklist

  1. Identify all smart contracts for the 13 farms listed on https://app.sushiswap.org/farms
  2. Generate table definition files for all the contracts in your list using this Contract Parser
  3. Create Pull Request containing all the table definition files

Please note

  • For step 2, you may have to manually edit the table definition files if there are multiple contracts with the same name; in that case, append the relevant token, e.g. _YFI.

Before applying for this bounty, please list the contracts identified in step 1, and confirm you've understood the process.

It may be helpful to look at past PRs that are labeled contract parsing.

Add new Argent contracts to parsers

Run these contracts through the Contract Parser and create table definition files.

These have to be renamed manually with a _v2 suffix since we already have older Argent contracts.

List of contracts in scope:

"0xc4baabb5b7dff84aa8023183e3ca0ba3b2fee519","ArgentENSManager"
"0xda1756bb923af5d1a05e277cb1e54f1d0a127890","ArgentENSResolver"
"0xb6d64221451edbac7736d4c3da7fc827457dec03","BaseWallet"
"0xc43472062b4e3763c775956988cc883d4b863d91","CompoundRegistry"
"0x3d4a342fecd590f59cc2245d21f0de88063c97f9","DappRegistry"
"0x7383757c8a2f4cbc6a21a26e1f33a0fd95e7bb77","MakerRegistry"
"0xc17d432bd8e8850fd7b32b0270f5afac65db0105","ModuleRegistry"
"0xa5c603e1c27a96171487aea0649b01c56248d2e8","MultiSigWallet"
"0xe8a76d2f37fe50b6a95d27fb92291fe0b57407d3","TokenPriceProvider"
"0x40c84310ef15b0c0e5c69d25138e0e16e8000fe9","WalletFactory"

Support for parsing events from child contracts

Background

A lot of projects have "factory" contracts that themselves produce "child" contracts:

  • Maker produces vault contracts
  • Uniswap produces pool contracts
  • Aragon produces DAO contracts
  • Mintbase produces token contracts
    ... etc

Problem

Currently, our event parsing only supports parsing specific contracts. This doesn't work when there's an open-ended space of child contracts that can be created.

In other words, we can't conveniently parse events for all Uniswap pools, or all Aragon DAOs, etc.

Proposed Solution

Add support for child contract parsing. Specifically, add another SQL template (similar to this) for parsing child events, joining with the output of the create events. Then make use of this SQL template in the creation of parse DAGs.

Additional Details

Example of parsing a "child contract" event (Minted) for Mintbase:

-- parser to identify "baby contracts"
CREATE TEMP FUNCTION
  PARSE_CREATE(data STRING, topics ARRAY<STRING>)
  RETURNS STRUCT<`store` STRING, `name` STRING, `symbol` STRING>
  LANGUAGE js AS """
    var parsedEvent = {"anonymous": false, "inputs": [{"indexed": true, "name": "store", "type": "address"}, {"indexed": false, "name": "name", "type": "string"}, {"indexed": false, "name": "symbol", "type": "string"}], "name": "StoreLaunch", "type": "event"}
    return abi.decodeEvent(parsedEvent, data, topics, false);
"""
OPTIONS
  ( library="https://storage.googleapis.com/ethlab-183014.appspot.com/ethjs-abi.js" );

-- parser to identify the events
CREATE TEMP FUNCTION
  PARSE_EVENT(data STRING, topics ARRAY<STRING>)
  RETURNS STRUCT<`id` STRING, `metaId` STRING>
  LANGUAGE js AS """
    var parsedEvent = {"anonymous": false, "inputs": [{"indexed": false, "name": "id", "type": "uint256"}, {"indexed": false, "name": "metaId", "type": "string"}], "name": "Minted", "type": "event"}
    return abi.decodeEvent(parsedEvent, data, topics, false);
"""
OPTIONS
  ( library="https://storage.googleapis.com/ethlab-183014.appspot.com/ethjs-abi.js" );

WITH
parsed_create_contract AS -- actually pull out baby contracts
(SELECT
    logs.block_timestamp AS block_timestamp
    ,logs.block_number AS block_number
    ,logs.transaction_hash AS transaction_hash
    ,logs.log_index AS log_index
    ,PARSE_CREATE(logs.data, logs.topics) AS parsed
FROM `bigquery-public-data.crypto_ethereum.logs` AS logs
WHERE address = '0x0e6541374e9d7dee2c53c15a1a00fbe41c7b7198'
  AND topics[SAFE_OFFSET(0)] = '0x0ad944db4a8d4a4eac124e381a6c461e1019e3f3e9b4a6b861b6552ca9947d9b'
),
create_contract AS (
SELECT
     block_timestamp
     ,block_number
     ,transaction_hash
     ,log_index
    ,parsed.store AS `store`
    ,parsed.name AS `name`
    ,parsed.symbol AS `symbol`
FROM parsed_create_contract
),

parsed_event AS
(SELECT
    logs.block_timestamp AS block_timestamp
    ,logs.block_number AS block_number
    ,logs.transaction_hash AS transaction_hash
    ,logs.log_index AS log_index
    ,PARSE_EVENT(logs.data, logs.topics) AS parsed
    ,address -- this had to be added for joins
FROM `bigquery-public-data.crypto_ethereum.logs` AS logs
WHERE topics[SAFE_OFFSET(0)] = '0xadef11a3979b8ceb0573eb6ef0678134a09c23a0d94e5ea47cd18ac3a9fc0194'
)
SELECT
     parsed_event.block_timestamp
     ,parsed_event.block_number
     ,parsed_event.transaction_hash
     ,parsed_event.log_index
    ,create_contract.`store`
    ,create_contract.`name`
    ,create_contract.`symbol`
    ,parsed.id AS `id`
    ,parsed.metaId AS `metaId`
FROM parsed_event
JOIN create_contract 
ON parsed_event.address = create_contract.store

Create CloudStorage classes to abstract file upload/download

Right now we support S3 and GCS as cloud storage providers, and file upload/download is done directly in the export DAG. Instead, two classes, S3CloudStorage and GCSCloudStorage, can be created with a common interface to abstract the upload/download logic.

As an example:

class S3CloudStorage:
     def download_file(...)
     def upload_file(...)

upload_to_gcs and download_from_gcs can be moved from the export DAG to GCSCloudStorage.
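One possible shape for this abstraction, a sketch rather than a final design, assuming google-cloud-storage and boto3 remain the underlying clients:

from abc import ABC, abstractmethod


class CloudStorage(ABC):
    @abstractmethod
    def download_file(self, bucket: str, key: str, local_path: str) -> None: ...

    @abstractmethod
    def upload_file(self, local_path: str, bucket: str, key: str) -> None: ...


class GCSCloudStorage(CloudStorage):
    def __init__(self):
        from google.cloud import storage
        self.client = storage.Client()

    def download_file(self, bucket, key, local_path):
        self.client.bucket(bucket).blob(key).download_to_filename(local_path)

    def upload_file(self, local_path, bucket, key):
        self.client.bucket(bucket).blob(key).upload_from_filename(local_path)


class S3CloudStorage(CloudStorage):
    def __init__(self):
        import boto3
        self.client = boto3.client("s3")

    def download_file(self, bucket, key, local_path):
        self.client.download_file(bucket, key, local_path)

    def upload_file(self, local_path, bucket, key):
        self.client.upload_file(local_path, bucket, key)

The export DAG would then depend only on the CloudStorage interface and pick the implementation based on configuration.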

Add Chainlink contracts to log parser

Context

We add "table definition" for contract events so that we can regularly parse these events from Ethereum logs. One benefit of this is that the events become available in public BigQuery tables, under the blockchain-etl project.

For more details on this, please read How to get any Ethereum smart contract into BigQuery.

In this ticket we are looking to add datasets for Chainlink.

Proposed solution

Add table definition files for Chainlink by following the checklist below.

Checklist

  1. Identify all smart contracts for Chainlink
  2. Generate table definition files for all the contracts in your list using this Contract Parser
  3. Create Pull Request containing all the table definition files

Please note

  • For step 1 you could use a site like DappRadar or ETH Gas Station to find the contracts.
  • For step 2, you may have to manually edit the table definition files if there are multiple contracts with the same name. You can append e.g. _v2 so contract becomes contract_v2.

Before applying for this bounty, please list the contracts identified in step 1, and confirm you've understood the process.

It may be helpful to look at past PRs that are labeled contract parsing.

Add all KyberNetwork versions in event parsing

Background

There are 4 different versions of the KyberNetwork contract.

Contract Address: 0x65bF64Ff5f51272f729BDcD7AcFB00677ced86Cd
Past Contract Addresses:
v3 - 0x9ae49C0d7F8F9EF4B864e004FE86Ac8294E20950,
v2 - 0x91a502C678605fbCe581eae053319747482276b9,
v1 - 0x964F35fAe36d75B1e72770e244F6595B68508CF5

We currently parse 0x9ae49C0d7F8F9EF4B864e004FE86Ac8294E20950 -> KyberNetwork and 0x65bF64Ff5f51272f729BDcD7AcFB00677ced86Cd -> KyberNetwork_v2.

Proposal

Ensure we parse events from all versions, and align version naming as follows:

0x65bF64Ff5f51272f729BDcD7AcFB00677ced86Cd -> KyberNetwork_v4
0x9ae49C0d7F8F9EF4B864e004FE86Ac8294E20950 -> KyberNetwork_v3
0x91a502C678605fbCe581eae053319747482276b9 -> KyberNetwork_v2
0x964F35fAe36d75B1e72770e244F6595B68508CF5 -> KyberNetwork_v1

In practice that means we have to rename our current tables:

  1. KyberNetwork_v2 -> KyberNetwork_v4 for 0x65bF64Ff5f51272f729BDcD7AcFB00677ced86Cd
  2. KyberNetwork -> KyberNetwork_v3 for 0x9ae49C0d7F8F9EF4B864e004FE86Ac8294E20950

And add the two other ones:

  1. 0x91a502C678605fbCe581eae053319747482276b9
  2. 0x964F35fAe36d75B1e72770e244F6595B68508CF5

Missing `_history` table

I got the following error when running the parse_<dataset>_dag on airflow.

google.api_core.exceptions.NotFound: 404 Not found: Table <project_id>:<dataset_name>.BancorNetwork_event_FlashLoanCompleted_history was not found in location US

But I can't seem to find the code that initialises the table. Does anyone have any idea?

Implement table definition files for ENS events

  • Using the table definition file schema defined in #17, implement table definition files for ENS contracts' events.
  • Using the DAG builder, create the DAG for parsing ENS contracts' events.

The following contracts should be covered:

https://0x.org/wiki#Deployed-Addresses

Deployment on Airflow fails due to Dependencies

Hello! I've been trying to deploy this pipeline on GCP Composer and I hit pre-installed dependency issues.

I'll keep this thread updated as I work through all of them, but I'm attempting to get this pipeline deployed.

Specifically, it starts with the pre-installed google-ads requiring protobuf ~3.19.5 while >=3.20.0 is installed.

I'll update this thread once I find a set of dependency versions that work :)

Add Curve.fi parsing

Curve.fi is a popular DEX for trading stablecoins (and BTC tokens). We should parse events from their contracts.

There might be more contracts but at least these ones should be parsed using https://contract-parser.d5.ai:

0x3b3ac5386837dc563660fb6a0937dfaa5924333b	Curve.fi: bCrv Token
0xb6c057591e073249f2d9d88ba59a46cfc9b59edb	Curve.fi: BUSD Deposit
0x79a8c46dea5ada233abaffd40f3a0a2b1e5a4f27	Curve.fi: BUSD Swap
0xc1db00a8e5ef7bfa476395cdbcc98235477cde4e	Curve.fi: Calc
0x845838df265dcd2c412a1dc9e959c7d08537f8a2	Curve.fi: cCrv Token
0xeb21209ae4c2c9ff2a86aca31e123764a3b6bc06	Curve.fi: Compound Deposit
0xa2b47e3d5c44877cca798226b7b8118f9bfb7a56	Curve.fi: Compound Swap
0xa50ccc70b6a011cffddf45057e39679379187287	Curve.fi: PAX Deposit
0x06364f10b501e868329afbc005b3492902d6c763	Curve.fi: PAX Swap
0xd905e2eaebe188fc92179b6350807d8bd91db0d8	Curve.fi: pCrv Token
0x7002b727ef8f5571cb5f9d70d13dbeeb4dfae9d1	Curve.fi: Registry
0x9fe350dfa5f66bc086243f21a8f0932514316627	Curve.fi: Ren Adapter
0x26d9980571e77ffb0349f9c801dd7ca9951fb656	Curve.fi: Ren Adapter 2
0x73ab2bd10ad10f7174a1ad5afae3ce3d991c5047	Curve.fi: Ren Adapter 3
0x93054188d876f558f4a66b2ef1d97d16edf0895b	Curve.fi: REN Swap
0x49849c98ae39fff122806c06791fa73784fb3675	Curve.fi: renCrv Token
0x104c1e66c67c385e6095ffcc6227d75c761dc019	Curve.fi: sBTC Adapter
0x02b3f51ac9202aa19be63d61a8c681579d6e3a51	Curve.fi: sBTC Adapter 2
0xaeade605d01fe9a8e9c4b3aa0130a90d62167029	Curve.fi: sBTC Adapter 3
0x7fc77b5c7614e1533320ea6ddc2eb61fa00a9714	Curve.fi: sBTC Swap
0x075b1bb99792c9e1041ba13afef80c91a1e70fb3	Curve.fi: sbtcCrv Token
0xc25a3a3b969415c80451098fa907ec722572917f	Curve.fi: sCrv Token
0xfcba3e75865d2d561be8d220616520c171f12851	Curve.fi: sUSD v2 Deposit
0xa5407eae9ba41422680e2e00537571bcc53efbfd	Curve.fi: sUSD v2 Swap
0x9fc689ccada600b6df723d9e47d84d76664a1f23	Curve.fi: tCrv Token
0xac795d2c97e60df6a99ff1c814727302fd747a80	Curve.fi: USDT Deposit
0x52ea46506b9cc5ef470c5bf89f17dc28bb35d85c	Curve.fi: USDT Swap
0xbbc81d23ea2c3ec7e56d39296f0cbb648873a5d3	Curve.fi: y Deposit
0x45f783cce6b7ff23b2ab2d70e416cdb7d6055f51	Curve.fi: y Swap
0xdf5e0e81dff6faf3a7e52ba697820c5e32d806a8	Curve.fi: yCrv Token

Chainlink parsing pointed at the wrong contracts

Here is the current ETH/USD Answer Updated address:

However, the Chainlink Price Feeds documentation points at 0xb022E2970b3501d8d83eD07912330d178543C1eB, which proxies to the aggregator 0xb95188f011e49a60fc6c743b1bc93b38651a204e.


0x79febf6b9f76853edbcbc913e6aae8232cfb9de9 was last used roughly May 11 2020 (shortly after this code was merged)

This is consistent with when data was last reported in blockchain-etl.


I'm not sure which other events are also stale, but spot-checking a few, most seemed to point to old contracts. I will see if anyone from the Chainlink community is interested in fixing this.

Implement table definition files for 0x events

  • Using the table definition file schema defined in #17, implement table definition files for 0x contracts' events.
  • Using the DAG builder, create the DAG for parsing 0x contracts' events.

The following contracts should be covered:

  • Exchange: 0x080bf510fcbf18b91105470639e9561022937712
  • WETH9 (EtherToken): 0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2
  • ZRXToken: 0xe41d2489571d322189246dafa5ebde1f4699f498

https://0x.org/wiki#Deployed-Addresses

Add Nuo Network contracts to log parser

Context

We add "table definition" for contract events so that we can regularly parse these events from Ethereum logs. One benefit of this is that the events become available in public BigQuery tables, under the blockchain-etl project.

For more details on this, please read How to get any Ethereum smart contract into BigQuery.

In this ticket we are looking to add datasets for Nuo Network.

Proposed solution

Add table definition files for Nuo Network by following the checklist below.

Checklist

  1. Identify all smart contracts for Nuo Network
  2. Generate table definition files for all the contracts in your list using this Contract Parser
  3. Create Pull Request containing all the table definition files

Please note

  • For step 1 you could use a site like DappRadar to find the contracts.
  • For step 2, you may have to manually edit the table definition files if there are multiple contracts with the same name. You can append e.g. _v2 so contract becomes contract_v2.

Before applying for this bounty, please list the contracts identified in step 1, and confirm you've understood the process.

It may be helpful to look at past PRs that are labeled contract parsing.

Add dYdX contracts to log parser

Context

We add "table definition" for contract events so that we can regularly parse these events from Ethereum logs. One benefit of this is that the events become available in public BigQuery tables, under the blockchain-etl project.

For more details on this, please read How to get any Ethereum smart contract into BigQuery.

In this ticket we are looking to add datasets for dYdX.

Proposed solution

Add table definition files for dYdX by following the checklist below.

Checklist

  1. Identify all smart contracts for dYdX
  2. Generate table definition files for all the contracts in your list using this Contract Parser
  3. Create Pull Request containing all the table definition files

Please note

  • For step 1 you could use a site like DappRadar or ETH Gas Station to find the contracts.
  • For step 2, you may have to manually edit the table definition files if there are multiple contracts with the same name. You can append e.g. _v2 so contract becomes contract_v2.
  • If the Contract Parser gives you an error, it might be because Etherscan doesn't have the contract ABI listed. Please check this and indicate here which contracts have this problem.

Before applying for this bounty, please list the contracts identified in step 1, and confirm you've understood the process.

It may be helpful to look at past PRs that are labeled contract parsing.

Add Aave to contract parser

Context

We add "table definition" for contract events so that we can regularly parse these events from Ethereum logs. One benefit of this is that the events become available in public BigQuery tables, under the blockchain-etl project.

For more details on this, please read How to get any Ethereum smart contract into BigQuery.

In this ticket we are looking to add datasets for Aave.

Proposed solution

Add table definition files for Aave by following the checklist below.

Checklist

  1. Identify all smart contracts for Aave
  2. Generate table definition files for all the contracts in your list using this Contract Parser
  3. Create Pull Request containing all the table definition files

Please note

  • For step 1 you could use a site like DappRadar or ETH Gas Station to find the contracts.
  • For step 2, you may have to manually edit the table definition files if there are multiple contracts with the same name. You can append e.g. _v2 so contract becomes contract_v2.

Before applying for this bounty, please list the contracts identified in step 1, and confirm you've understood the process.

It may be helpful to look at past PRs that are labeled contract parsing.

Missing `blocks` table when running first time

Hi,

I followed the steps for GCP and created the project and datasets. When running for the first time, I get the following error in ethereum_verify_streaming_dag:

[2022-07-27 03:40:24,285] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ethereum_verify_streaming_dag.verify_blocks_have_latest 2022-07-27T02:30:00+00:00 [queued]>
[2022-07-27 03:40:24,356] {taskinstance.py:670} INFO - Dependencies all met for <TaskInstance: ethereum_verify_streaming_dag.verify_blocks_have_latest 2022-07-27T02:30:00+00:00 [queued]>
[2022-07-27 03:40:24,356] {taskinstance.py:880} INFO - 
--------------------------------------------------------------------------------
[2022-07-27 03:40:24,356] {taskinstance.py:881} INFO - Starting attempt 6 of 6
[2022-07-27 03:40:24,356] {taskinstance.py:882} INFO - 
--------------------------------------------------------------------------------
[2022-07-27 03:40:24,392] {taskinstance.py:901} INFO - Executing <Task(BigQueryOperator): verify_blocks_have_latest> on 2022-07-27T02:30:00+00:00
[2022-07-27 03:40:24,406] {standard_task_runner.py:54} INFO - Started process 1648 to run task
[2022-07-27 03:40:24,718] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'ethereum_verify_streaming_dag', 'verify_blocks_have_latest', '2022-07-27T02:30:00+00:00', '--job_id', '33761', '--pool', 'default_pool', '--raw', '-sd', '/opt/airflow/dags/repo/dags/ethereum_verify_streaming_dag.py', '--cfg_path', '/tmp/tmp5tv7mx2g']
[2022-07-27 03:40:24,719] {standard_task_runner.py:78} INFO - Job 33761: Subtask verify_blocks_have_latest
[2022-07-27 03:40:24,884] {logging_mixin.py:120} INFO - Running <TaskInstance: ethereum_verify_streaming_dag.verify_blocks_have_latest 2022-07-27T02:30:00+00:00 [running]> on host airflow-worker-0.airflow-worker.etlapp.svc.cluster.local
[2022-07-27 03:40:25,024] {bigquery_operator.py:252} INFO - Executing: select if(
(
select timestamp_diff(
  current_timestamp(),
  (select max(timestamp)
  from `elaborate-baton-357506.crypto_ethereum.blocks` as blocks
  where date(timestamp) >= date_add('2022-07-27', INTERVAL -1 DAY)),
  MINUTE)
) < 1, 1,
cast((select 'Blocks are lagging by more than 1 minutes') as INT64))
[2022-07-27 03:40:25,953] {taskinstance.py:1150} ERROR - BigQuery job failed. Final error was: {'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}. The job was: {'kind': 'bigquery#job', 'etag': 'Q5nYP18pPfEi2HSIjDP7sg==', 'id': 'elaborate-baton-357506:asia-south1.job_kbdzE9BYIJP-ueO-O_trlXAuvRcw', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/elaborate-baton-357506/jobs/job_kbdzE9BYIJP-ueO-O_trlXAuvRcw?location=asia-south1', 'user_email': '[email protected]', 'configuration': {'query': {'query': "select if(\n(\nselect timestamp_diff(\n  current_timestamp(),\n  (select max(timestamp)\n  from `elaborate-baton-357506.crypto_ethereum.blocks` as blocks\n  where date(timestamp) >= date_add('2022-07-27', INTERVAL -1 DAY)),\n  MINUTE)\n) < 1, 1,\ncast((select 'Blocks are lagging by more than 1 minutes') as INT64))", 'priority': 'INTERACTIVE', 'useLegacySql': False}, 'jobType': 'QUERY'}, 'jobReference': {'projectId': 'elaborate-baton-357506', 'jobId': 'job_kbdzE9BYIJP-ueO-O_trlXAuvRcw', 'location': 'asia-south1'}, 'statistics': {'creationTime': '1658893225735', 'startTime': '1658893225826', 'endTime': '1658893225826'}, 'status': {'errorResult': {'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}, 'errors': [{'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}], 'state': 'DONE'}}
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 984, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/contrib/operators/bigquery_operator.py", line 262, in execute
    job_id = self.bq_cursor.run_query(
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 915, in run_query
    return self.run_with_configuration(configuration)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/contrib/hooks/bigquery_hook.py", line 1347, in run_with_configuration
    raise Exception(
Exception: BigQuery job failed. Final error was: {'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}. The job was: {'kind': 'bigquery#job', 'etag': 'Q5nYP18pPfEi2HSIjDP7sg==', 'id': 'elaborate-baton-357506:asia-south1.job_kbdzE9BYIJP-ueO-O_trlXAuvRcw', 'selfLink': 'https://bigquery.googleapis.com/bigquery/v2/projects/elaborate-baton-357506/jobs/job_kbdzE9BYIJP-ueO-O_trlXAuvRcw?location=asia-south1', 'user_email': '[email protected]', 'configuration': {'query': {'query': "select if(\n(\nselect timestamp_diff(\n  current_timestamp(),\n  (select max(timestamp)\n  from `elaborate-baton-357506.crypto_ethereum.blocks` as blocks\n  where date(timestamp) >= date_add('2022-07-27', INTERVAL -1 DAY)),\n  MINUTE)\n) < 1, 1,\ncast((select 'Blocks are lagging by more than 1 minutes') as INT64))", 'priority': 'INTERACTIVE', 'useLegacySql': False}, 'jobType': 'QUERY'}, 'jobReference': {'projectId': 'elaborate-baton-357506', 'jobId': 'job_kbdzE9BYIJP-ueO-O_trlXAuvRcw', 'location': 'asia-south1'}, 'statistics': {'creationTime': '1658893225735', 'startTime': '1658893225826', 'endTime': '1658893225826'}, 'status': {'errorResult': {'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}, 'errors': [{'reason': 'notFound', 'message': 'Not found: Table elaborate-baton-357506:crypto_ethereum.blocks was not found in location asia-south1'}], 'state': 'DONE'}}
[2022-07-27 03:40:25,956] {taskinstance.py:1187} INFO - Marking task as FAILED. dag_id=ethereum_verify_streaming_dag, task_id=verify_blocks_have_latest, execution_date=20220727T023000, start_date=20220727T034024, end_date=20220727T034025
[2022-07-27 03:40:29,224] {local_task_job.py:102} INFO - Task exited with return code 1

I have the following DAGs running

[screenshot: Airflow UI showing the running DAGs]

Which step is supposed to prepare these tables?

Make parsed logs real-time

All datasets in blockchain-etl.* in BigQuery are currently updated daily. We can make them real-time by stitching the historical and latest data parts as follows:

  1. <dataset>_internal.<table_name>_history table containing historical data and updated daily.
  2. <dataset>_internal.<table_name>_live view that parses data from the logs table and returns both historical and latest data.
  3. <dataset>.<table_name> view is created that stitches <dataset>_internal.<table_name>_history and <dataset>_internal.<table_name>_live together, which contains both historical and latest data:
select * from `<dataset>_internal.<table_name>_history` where block_timestamp <= <previous_day>
union all
select * from `<dataset>_internal.<table_name>_live` where block_timestamp > <previous_day>

Use ethers.js instead of ethjs-abi to parse logs and traces

We currently use ethjs-abi (https://github.com/ethjs/ethjs-abi), which is not maintained and causes a "Number can only safely store up to 53 bits" error. ethers.js is a better alternative and has already been compiled for use in BigQuery: https://github.com/blockchain-etl/ethers.js-bigquery

The issue in ethjs-abi causes the "Number can only safely store up to 53 bits" error when parsing logs with indexed fields of type string: 31ca38c


Add Axie Infinity events

Contracts:

  1. 0xb28a3dd24036151c819c6d401f7a222d9aa3671b
  2. 0xf4985070ce32b6b1994329df787d1acc9a2dd9e2
  3. 0x3d5be9a472d6b5c8d45b4b3a3bffb80e0c52ef15
  4. 0xf5b0a3efb8e8e4c201e2a935f110eaaf3ffecb8d
  5. 0x73d7b530d181ef957525c6fbe2ab8f28bf4f81cf
  6. 0x60ce035dc589c3fd185b224a7ca03c598948973b
  7. 0x92bf969865c80eda082fd5d8b4e28da4d58e1c3a
  8. 0x7a11462a2adaed5571b91e34a127e4cbf51b152c
  9. 0x2299a91cc0bffd8c7f71349da8ab03527b79724f
  10. 0x66536a95cc83ee672da58b6d3e57f7582184d5df

In brief, here's what you have to do:
For each contract above,

  1. Paste address into Contract Parser
  2. Use dataset name axie
  3. Download event files
  4. Add downloaded files to dags/resources/stages/parse/table_definitions/axie

Some contracts might have identical names - if so please rename so there's no duplication.

For more details, read this post.

List of contracts taken from: https://dappradar.com/app/282/axie-infinity (only included the Etherscan-verified ones).

Add `contracts.bytecode_hash` field

The rationale for hashing the bytecode is that there are many addresses with identical code. We can find the unique set of contracts with less I/O if there is a hash field. It also allows more efficient searches for new contracts vs. all previous ones, in combination with the date partition.

Can use FARM_FINGERPRINT for this:
https://cloud.google.com/bigquery/docs/reference/standard-sql/hash_functions#farm_fingerprint

The change needs to be made here (among other places):
https://github.com/blockchain-etl/ethereum-etl-airflow/blob/master/dags/resources/stages/raw/schemas/contracts.json#L9

Handle proxies in log parsing

Background:
Quite a few contracts are proxies for a separate ("target") address, and in many cases we can't parse the event logs of these in the same way as we normally do.

Example: https://etherscan.io/address/0x49d716DFe60b37379010A75329ae09428f17118d

Running the above example through the Contract Parser doesn't yield the events of the target address that we expect.

However, when pasting in the target contract 0x5d6bdeb18ba6e2be5cb87c7cfaee9cd07d000428, we get the set of events we are interested in.

But the BigQuery SQL generated for the target doesn't yield any events! When we combine the SQL generated for the target address with the address of the proxy contract, we get the result we want.

Proposed solution:
Add a (possibly null) "proxy" field which when non-empty overrides the address field in our SQL template.

Failure building polygon_etl

Step #1: airflow variables command error: argument COMMAND: invalid choice: '/home/airflow/gcs/data//airflow_variables.json' (choose from 'delete', 'export', 'get', 'import', 'list', 'set'), see help above.
Step #1: command terminated with exit code 2
Step #1: ERROR: (gcloud.composer.environments.run) kubectl returned non-zero status code.
Step #1: usage: airflow variables [-h] COMMAND ...

Build log (attached): cloud_build_84fbf515-6ca2-44f8-bf27-0201bfbb0063.log

Any help will be greatly appreciated

Missing logs_by_topic table

Hi all,

When attempting to run the DAGs for the first time, we are unable to see where the "logs_by_topic" table is getting populated for our log event.

WITH parsed_logs AS
(SELECT
    logs.block_timestamp AS block_timestamp
    ,logs.block_number AS block_number
    ,logs.transaction_hash AS transaction_hash
    ,logs.log_index AS log_index
    ,logs.address AS contract_address
    ,`<project-id>-internal.ethereum_<entity>_blockchain_etl.parse_<smart-contract>_event_<event-name>`(logs.data, logs.topics) AS parsed
FROM `<project-id>-internal.crypto_ethereum_partitioned.logs_by_topic_0x8c5` AS logs
WHERE

  address in (lower('<address>'))

  AND topics[SAFE_OFFSET(0)] = '<topic>'


  -- live


  )
SELECT
     block_timestamp
     ,block_number
     ,transaction_hash
     ,log_index
     ,contract_address

    ,parsed.owner AS `owner`
    ,parsed.spender AS `spender`
    ,parsed.value AS `value`
FROM parsed_logs
WHERE parsed IS NOT NULL

The section in question is this guy:

...
FROM `<project-id>-internal.crypto_ethereum_partitioned.logs_by_topic_0x8c5` AS logs
...

We know that this is part of the "LIVE" realtime update section, but what is actually populating the table with the topics that we specify? Is this being done in a different repo?

Implement DAG builder for parsing Ethereum logs

The DAG builder is simply a Python function that generates an Airflow DAG given a list of parameters. An example of a DAG builder can be found here.

The input to the DAG builder for parsing Ethereum logs is a list of JSON files, each of which is a table definition file. An example table definition file is given below:

{
    "table": {
        "project_name": "blockchain-etl",
        "dataset_name": "zeroex",
        "table_name": "Exchange_event_LogFill",
        "table_description": "Lorem ipsum.",
        "schema": [
            {
                "name": "block_timestamp",
                "description": "Lorem ipsum.",
                "type": "TIMESTAMP"
            },
            {
                "name": "maker",
                "type": "STRING"
            },
            {
                "name": "taker",
                "type": "STRING"
            },
            {
                "name": "feeRecipient",
                "type": "STRING"
            },
            {
                "name": "makerToken",
                "type": "STRING"
            },
            {
                "name": "takerToken",
                "type": "STRING"
            },
            {
                "name": "filledMakerTokenAmount",
                "type": "STRING"
            },
            {
                "name": "filledTakerTokenAmount",
                "type": "STRING"
            },
            {
                "name": "paidMakerFee",
                "type": "STRING"
            },
            {
                "name": "paidTakerFee",
                "type": "STRING"
            },
            {
                "name": "tokens",
                "type": "STRING"
            },
            {
                "name": "orderHash",
                "type": "STRING"
            }
        ]
    },
    "parser": {
        "type": "log",
        "contract_address": "0x12459c951127e0c374ff9105dda097662a027093",
        "abi": {
            "anonymous": false,
            "inputs": [
                {
                    "indexed": true,
                    "name": "maker",
                    "type": "address"
                },
                {
                    "indexed": false,
                    "name": "taker",
                    "type": "address"
                },
                {
                    "indexed": true,
                    "name": "feeRecipient",
                    "type": "address"
                },
                {
                    "indexed": false,
                    "name": "makerToken",
                    "type": "address"
                },
                {
                    "indexed": false,
                    "name": "takerToken",
                    "type": "address"
                },
                {
                    "indexed": false,
                    "name": "filledMakerTokenAmount",
                    "type": "uint256"
                },
                {
                    "indexed": false,
                    "name": "filledTakerTokenAmount",
                    "type": "uint256"
                },
                {
                    "indexed": false,
                    "name": "paidMakerFee",
                    "type": "uint256"
                },
                {
                    "indexed": false,
                    "name": "paidTakerFee",
                    "type": "uint256"
                },
                {
                    "indexed": true,
                    "name": "tokens",
                    "type": "bytes32"
                },
                {
                    "indexed": false,
                    "name": "orderHash",
                    "type": "bytes32"
                }
            ],
            "name": "LogFill",
            "type": "event"
        },
        "field_mapping": {
            "TODO": "if necessary define rules for mapping abi fields to BigQuery table columns"
        }
    }
}

The output is an Airflow DAG:

  • For each table definition file, a PythonOperator task should be created which executes a BigQuery query job with a destination table (an example can be found here).
  • The SQL for the BigQuery job should be generated from the table and parser definitions using a Jinja template (an example Jinja template can be found here; an example log parsing query can be found here). A condensed sketch of such a builder is shown after this list.
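A condensed sketch of such a builder, illustrative only: the glob path, schedule, and the sql_template argument (assumed to be a pre-loaded jinja2.Template) are placeholders, and a real implementation would also handle table creation, schemas, and partitioning:

import json
from datetime import datetime
from glob import glob

from airflow import DAG
from airflow.operators.python import PythonOperator


def build_parse_dag(dag_id, table_definitions_glob, sql_template):
    dag = DAG(dag_id, schedule_interval="0 3 * * *", start_date=datetime(2018, 7, 1), catchup=False)

    for path in glob(table_definitions_glob):
        with open(path) as f:
            definition = json.load(f)
        table = definition["table"]
        parser = definition["parser"]

        def parse_task(table=table, parser=parser, **context):
            # Run a BigQuery query job whose destination is the parsed-events table.
            from google.cloud import bigquery
            client = bigquery.Client()
            sql = sql_template.render(table=table, parser=parser, ds=context["ds"])
            destination = bigquery.TableReference.from_string(
                f'{table["project_name"]}.{table["dataset_name"]}.{table["table_name"]}'
            )
            job_config = bigquery.QueryJobConfig(
                destination=destination,
                write_disposition="WRITE_TRUNCATE",
            )
            client.query(sql, job_config=job_config).result()

        PythonOperator(
            task_id=f'parse_{table["table_name"]}',
            python_callable=parse_task,
            dag=dag,
        )
    return dag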

Too many parse DAGs cause Airflow to be stuck because of task scheduling contention

Problem: Airflow schedules parse DAGs before the load DAG is finished. When more than 15 parse DAGs are waiting for the load DAG to finish, they occupy all available execution slots, which causes the load DAG itself to stall. A kind of deadlock.

Possible solutions:

  • Separate parse DAGs onto another Airflow instance.
  • Push parse DAG schedules back by a few hours to ensure they start only after the load DAG is finished (see the sketch after this list).
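A hedged illustration of the second option, combined with a sensor in "reschedule" mode so that waiting does not hold an execution slot between pokes. DAG and task ids are illustrative, and Airflow 2-style imports are assumed:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor

with DAG(
    "ethereum_parse_example_dag",
    schedule_interval="0 7 * * *",           # start a few hours after the load DAG
    start_date=datetime(2018, 7, 1),
    catchup=False,
) as dag:
    wait_for_load = ExternalTaskSensor(
        task_id="wait_for_ethereum_load_dag",
        external_dag_id="ethereum_load_dag",
        external_task_id=None,                # wait for the whole load DAG run
        execution_delta=timedelta(hours=6),   # load DAG runs at 01:00, parse at 07:00
        mode="reschedule",                    # free the execution slot while waiting
        poke_interval=300,
    )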
