Comments (7)
Hi @thunderbug1,
As it stands, your example can't be used to reproduce the bug because too many parts are missing or omitted.
I did create a contrived example to reproduce your claim (see below).
Perhaps you ran into one of these issues:
- If you want to inject your resource from the argument list, it should be derived from `ConfigurableResource` (I surmise `zeep.Client` isn't) AND you should not use `required_resource_keys` at the same time.
- If you prefer to use `required_resource_keys`, then you can access your resource as `context.resources.<resource_name>`.
Also note that the `@repository` semantics are still supported but deprecated, and you should prefer the `Definitions` approach in newer code. I did try with both, though.
```python
import math
from dagster import asset, Output, resource, DynamicPartitionsDefinition, with_resources, repository, \
    ConfigurableResource

ticket_partitions = DynamicPartitionsDefinition(name="ticket_partitions")


class TicketClientResource(ConfigurableResource):
    def get_tickets(self, tickets: list[str], slice: int):
        return tickets[slice*5:(slice+1)*5]


@resource()
def ticket_client() -> TicketClientResource:
    return TicketClientResource()


@asset()
def ticket_numbers(context) -> Output:
    ticket_ids = []
    for i in range(18):
        ticket_ids.append(f"Ticket #{i}")
    slice_size = 5
    nb_slices = math.ceil(len(ticket_ids)/slice_size)
    for i in range(nb_slices):
        context.instance.add_dynamic_partitions(ticket_partitions.name, [f"{i}"])
    return Output(value=ticket_ids, metadata={"json_key_count": len(ticket_ids), "slice_size": slice_size, "nb_slices": nb_slices})


@asset(
    partitions_def=ticket_partitions,
    # required_resource_keys={"client"}, if you want to use from context instead.
)
def tickets(context, ticket_numbers, client: TicketClientResource):
    partition = context.partition_key
    ticket_ids = client.get_tickets(ticket_numbers, slice=int(partition))
    context.log.info(f"Ticket ids: {ticket_ids}")
    return Output(value=ticket_ids, metadata={"tickets": ticket_ids})

# Also works with the newer definition semantic:
# defs = Definitions(
#     assets=[tickets, ticket_numbers],
#     resources={
#         'client': ticket_client()
#     },
# )

# Or with the repository semantic which the OP was using:
@repository
def repo():
    return [with_resources(
        [tickets, ticket_numbers],
        resource_defs={
            "client": ticket_client
        }
    )]
```
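Worked through by hand, the partitioning in this example gives 18 tickets with `slice_size = 5`, so ceil(18 / 5) = 4 dynamic partitions with keys "0" to "3", and the last partition only gets tickets 15 to 17. The same arithmetic in plain Python (no Dagster needed), mirroring `ticket_numbers` and `get_tickets` above:

```python
import math

ticket_ids = [f"Ticket #{i}" for i in range(18)]
slice_size = 5
nb_slices = math.ceil(len(ticket_ids) / slice_size)

# Partition keys as registered by ticket_numbers: "0", "1", ...
partition_keys = [f"{i}" for i in range(nb_slices)]

# What get_tickets returns for each partition key (Python slicing clips the last slice).
slices = {
    key: ticket_ids[int(key) * slice_size : (int(key) + 1) * slice_size]
    for key in partition_keys
}
```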
Thank you for the fast reply!
> If you want to inject your resource from the arg list, it should be derived from ConfigurableResource (i surmise zeep.Client didn't) AND you should not use the required_resource_keys at the same time.

That is correct. The error I got did not really help to pin this down; maybe an assertion should be added to the code to check for this and return a clear error message.
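The kind of check proposed here could look roughly like the following. `check_resource_injection` is a hypothetical helper written for illustration, not part of Dagster; it only covers the "don't declare a resource both ways" rule, by inspecting the function's parameters with the standard `inspect` module.

```python
import inspect
from typing import Callable, Set


def check_resource_injection(fn: Callable, required_resource_keys: Set[str]) -> None:
    """Hypothetical guard: fail fast if a resource is requested both ways."""
    # Parameter names of the decorated function, ignoring the context argument.
    params = set(inspect.signature(fn).parameters) - {"context"}
    overlap = params & required_resource_keys
    if overlap:
        raise ValueError(
            f"{fn.__name__}: resource(s) {sorted(overlap)} are declared in "
            "required_resource_keys and also as function parameters; use one style only."
        )


def tickets(context, ticket_numbers, client):
    ...


# Passes: "client" is only requested as a parameter.
check_resource_injection(tickets, required_resource_keys=set())
```

Calling it with `required_resource_keys={"client"}` would raise a `ValueError` naming `client`.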
> you should not use the required_resource_keys at the same time

Oh, OK. I tried many different ways; that was just one of them.
> Also note that the repository semantic is still supported but deprecated and you should prefer the Definitions approach instead on newer code. I did try with both, though.

It is a bit confusing which parts of the SDK are deprecated and which are not.
I am using `@repository` here based on this answer.
After introducing partitioning, I needed to add a `define_asset_job` for the partitioned assets; otherwise the code would no longer load.
However, `define_asset_job` seems to be incompatible with a `Definitions` object for the same assets. Do you have a suggestion for this?
I implemented the code you suggested, but I get this error:
Here is the code I used:
```python
import asyncio
import csv
import os
from typing import Any, Dict, List
import zeep
from ticket_analysis.omnitracker_api import get_client, get_ticket, get_ticket_emails
from ticket_analysis.llm import translate_email
from dagster import AssetExecutionContext, ConfigurableResource, AssetObservation, AssetSelection, Config, DagsterType, Field, ResourceDefinition, Definitions, DynamicPartitionsDefinition, String, asset, Output, MetadataValue, define_asset_job, repository, resource, with_resources
import logging

logging.basicConfig(format="%(levelname)s - %(name)s - %(message)s", level=logging.INFO)

ticket_partitions = DynamicPartitionsDefinition(name="ticket_id")


class TicketClientResource(ConfigurableResource):
    def __init__(self):
        self.client = get_client()

    def get_tickets(self, tickets: list[Dict[str, str]], partition_ticket_number: str) -> List[Dict[str, Any]]:
        fetched_tickets = get_ticket(self.client, partition_ticket_number)
        return [fetched_tickets]


@resource()
def omnitracketAPI_client() -> TicketClientResource:
    return TicketClientResource()


def ticket_numbers(context: AssetExecutionContext):
    dataset_path = os.path.join(os.path.dirname(__file__), "static_files/Ticketnummern_2024-03-20.csv")
    with open(dataset_path, "r") as fd:
        ticket_ids : List[Dict[str, str]] = [row for row in csv.DictReader(fd)]
    context.instance.add_dynamic_partitions(ticket_partitions.name, [row["ticket_number"] for row in ticket_ids])
    return Output(value=ticket_ids, metadata={"samples": MetadataValue.json(ticket_ids[:10])})


@asset(partitions_def=ticket_partitions,
       # required_resource_keys={"client"}
       )
def tickets(context: AssetExecutionContext,
            ticket_numbers: List[Dict[str, str]],
            client : TicketClientResource):
    partition_ticket_number = context.partition_key
    ticket = client.get_tickets(ticket_numbers, partition_ticket_number)[0]
    return Output(value=ticket, metadata={"json_key_count": MetadataValue.json(len(ticket.keys()))})


@repository
def repo():
    return [with_resources(
                [tickets, ticket_numbers],
                resource_defs={
                    "client": omnitracketAPI_client
                },
            ),
            define_asset_job(
                name="download tickets from API",
                selection=AssetSelection.assets(tickets, ticket_numbers),
                partitions_def=ticket_partitions,
            )]
```
Can you provide an example that reproduces your error without all the unrelated imports?
It seems I can no longer reproduce that error since I rebuilt my devcontainer; however, I now get a different error. Here are the error message and the minimal example:
```
dagster._core.errors.DagsterUserCodeLoadError: Error occurred during the loading of Dagster definitions in
executable_path=/usr/local/bin/python, module_name=ticket_analysis, working_directory=/code/dagster/ticket_analysis

  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 408, in __init__
    self._loaded_repositories: Optional[LoadedRepositories] = LoadedRepositories(
                                                              ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 235, in __init__
    with user_code_error_boundary(
  File "/usr/local/lib/python3.12/contextlib.py", line 158, in __exit__
    self.gen.throw(value)
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/errors.py", line 297, in user_code_error_boundary
    raise new_error from e

The above exception was caused by the following exception:
AttributeError: 'function' object has no attribute 'with_resources'

  File "/usr/local/lib/python3.12/site-packages/dagster/_core/errors.py", line 287, in user_code_error_boundary
    yield
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/server.py", line 242, in __init__
    loadable_targets = get_loadable_targets(
                       ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_grpc/utils.py", line 50, in get_loadable_targets
    else loadable_targets_from_python_module(module_name, working_directory)
         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/workspace/autodiscovery.py", line 35, in loadable_targets_from_python_module
    module = load_python_module(
             ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/code_pointer.py", line 134, in load_python_module
    return importlib.import_module(module_name)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/importlib/__init__.py", line 90, in import_module
    return _bootstrap._gcd_import(name[level:], package, level)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "<frozen importlib._bootstrap>", line 1387, in _gcd_import
  File "<frozen importlib._bootstrap>", line 1360, in _find_and_load
  File "<frozen importlib._bootstrap>", line 1331, in _find_and_load_unlocked
  File "<frozen importlib._bootstrap>", line 935, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 995, in exec_module
  File "<frozen importlib._bootstrap>", line 488, in _call_with_frames_removed
  File "/code/dagster/ticket_analysis/ticket_analysis/__init__.py", line 3, in <module>
    from . import assets
  File "/code/dagster/ticket_analysis/ticket_analysis/assets.py", line 59, in <module>
    @repository
     ^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 405, in repository
    return _Repository()(definitions_fn)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/definitions/decorators/repository_decorator.py", line 118, in __call__
    repository_definitions = fn()
                             ^^^^
  File "/code/dagster/ticket_analysis/ticket_analysis/assets.py", line 61, in repo
    return [with_resources(
            ^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/site-packages/dagster/_core/execution/with_resources.py", line 108, in with_resources
    transformed_defs.append(cast(T, definition.with_resources(resource_defs)))
                                    ^^^^^^^^^^^^^^^^^^^^^^^^^
```
```python
import csv
import os
from typing import Any, Dict, List
from ticket_analysis.omnitracker_api import get_client, get_ticket
import zeep
from dagster import AssetExecutionContext, ConfigurableResource, AssetSelection, DynamicPartitionsDefinition, asset, Output, MetadataValue, define_asset_job, repository, resource, with_resources
import logging

logging.basicConfig(format="%(levelname)s - %(name)s - %(message)s", level=logging.INFO)

ticket_partitions = DynamicPartitionsDefinition(name="ticket_id")


class TicketClientResource(ConfigurableResource):
    def get_tickets(self, tickets: list[Dict[str, str]], partition_ticket_number: str) -> List[Dict[str, Any]]:
        self.client : zeep.Client = get_client()
        fetched_tickets = get_ticket(self.client, partition_ticket_number)
        return [fetched_tickets]


@resource()
def omnitracketAPI_client() -> TicketClientResource:
    return TicketClientResource()


def ticket_numbers(context: AssetExecutionContext):
    dataset_path = os.path.join(os.path.dirname(__file__), "static_files/closed_tickets.csv")
    with open(dataset_path, "r") as fd:
        ticket_ids : List[Dict[str, str]] = [row for row in csv.DictReader(fd)]
    context.instance.add_dynamic_partitions(ticket_partitions.name, [row["ticket_number"] for row in ticket_ids])
    return Output(value=ticket_ids, metadata={"samples": MetadataValue.json(ticket_ids[:10])})


@asset(partitions_def=ticket_partitions,
       # required_resource_keys={"client"}
       )
def tickets(context: AssetExecutionContext,
            ticket_numbers: List[Dict[str, str]],
            client : TicketClientResource):
    partition_ticket_number = context.partition_key
    ticket = client.get_tickets(ticket_numbers, partition_ticket_number)[0]
    return Output(value=ticket, metadata={"json_key_count": MetadataValue.json(len(ticket.keys()))})


@repository
def repo():
    return [with_resources(
                [tickets, ticket_numbers],
                resource_defs={
                    "client": omnitracketAPI_client
                }
            ),
            define_asset_job(
                name="download tickets from API",
                selection=AssetSelection.assets(tickets, ticket_numbers),
                partitions_def=ticket_partitions,
            )
    ]
```
Hi @thunderbug1,
I can't really help you starting from this example. Once I mock all your dependencies (`ticket_analysis.omnitracker_api`, `zeep`, and the CSV file that I don't have) and fix the code errors (`define_asset_job` should throw an error because "download tickets from API" is not a valid name, and you missed an `@asset` decorator on the `ticket_numbers` function), I end up in a state where I can't reproduce your error.
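On the naming point, a plain-Python illustration. It assumes that Dagster job names may contain only letters, digits and underscores; the regex below is my assumption of that rule, and `to_valid_name` is a hypothetical helper, not a Dagster function.

```python
import re

# Assumption: valid Dagster names match this pattern (letters, digits, underscores).
VALID_NAME = re.compile(r"^[A-Za-z0-9_]+$")


def to_valid_name(name: str) -> str:
    """Hypothetical helper: replace every disallowed character with '_'."""
    return re.sub(r"[^A-Za-z0-9_]", "_", name)


original = "download tickets from API"  # job name from the example: contains spaces
fixed = to_valid_name(original)
```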
You would have to provide an example that can be loaded as-is and that reproduces the error you are mentioning.
Oh, good point. I must have removed the `@asset` decorator when I stripped out unneeded parts. When I add it back to the code above, I get the error from before again:
Could it depend on the Dagster version? I am using 1.6.11.
I had checked out 1.6.11 as well.