Git Product home page Git Product logo

Comments (6)

6fj avatar 6fj commented on September 28, 2024

感谢你这边的输入,请持续关注这个issue的状态

from psi.

winnylyc avatar winnylyc commented on September 28, 2024

我之前的输入应该信息不是很足😓,secretnote上的错误信息应该不是很准确,另外上次实验存在重复的key所以本质应该无法运行。
我这边重新生成了不存在重复key的数据,并直接使用python运行。下面尝试了将label_columns设为[],但是不能运行。想要请问一下是不是暂时还没有unlabeled PSI相关的实现?
下面提供了运行结果。
Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:48,666 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:49.412 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-23 18:43:50.036 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.032 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2406) 2024-05-23 18:43:50.035 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:50.674 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:50.674 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:53.678 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 1 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=3107) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=3107) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=3107) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=3107)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=3107)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=3107)   label_max_len: 55
(SPURuntime(device_id=None, party=alice) pid=3107)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=3107)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=3107)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=3107)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=3107)   }
(SPURuntime(device_id=None, party=alice) pid=3107) }
(SPURuntime(device_id=None, party=alice) pid=3107)
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.445] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=3107) [2024-05-23 18:43:58.455] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0
2024-05-23 18:43:58.476 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(80e22aed7718a125377d0994e1b53538ecef50370100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-23 18:43:58.476 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-23 18:43:58.477 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-23 18:43:58.477 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:43:58.477 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:43:58.477 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=2464, ip=192.168.15.7, actor_id=377d0994e1b53538ecef503701000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7f81b0409f00>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=3107, ip=192.168.15.7, actor_id=66e4abb0bac5fbda63689d7f01000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f6d8952b828
#1 psi::apsi::Launch()+0x7f6d895332f9
#2 psi::RunPir()+0x7f6d8943c003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f6d894365d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f6d894367e0
#5 pybind11::cpp_function::dispatcher()+0x7f6d89417fed
#6 cfunction_call+0x4fd907
2024-05-23 18:43:58.477 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:43:58.478 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:43:58.478 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:43:58.479 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=55,
    bucket_size=1000000
)

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-23 18:43:51,091 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-23 18:43:51.803 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-23 18:43:52.553 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.548 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=2981) 2024-05-23 18:43:52.551 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-23 18:43:53.244 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-23 18:43:53.244 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-23 18:43:58.482 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=2981, ip=192.168.15.7, actor_id=4f32dff258c018cdbff1ac3001000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7f98089b5d50>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice
^C2024-05-23 18:45:37.998 WARNING api.py:60 [bob] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-23 18:45:37.998 WARNING api.py:325 [bob] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-23 18:45:37.999 INFO api.py:337 [bob] -- [Anonymous_job] No wait for data sending.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[DataSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO message_queue.py:70 [bob] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-23 18:45:38.000 INFO api.py:352 [bob] -- [Anonymous_job] Shutdowned rayfed.
2024-05-23 18:45:38.000 CRITICAL api.py:356 [bob] -- [Anonymous_job] Exit now due to the previous error.
Exception ignored in: <module 'threading' from '/root/anaconda3/envs/psi/lib/python3.10/threading.py'>
Traceback (most recent call last):
  File "/root/anaconda3/envs/psi/lib/python3.10/threading.py", line 1567, in _shutdown
    lock.acquire()
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 65, in _signal_handler
    _shutdown(intended=False)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 357, in _shutdown
    sys.exit(1)
SystemExit: 1

from psi.

6fj avatar 6fj commented on September 28, 2024

hi @winnylyc

能否尝试将 label_max_len 同时设为 0 ?

from psi.

winnylyc avatar winnylyc commented on September 28, 2024

感谢您的回复!
我这边尝试了一下,似乎还是不行😓,下面是代码和输出。
Sender(Server):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'alice'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.584 INFO api.py:233 [alice] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '0.0.0.0:59179', 'bob': '127.0.0.1:53341'}, 'CURRENT_PARTY_NAME': 'alice', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [alice] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [alice] -- [Anonymous_job] ReceiverProxy binding port 59179, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1837) 2024-05-24 10:29:08.253 INFO grpc_proxy.py:379 [alice] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.940 INFO barriers.py:333 [alice] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.940 INFO barriers.py:520 [alice] -- [Anonymous_job] Try ping ['bob'] at 0 attemp, up to 3600 attemps.
(SPURuntime(device_id=None, party=alice) pid=2042) WARNING:root:config=mode: MODE_SERVER_SETUP
(SPURuntime(device_id=None, party=alice) pid=2042) pir_protocol: PIR_PROTOCOL_KEYWORD_PIR_APSI
(SPURuntime(device_id=None, party=alice) pid=2042) pir_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)   input_path: "/root/project/psi1/alice_exactpsi_1e6_unique.csv"
(SPURuntime(device_id=None, party=alice) pid=2042)   setup_path: "/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1"
(SPURuntime(device_id=None, party=alice) pid=2042)   key_columns: "name"
(SPURuntime(device_id=None, party=alice) pid=2042)   bucket_size: 1000000
(SPURuntime(device_id=None, party=alice) pid=2042)   apsi_server_config {
(SPURuntime(device_id=None, party=alice) pid=2042)     oprf_key_path: "/root/project/psi1/alice_oprf_key"
(SPURuntime(device_id=None, party=alice) pid=2042)     num_per_query: 1
(SPURuntime(device_id=None, party=alice) pid=2042)   }
(SPURuntime(device_id=None, party=alice) pid=2042) }
(SPURuntime(device_id=None, party=alice) pid=2042)
2024-05-24 10:29:11.165 WARNING cleanup.py:154 [alice] -- [Anonymous_job] Failed to send ObjectRef(8849b62d89cb30f9c4aba1b6791ea8e6170e26cf0100000001000000) to bob, error: ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907,upstream_seq_id: 5#0, downstream_seq_id: 6.
2024-05-24 10:29:11.165 INFO cleanup.py:161 [alice] -- [Anonymous_job] Sending error what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907

 to bob.
2024-05-24 10:29:11.166 WARNING cleanup.py:127 [alice] -- [Anonymous_job] Signal SIGINT to exit.
2024-05-24 10:29:11.166 WARNING api.py:60 [alice] -- [Anonymous_job] Stop signal received (e.g. via SIGINT/Ctrl+C), try to shutdown fed. Press CTRL+C (or send SIGINT/SIGKILL/SIGTERM) to skip.
2024-05-24 10:29:11.166 WARNING api.py:325 [alice] -- [Anonymous_job] Shutdowning rayfed unintendedly...
2024-05-24 10:29:11.167 ERROR api.py:330 [alice] -- [Anonymous_job] Cross-silo sending error occured. ray::SenderProxyActor.send() (pid=1945, ip=192.168.15.7, actor_id=c4aba1b6791ea8e6170e26cf01000000, repr=<fed.proxy.barriers.SenderProxyActor object at 0x7fd04d2e9f90>)
  At least one of the input arguments for this task could not be computed:
ray.exceptions.RayTaskError: ray::SPURuntime.pir_setup() (pid=2042, ip=192.168.15.7, actor_id=29ba65c288e8d8664bc7d8d201000000, repr=SPURuntime(device_id=None, party=alice))
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1245, in pir_setup
    report = psi.pir(config)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/spu/psi.py", line 148, in pir
    report_str = libpsi.libs.pir(config.SerializeToString(), link)
RuntimeError: what:
        [external/psi/psi/utils/batch_provider.cc:160] unsupported.
stacktrace:
#0 psi::apsi::PirServerSetup()+0x7f7a76196828
#1 psi::apsi::Launch()+0x7f7a7619e2f9
#2 psi::RunPir()+0x7f7a760a7003
#3 psi::BindLibs()::{lambda()#5}::operator()()+0x7f7a760a15d5
#4 pybind11::cpp_function::initialize<>()::{lambda()#3}::_FUN()+0x7f7a760a17e0
#5 pybind11::cpp_function::dispatcher()+0x7f7a76082fed
#6 cfunction_call+0x4fd907
2024-05-24 10:29:11.167 INFO api.py:337 [alice] -- [Anonymous_job] No wait for data sending.
2024-05-24 10:29:11.168 INFO message_queue.py:70 [alice] -- [Anonymous_job] Notify message polling thread[ErrorSendingQueueThread] to exit.
2024-05-24 10:29:11.169 INFO api.py:352 [alice] -- [Anonymous_job] Shutdowned rayfed.
2024-05-24 10:29:11.169 CRITICAL api.py:356 [alice] -- [Anonymous_job] Exit now due to the previous error.
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:245] table_params hash_func_count:1
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:247] table_params max_items_per_bin:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:250] seal_params poly_modulus_degree:2048
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.138] [info] [pir.cc:252] query_params query_powers size:55
(SPURuntime(device_id=None, party=alice) pid=2042) [2024-05-24 10:29:11.150] [info] [pir.cc:267] bucket:0 bucket_setup_path:/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq1/bucket_0

Receiver(Client):

import secretflow as sf
import spu
import time

cluster_config = {
    'parties' : {
        'alice': {
            'address': '127.0.0.1:59179',
            'listen_addr': '0.0.0.0:59179'
        },
        'bob': {
            'address': '127.0.0.1:53341',
            'listen_addr': '0.0.0.0:53341'
        }
    },
    'self_party': 'bob'
}
sf.shutdown
sf.init(address='local', cluster_config=cluster_config)
cluster_def = {
    "nodes": [
        {
            "party": "alice",
            "address": "127.0.0.1:45413"
        },
        {
            "party": "bob",
            "address": "127.0.0.1:47480"
        },
    ],
    "runtime_config": {
        "protocol": spu.spu_pb2.SEMI2K,
        "field": spu.spu_pb2.FM128
    },
}

spu = sf.SPU(
    cluster_def,
    link_desc={
        "connect_retry_times": 60,
        "connect_retry_interval_ms": 1000,
    }
)

npq = 1
spu.pir_setup(
    server="alice",
    input_path="/root/project/psi1/alice_exactpsi_1e6_unique.csv",
    key_columns=['name'],
    label_columns=[],
    oprf_key_path="/root/project/psi1/alice_oprf_key",
    setup_path=f"/root/project/psi1/alice_exactpsi_setup_1e6_len55_unique_npq{npq}",
    num_per_query=npq,
    label_max_len=0,
    bucket_size=1000000
)

输出:

/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
/root/anaconda3/envs/psi/lib/python3.10/subprocess.py:1796: RuntimeWarning: os.fork() was called. os.fork() is incompatible with multithreaded code, and JAX is multithreaded, so this will likely lead to a deadlock.
  self.pid = _posixsubprocess.fork_exec(
2024-05-24 10:29:06,237 INFO worker.py:1621 -- Started a local Ray instance.
2024-05-24 10:29:07.600 INFO api.py:233 [bob] -- [Anonymous_job] Started rayfed with {'CLUSTER_ADDRESSES': {'alice': '127.0.0.1:59179', 'bob': '0.0.0.0:53341'}, 'CURRENT_PARTY_NAME': 'bob', 'TLS_CONFIG': {}}
2024-05-24 10:29:08.255 INFO barriers.py:284 [bob] -- [Anonymous_job] Succeeded to create receiver proxy actor.
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.251 INFO grpc_proxy.py:359 [bob] -- [Anonymous_job] ReceiverProxy binding port 53341, options: (('grpc.enable_retries', 1), ('grpc.so_reuseport', 0), ('grpc.max_send_message_length', 524288000), ('grpc.max_receive_message_length', 524288000), ('grpc.service_config', '{"methodConfig": [{"name": [{"service": "GrpcService"}], "retryPolicy": {"maxAttempts": 5, "initialBackoff": "5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE"]}}]}'))...
(ReceiverProxyActor pid=1838) 2024-05-24 10:29:08.254 INFO grpc_proxy.py:379 [bob] -- [Anonymous_job] Successfully start Grpc service without credentials.
2024-05-24 10:29:08.936 INFO barriers.py:333 [bob] -- [Anonymous_job] SenderProxyActor has successfully created.
2024-05-24 10:29:08.937 INFO barriers.py:520 [bob] -- [Anonymous_job] Try ping ['alice'] at 0 attemp, up to 3600 attemps.
2024-05-24 10:29:11.172 WARNING api.py:607 [bob] -- [Anonymous_job] Encounter RemoteError happend in other parties, error message: FedRemoteError occurred at alice
Traceback (most recent call last):
  File "/root/project/psi2/sf_connect_unlabeled.py", line 46, in <module>
    spu.pir_setup(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/spu.py", line 1990, in pir_setup
    return dispatch(
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 111, in dispatch
    return _registrar.dispatch(self.device_type, name, self, *args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/device/register.py", line 80, in dispatch
    return self._ops[device_type][name](*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/device/kernels/spu.py", line 521, in pir_setup
    return sfd.get(res)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/secretflow/distributed/primitive.py", line 156, in get
    return fed.get(object_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 613, in get
    raise e
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/api.py", line 602, in get
    values = ray.get(ray_refs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/auto_init_hook.py", line 24, in auto_init_wrapper
    return fn(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/client_mode_hook.py", line 103, in wrapper
    return func(*args, **kwargs)
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/ray/_private/worker.py", line 2524, in get
    raise value.as_instanceof_cause()
ray.exceptions.RayTaskError(FedRemoteError): ray::ReceiverProxyActor.get_data() (pid=1838, ip=192.168.15.7, actor_id=cb60c17fef717f98f61a8d5f01000000, repr=<fed.proxy.barriers.ReceiverProxyActor object at 0x7fd726c7dd80>)
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 451, in result
    return self.__get_result()
  File "/root/anaconda3/envs/psi/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/root/anaconda3/envs/psi/lib/python3.10/site-packages/fed/proxy/barriers.py", line 236, in get_data
    raise data
fed.exceptions.FedRemoteError: FedRemoteError occurred at alice

from psi.

6fj avatar 6fj commented on September 28, 2024

hi @winnylyc

感谢你的反馈,pir这边我们正处在代码调整中,会将目前这些问题一并解决。我们大概会在6月底之前完成。

from psi.

winnylyc avatar winnylyc commented on September 28, 2024

感谢您的反馈!期待PIR部分的调整!

from psi.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.