
Comments (7)

mlarose commented on July 23, 2024

Hi @thunderbug1

As is, your example can't be used to reproduce the bug because too many parts are missing or omitted.

I did create a contrived example to reproduce your claim (see below).

Perhaps you ran into one of these issues:

  1. If you want to inject your resource through the argument list, its class should derive from ConfigurableResource (I surmise zeep.Client didn't), AND you should not use required_resource_keys at the same time.
  2. If you prefer to use required_resource_keys, you can access your resource as context.resources.<resource_name>.
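As we understand it, Dagster decides which asset parameters are resources by looking at their type annotations: a parameter annotated with a ConfigurableResource subclass is injected as a resource, while other non-context parameters are treated as upstream asset inputs. The toy below models only that classification rule in plain Python. ConfigurableResourceStandIn, TicketClient, ZeepLikeClient and classify_params are hypothetical names, and this is not Dagster's actual implementation.

```python
import inspect
from typing import Any, Callable


class ConfigurableResourceStandIn:
    """Hypothetical stand-in for dagster.ConfigurableResource (not the real class)."""


class TicketClient(ConfigurableResourceStandIn):
    """A resource class: it subclasses the stand-in base."""


class ZeepLikeClient:
    """Stands in for a third-party client (such as zeep.Client) that does not subclass the base."""


def classify_params(fn: Callable[..., Any]) -> dict[str, list[str]]:
    """Split parameters into resources and upstream inputs by annotation, skipping 'context'."""
    resources: list[str] = []
    inputs: list[str] = []
    for name, param in inspect.signature(fn).parameters.items():
        if name == "context":
            continue
        annotation = param.annotation
        if inspect.isclass(annotation) and issubclass(annotation, ConfigurableResourceStandIn):
            resources.append(name)
        else:
            # Unannotated or non-resource annotations are treated as asset inputs in this toy.
            inputs.append(name)
    return {"resources": resources, "inputs": inputs}


def tickets_ok(context, ticket_numbers, client: TicketClient): ...
def tickets_bad(context, ticket_numbers, client: ZeepLikeClient): ...


print(classify_params(tickets_ok))   # {'resources': ['client'], 'inputs': ['ticket_numbers']}
print(classify_params(tickets_bad))  # {'resources': [], 'inputs': ['ticket_numbers', 'client']}
```

Under this toy rule, the second signature would make client look like an upstream input rather than a resource, which is one plausible way a non-resource client type leads to a confusing error.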

Also note that the repository semantics are still supported but deprecated; you should prefer the Definitions approach in newer code. I did try with both, though.

import math

from dagster import asset, Output, resource, DynamicPartitionsDefinition, with_resources, repository, \
    ConfigurableResource

ticket_partitions = DynamicPartitionsDefinition(name="ticket_partitions")


class TicketClientResource(ConfigurableResource):
    def get_tickets(self, tickets: list[str], slice: int):
        return tickets[slice*5:(slice+1)*5]


@resource()
def ticket_client() -> TicketClientResource:
    return TicketClientResource()


@asset()
def ticket_numbers(context) -> Output:
    ticket_ids = []
    for i in range(18):
        ticket_ids.append(f"Ticket #{i}")

    slice_size = 5
    nb_slices = math.ceil(len(ticket_ids)/slice_size)
    for i in range(nb_slices):
        context.instance.add_dynamic_partitions(ticket_partitions.name, [f"{i}"])
    return Output(value=ticket_ids, metadata={"json_key_count": len(ticket_ids), "slice_size": slice_size, "nb_slices": nb_slices})


@asset(
    partitions_def=ticket_partitions,
    # required_resource_keys={"client"}, if you want to use from context instead.
)
def tickets(context, ticket_numbers, client: TicketClientResource):
    partition = context.partition_key
    ticket_ids = client.get_tickets(ticket_numbers, slice=int(partition))
    context.log.info(f"Ticket ids: {ticket_ids}")
    return Output(value=ticket_ids, metadata={"tickets": ticket_ids})

# Also works with the newer Definitions semantics
# (uncommenting this also requires: from dagster import Definitions):

# defs = Definitions(
#     assets=[tickets, ticket_numbers],
#     resources={
#         'client': ticket_client()
#     },
# )

# Or with the repository semantic which the OP was using:

@repository
def repo():
    return [with_resources(
        [tickets, ticket_numbers],
        resource_defs={
            "client": ticket_client
        }
    )]
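The partitioning in this example is plain slicing arithmetic: 18 ticket IDs with slice_size = 5 give math.ceil(18 / 5) = 4 partitions named "0" to "3", and the last one holds only 3 tickets. A minimal sketch, without Dagster, that reproduces what get_tickets returns for each partition key (the helper name partition_slices is hypothetical):

```python
import math


def partition_slices(ticket_ids: list[str], slice_size: int = 5) -> dict[str, list[str]]:
    """Map each partition key ("0", "1", ...) to the slice get_tickets would return for it."""
    nb_slices = math.ceil(len(ticket_ids) / slice_size)
    return {
        str(i): ticket_ids[i * slice_size:(i + 1) * slice_size]
        for i in range(nb_slices)
    }


ticket_ids = [f"Ticket #{i}" for i in range(18)]
slices = partition_slices(ticket_ids)
print(sorted(slices))  # ['0', '1', '2', '3']
print(slices["3"])     # ['Ticket #15', 'Ticket #16', 'Ticket #17']
```

Because the keys are strings, the asset has to convert them back with int(partition), as the tickets asset does.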


thunderbug1 commented on July 23, 2024

Thank you for the fast reply!

> If you want to inject your resource from the arg list, it should be derived from ConfigurableResource (i surmise zeep.Client didn't) AND you should not use the required_resource_keys at the same time.

That is correct. The error I got did not really help to pinpoint this; maybe a check should be added to the code to enforce it and return a clear error message.
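The kind of up-front check suggested here could look roughly like the sketch below. It is a hypothetical validator, not part of Dagster: ResourceBase stands in for ConfigurableResource, resource_keys represents the keys bound via with_resources or Definitions, and the error wording is made up for illustration.

```python
import inspect
from typing import Any, Callable, Optional, Set


class ResourceBase:
    """Hypothetical stand-in for dagster.ConfigurableResource."""


def validate_resource_params(
    fn: Callable[..., Any],
    resource_keys: Set[str],
    required_resource_keys: Optional[Set[str]] = None,
) -> None:
    """Raise a descriptive TypeError for resource wiring mistakes (hypothetical check)."""
    params = inspect.signature(fn).parameters
    for key in sorted(resource_keys & set(params)):
        # Rule 1: a resource should be requested through one mechanism only.
        if required_resource_keys and key in required_resource_keys:
            raise TypeError(
                f"{fn.__name__}: resource '{key}' is requested both as a parameter and via "
                "required_resource_keys; use only one mechanism."
            )
        # Rule 2: an injected parameter must be annotated with a resource subclass.
        annotation = params[key].annotation
        if not (inspect.isclass(annotation) and issubclass(annotation, ResourceBase)):
            shown = (
                "no annotation"
                if annotation is inspect.Parameter.empty
                else getattr(annotation, "__name__", repr(annotation))
            )
            raise TypeError(
                f"{fn.__name__}: parameter '{key}' matches a resource key, but its annotation "
                f"({shown}) does not subclass the resource base class."
            )


class TicketClient(ResourceBase):
    pass


class PlainClient:  # e.g. a third-party client that is not a resource subclass
    pass


def ok(context, ticket_numbers, client: TicketClient): ...
def bad(context, ticket_numbers, client: PlainClient): ...


validate_resource_params(ok, {"client"})  # passes silently
messages = []
for args in [(bad, {"client"}, None), (ok, {"client"}, {"client"})]:
    try:
        validate_resource_params(*args)
    except TypeError as exc:
        messages.append(str(exc))
        print(exc)
```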

> you should not use the required_resource_keys at the same time

Oh, OK. I tried many different ways; that was just one of them.

> Also note that the repository semantic is still supported but deprecated and you should prefer the Definitions approach instead on newer code. I did try with both, though.

It is a bit confusing which parts of the SDK are deprecated and which are not.
I am using a repository here based on this answer.
After introducing partitioning, I had to add define_asset_job for the partitioned assets; otherwise the code would no longer load.
The define_asset_job function, however, seems to be incompatible with a Definitions object for the same assets. Do you have a suggestion for this?

I implemented the code you suggested but I get this error:

[screenshot of the error]

Here is the code I used:

import asyncio
import csv
import os
from typing import Any, Dict, List

import zeep
from ticket_analysis.omnitracker_api import get_client, get_ticket, get_ticket_emails
from ticket_analysis.llm import translate_email

from dagster import AssetExecutionContext, ConfigurableResource, AssetObservation, AssetSelection, Config, DagsterType, Field, ResourceDefinition, Definitions, DynamicPartitionsDefinition, String, asset, Output, MetadataValue, define_asset_job, repository, resource, with_resources
import logging
logging.basicConfig(format="%(levelname)s - %(name)s -  %(message)s", level=logging.INFO)


ticket_partitions = DynamicPartitionsDefinition(name="ticket_id")

class TicketClientResource(ConfigurableResource):
    def __init__(self):
        self.client = get_client()
    def get_tickets(self, tickets: list[Dict[str, str]], partition_ticket_number: str)  -> List[Dict[str, Any]]:
        fetched_tickets = get_ticket(self.client, partition_ticket_number)
        return [fetched_tickets]

@resource()
def omnitracketAPI_client() -> TicketClientResource:
    return TicketClientResource()

def ticket_numbers(context: AssetExecutionContext):
    dataset_path = os.path.join(os.path.dirname(__file__), "static_files/Ticketnummern_2024-03-20.csv")
    with open(dataset_path, "r") as fd:
        ticket_ids : List[Dict[str, str]] = [row for row in csv.DictReader(fd)]
    context.instance.add_dynamic_partitions(ticket_partitions.name, [row["ticket_number"] for row in ticket_ids])
    return Output(value=ticket_ids, metadata={"samples": MetadataValue.json(ticket_ids[:10])})

@asset(partitions_def=ticket_partitions, 
    #    required_resource_keys={"client"}
       )
def tickets(context: AssetExecutionContext, 
            ticket_numbers: List[Dict[str, str]], 
            client : TicketClientResource):
    partition_ticket_number = context.partition_key

    ticket = client.get_tickets(ticket_numbers, partition_ticket_number)[0]
    return Output(value=ticket, metadata={"json_key_count": MetadataValue.json(len(ticket.keys()))})

@repository
def repo():
    return [with_resources(
    [tickets, ticket_numbers],
    resource_defs={
        "client": omnitracketAPI_client
    },
), 
define_asset_job(
    name="download tickets from API",
    selection=AssetSelection.assets(tickets, ticket_numbers),
    partitions_def=ticket_partitions,
)]


mlarose commented on July 23, 2024

Can you provide an example that reproduces your error without all the unrelated imports?


thunderbug1 commented on July 23, 2024

It seems I can no longer reproduce that error since I rebuilt my devcontainer; however, I now get a different error. Here are the error message and the minimal example:

dagster._core.errors.DagsterUserCodeLoadError: Error occurred during the loading of Dagster definitions in
executable_path=/usr/local/bin/python, module_name=ticket_analysis, working_directory=/code/dagster/ticket_analysis

  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 408, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
                                                              ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 235, in __init__
    with user_code_error_boundary(
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
    raise new_error from e

The above exception was caused by the following exception:
AttributeError: 'function' object has no attribute 'with_resources'

  File "/usr/local/lib/python3.12/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 242, in __init__
    loadable_targets = get_loadable_targets(
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/utils.py", line 50, in get_loadable_targets
    else loadable_targets_from_python_module(module_name, working_directory)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
    module = load_python_module(
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/code_pointer.py", line 134, in load_python_module
    return importlib.import_module(module_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/code/dagster/ticket_analysis/ticket_analysis/__init__.py", line 3, in <module>
    from . import assets
  File "/code/dagster/ticket_analysis/ticket_analysis/assets.py", line 59, in <module>
    @repository
     ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 405, in repository
    return _Repository()(definitions_fn)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 118, in __call__
    repository_definitions = fn()
                             ^^^^
  File "/code/dagster/ticket_analysis/ticket_analysis/assets.py", line 61, in repo
    return [with_resources(
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/execution/with_resources.py", line 108, in with_resources
    transformed_defs.append(cast(T, definition.with_resources(resource_defs)))
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^

import csv
import os
from typing import Any, Dict, List

from ticket_analysis.omnitracker_api import get_client, get_ticket
import zeep 
from dagster import AssetExecutionContext, ConfigurableResource, AssetSelection, DynamicPartitionsDefinition, asset, Output, MetadataValue, define_asset_job, repository, resource, with_resources
import logging
logging.basicConfig(format="%(levelname)s - %(name)s -  %(message)s", level=logging.INFO)


ticket_partitions = DynamicPartitionsDefinition(name="ticket_id")

class TicketClientResource(ConfigurableResource):
    def get_tickets(self, tickets: list[Dict[str, str]], partition_ticket_number: str)  -> List[Dict[str, Any]]:
        self.client : zeep.Client = get_client()
        fetched_tickets = get_ticket(self.client, partition_ticket_number)
        return [fetched_tickets]

@resource()
def omnitracketAPI_client() -> TicketClientResource:
    return TicketClientResource()

def ticket_numbers(context: AssetExecutionContext):
    dataset_path = os.path.join(os.path.dirname(__file__), "static_files/closed_tickets.csv")
    with open(dataset_path, "r") as fd:
        ticket_ids : List[Dict[str, str]] = [row for row in csv.DictReader(fd)]
    context.instance.add_dynamic_partitions(ticket_partitions.name, [row["ticket_number"] for row in ticket_ids])
    return Output(value=ticket_ids, metadata={"samples": MetadataValue.json(ticket_ids[:10])})

@asset(partitions_def=ticket_partitions, 
    #    required_resource_keys={"client"}
       )
def tickets(context: AssetExecutionContext, 
            ticket_numbers: List[Dict[str, str]], 
            client : TicketClientResource):
    partition_ticket_number = context.partition_key

    ticket = client.get_tickets(ticket_numbers, partition_ticket_number)[0]
    return Output(value=ticket, metadata={"json_key_count": MetadataValue.json(len(ticket.keys()))})

@repository
def repo():
    return [with_resources(
        [tickets, ticket_numbers],
        resource_defs={
            "client": omnitracketAPI_client
        }
    ),
    define_asset_job(
    name="download tickets from API",
    selection=AssetSelection.assets(tickets, ticket_numbers),
    partitions_def=ticket_partitions,
)
        ]


mlarose commented on July 23, 2024

Hi @thunderbug1,

I can't really help you starting from this example. Once I mock all your dependencies (ticket_analysis.omnitracker_api, zeep, and the CSV file that I don't have) and fix the code errors (define_asset_job should raise an error about "download tickets from API" not being a valid name, and the ticket_numbers function is missing its @asset decorator), I end up in a state where I can't reproduce your error.
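The job-name problem can be caught before loading. The sketch assumes, as we understand Dagster's naming rule, that names may contain only ASCII letters, digits and underscores; is_valid_name and suggest_valid_name are hypothetical helpers, not Dagster functions.

```python
import re

# Assumed naming rule: only ASCII letters, digits and underscores.
_NAME_PATTERN = re.compile(r"[A-Za-z0-9_]+")


def is_valid_name(name: str) -> bool:
    """Return True if the whole name matches the assumed naming rule."""
    return _NAME_PATTERN.fullmatch(name) is not None


def suggest_valid_name(name: str) -> str:
    """Replace each disallowed character (spaces, dashes, ...) with an underscore."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


job_name = "download tickets from API"
print(is_valid_name(job_name))       # False
print(suggest_valid_name(job_name))  # download_tickets_from_API
```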

You would have to provide an example that can be loaded as is and that does reproduce the error you are mentioning.
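The AttributeError in the traceback above comes from with_resources calling .with_resources(...) on every item it receives; a function without the @asset decorator is still a plain function, which has no such attribute. The sketch below reproduces that mechanism with hypothetical stand-ins (FakeAssetsDefinition, fake_asset, fake_with_resources) rather than Dagster itself.

```python
from typing import Any, Callable, Iterable, List, Mapping, Optional


class FakeAssetsDefinition:
    """Hypothetical stand-in for the object @asset returns (not Dagster's class)."""

    def __init__(self, fn: Callable[..., Any], resources: Optional[Mapping[str, Any]] = None):
        self.fn = fn
        self.resources = dict(resources or {})

    def with_resources(self, resource_defs: Mapping[str, Any]) -> "FakeAssetsDefinition":
        # Return a new definition with the resources bound, leaving self unchanged.
        return FakeAssetsDefinition(self.fn, resource_defs)


def fake_asset(fn: Callable[..., Any]) -> FakeAssetsDefinition:
    """Hypothetical decorator: wraps a plain function in a FakeAssetsDefinition."""
    return FakeAssetsDefinition(fn)


def fake_with_resources(definitions: Iterable[Any], resource_defs: Mapping[str, Any]) -> List[Any]:
    # Same call pattern as the failing frame: definition.with_resources(resource_defs)
    return [definition.with_resources(resource_defs) for definition in definitions]


@fake_asset
def tickets():
    return ["Ticket #0"]


def ticket_numbers():  # decorator forgotten: still a plain function
    return ["Ticket #0"]


bound = fake_with_resources([tickets], {"client": "stub-client"})
print(bound[0].resources)  # {'client': 'stub-client'}

error_message = ""
try:
    fake_with_resources([tickets, ticket_numbers], {"client": "stub-client"})
except AttributeError as exc:
    error_message = str(exc)
print(error_message)  # 'function' object has no attribute 'with_resources'
```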


thunderbug1 commented on July 23, 2024

Oh, good point. I must have removed the @asset decorator when I stripped out the unneeded parts. When I add it back to the code above, I get the error from before again:
[screenshot of the error]

Could it depend on the Dagster version? I am using 1.6.11.


mlarose commented on July 23, 2024

I had checked out 1.6.11 as well.
