Importing pandas in a Dataflow worker process causes a `dill.loads` failure: the import fails with `EOFError: marshal data too short` while unpickling a DoFn. It looks like a race condition, because the error occurs more often on bigger machines (tested on n1-standard-16) than on smaller ones (n1-standard-4). Maybe the issue is malformed bytecode (overwritten concurrently?).
Error message from worker: Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 381, in get
processor = self.cached_bundle_processors[bundle_descriptor_id].pop()
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 279, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
File "/usr/local/lib/python3.7/site-packages/listing_analytics/listing_data_processing/workflow.py", line 6, in <module>
from listing_analytics.commons.result_datasets import listing_dataset_config, LISTING_REQUESTS_TABLE_NAME, LISTING_ITEMS_TABLE_NAME
File "/usr/local/lib/python3.7/site-packages/listing_analytics/commons/result_datasets.py", line 1, in <module>
from bigflow.bigquery import DatasetConfig
File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/__init__.py", line 1, in <module>
from .interactive import interactive_component as component
File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/interactive.py", line 7, in <module>
import pandas as pd
File "/usr/local/lib/python3.7/site-packages/pandas/__init__.py", line 52, in <module>
from pandas.core.api import (
File "/usr/local/lib/python3.7/site-packages/pandas/core/api.py", line 15, in <module>
from pandas.core.arrays import Categorical
File "/usr/local/lib/python3.7/site-packages/pandas/core/arrays/__init__.py", line 9, in <module>
from pandas.core.arrays.integer import IntegerArray, integer_array
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 724, in exec_module
File "<frozen importlib._bootstrap_external>", line 857, in get_code
File "<frozen importlib._bootstrap_external>", line 525, in _compile_bytecode
EOFError: marshal data too short
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 258, in _execute
response = task()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 315, in <lambda>
lambda: self.create_worker().do_instruction(request), request)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 484, in do_instruction
getattr(request, request_type), request.instruction_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 513, in process_bundle
instruction_id, request.process_bundle_descriptor_id)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 387, in get
self.data_channel_factory)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 854, in __init__
self.ops = self.create_execution_tree(self.process_bundle_descriptor)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 909, in create_execution_tree
descriptor.transforms, key=topological_height, reverse=True)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 908, in <listcomp>
(transform_id, get_operation(transform_id)) for transform_id in sorted(
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in get_operation
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 892, in <dictcomp>
pcoll_id in descriptor.transforms[transform_id].outputs.items()
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 890, in <listcomp>
tag: [get_operation(op) for op in pcoll_consumers[pcoll_id]]
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 798, in wrapper
result = cache[args] = func(*args)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 895, in get_operation
transform_id, transform_consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1182, in create_operation
return creator(self, transform_id, transform_proto, payload, consumers)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1526, in create_par_do
parameter)
File "/usr/local/lib/python3.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1562, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 283, in loads
return dill.loads(s)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 275, in loads
return load(file, ignore, **kwds)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 270, in load
return Unpickler(file, ignore=ignore, **kwds).load()
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self)
File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
return getattr(__import__(module, None, None, [obj]), obj)
File "/usr/local/lib/python3.7/site-packages/listing_analytics/listing_data_processing/workflow.py", line 6, in <module>
from listing_analytics.commons.result_datasets import listing_dataset_config, LISTING_REQUESTS_TABLE_NAME, LISTING_ITEMS_TABLE_NAME
File "/usr/local/lib/python3.7/site-packages/listing_analytics/commons/result_datasets.py", line 1, in <module>
from bigflow.bigquery import DatasetConfig
File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/__init__.py", line 1, in <module>
from .interactive import interactive_component as component
File "/usr/local/lib/python3.7/site-packages/bigflow/bigquery/interactive.py", line 7, in <module>
import pandas as pd
File "/usr/local/lib/python3.7/site-packages/pandas/__init__.py", line 52, in <module>
from pandas.core.api import (
File "/usr/local/lib/python3.7/site-packages/pandas/core/api.py", line 15, in <module>
from pandas.core.arrays import Categorical
File "/usr/local/lib/python3.7/site-packages/pandas/core/arrays/__init__.py", line 9, in <module>
from pandas.core.arrays.integer import IntegerArray, integer_array
File "<frozen importlib._bootstrap>", line 983, in _find_and_load
File "<frozen importlib._bootstrap>", line 967, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 724, in exec_module
File "<frozen importlib._bootstrap_external>", line 857, in get_code
File "<frozen importlib._bootstrap_external>", line 525, in _compile_bytecode
EOFError: marshal data too short