
Comments (8)

andrewyates commented on July 28, 2024

We also tried splitting the filenames into chunks so that fewer forked processes are created. This didn't make a difference, but here's the modified script for reference:

import os
import sys
from multiprocessing import Pool

import jnius
from jnius import autoclass

# Same Java helpers as in the other scripts in this thread
jbr = autoclass("java.io.BufferedReader")
jfr = autoclass("java.io.FileReader")


def jprint(fn_list):
    idx, xs = fn_list
    for x in xs:
        fr = jfr(x)
        f = jbr(fr)
        while True:
            line = f.readLine()
            if line is None:
                print("break", idx)
                f.close()
                fr.close()
                break
            else:
                print("not none", idx)
    jnius.detach()


def chunks(lst, n):
    """Yield successive n-sized chunks from lst."""
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

if __name__ == "__main__":
    dir = sys.argv[1]
    pool_size = 5
    p = Pool(pool_size)
    fns = [os.path.join(dir, fn) for fn in os.listdir(dir) if os.path.isfile(os.path.join(dir, fn))]
    #fn_chunks = list(enumerate(chunks(fns, 1 + len(fns)//pool_size)))
    fn_chunks = list(enumerate(chunks(fns, 100)))
    p.map(jprint, fn_chunks)


kevinmartinjos commented on July 28, 2024

Using multithreading instead of multiprocessing does not seem to reproduce the problem. The directory had around 2,200 files, each around 5 MB. Here's the script I used, though it's almost the same as the ones mentioned above:

import os
import threading
import sys
from multiprocessing import Pool
from jnius import autoclass
jstr = autoclass("java.lang.String")
jbr = autoclass("java.io.BufferedReader")
jfr = autoclass("java.io.FileReader")

def jprint(x):
    f = jbr(jfr(x))
    while True:
        line = f.readLine()
        if line is None:
            print("break")
            break
        else:
            print("not none")
if __name__ == "__main__":
    # jstr = autoclass("java.lang.String")
    print(jstr("main"))
    threads = []
    fns = [os.path.join(sys.argv[1], fn) for fn in os.listdir(sys.argv[1]) if os.path.isfile(os.path.join(sys.argv[1], fn))]

    for fn in fns:
        thread = threading.Thread(target=jprint, args=(fn,))
        threads.append(thread)
        thread.start()

    print("joining threads")
    for thread in threads:
        thread.join()


lintool commented on July 28, 2024

Interestingly enough, this works for me:

from pyserini.collection import pycollection
from pyserini.index import pygenerator

collection = pycollection.Collection('HtmlCollection', 'collection/')
generator = pygenerator.Generator('JsoupGenerator')

segments = [fs for (i, fs) in enumerate(collection)]

def do_something(fs):
    for (j, doc) in enumerate(fs):
        parsed = generator.create_document(doc)
        docid = parsed.get('id')            # FIELD_ID
        raw = parsed.get('raw')             # FIELD_RAW
        contents = parsed.get('contents')   # FIELD_BODY
        print('{} {} {}...'.format(j, docid, contents.strip().replace('\n', ' ')[:50]))

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(do_something, segments)

This is a minimal modification of the example in the README.

This is on macOS.

Also:

$ ls collection/ | wc
    3204    3204   48060

The number of files is above what triggered errors on your end...


lintool commented on July 28, 2024

The question is, how efficient is multithreading? To answer it, I wrote this script, which consumes Robust04:

from pyserini.collection import pycollection
from pyserini.index import pygenerator

collection = pycollection.Collection('TrecCollection', '/tuna1/collections/newswire/disk45/')
generator = pygenerator.Generator('JsoupGenerator')

segments = [fs for (i, fs) in enumerate(collection)]

def do_something(fs):
    for (j, doc) in enumerate(fs):
        parsed = generator.create_document(doc)
        docid = parsed.get('id')            # FIELD_ID
        raw = parsed.get('raw')             # FIELD_RAW
        contents = parsed.get('contents')   # FIELD_BODY
    print('Finished {}, {} docs'.format(doc, j))

import concurrent.futures
import time
import sys

tic = time.clock()
with concurrent.futures.ThreadPoolExecutor(max_workers=int(sys.argv[1])) as executor:
    executor.map(do_something, segments)
toc = time.clock()
print('{} threads, total time: {} seconds'.format(sys.argv[1], toc-tic))

Results (on my iMac Pro):

1 threads, total time: 90.505038 seconds
1 threads, total time: 90.314111 seconds
2 threads, total time: 120.587896 seconds
2 threads, total time: 121.853650 seconds
4 threads, total time: 163.897666 seconds
4 threads, total time: 167.067275 seconds
8 threads, total time: 179.601039 seconds
8 threads, total time: 176.766709 seconds

Due to the GIL, adding more threads actually decreases performance...
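
For reference, the same effect can be reproduced in isolation (a minimal sketch, independent of pyserini and pyjnius): a pure-Python CPU-bound task gets no speedup from extra threads, because only one thread can hold the GIL at a time.

import concurrent.futures
import time

def busy(n):
    # Pure-Python CPU-bound work; threads serialize on the GIL here.
    total = 0
    for i in range(n):
        total += i * i
    return total

for workers in (1, 2, 4, 8):
    tic = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        list(executor.map(busy, [2_000_000] * 8))
    toc = time.perf_counter()
    print('{} threads, total time: {:.2f} seconds'.format(workers, toc - tic))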


lintool commented on July 28, 2024

Finally - okay, what about multiprocessing?

from pyserini.collection import pycollection
from pyserini.index import pygenerator

collection = pycollection.Collection('HtmlCollection', 'collection/')
generator = pygenerator.Generator('JsoupGenerator')

segments = [fs for (i, fs) in enumerate(collection)]

def do_something(fs):
    for (j, doc) in enumerate(fs):
        parsed = generator.create_document(doc)
        docid = parsed.get('id')            # FIELD_ID
        raw = parsed.get('raw')             # FIELD_RAW
        contents = parsed.get('contents')   # FIELD_BODY
        print('{} {} {}...'.format(j, docid, contents.strip().replace('\n', ' ')[:50]))

from multiprocessing import Pool

p = Pool(3)
p.map(do_something, segments)

We get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/anaconda3/envs/python36/lib/python3.6/multiprocessing/pool.py", line 288, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/anaconda3/envs/python36/lib/python3.6/multiprocessing/pool.py", line 670, in get
    raise self._value
  File "/anaconda3/envs/python36/lib/python3.6/multiprocessing/pool.py", line 450, in _handle_tasks
    put(task)
  File "/anaconda3/envs/python36/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/anaconda3/envs/python36/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle _thread.lock objects

I think this provides a clue on what's going on... deep in the bowels of pyjnius, there's some locking that's preventing us from achieving parallelism... which would be consistent with @andrewyates's initial hypothesis that we're encountering deadlock of some sort...
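
As a minimal sketch of the same failure, independent of pyjnius: any object that carries a thread lock cannot be pickled, and Pool.map has to pickle everything it ships to its workers.

import pickle
import threading

class HoldsLock:
    # Stands in for a pyjnius-backed object with internal locking.
    def __init__(self):
        self.lock = threading.Lock()

try:
    pickle.dumps(HoldsLock())
except TypeError as e:
    print(e)  # e.g. "can't pickle _thread.lock objects"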


lintool commented on July 28, 2024

Digging into the documentation, the above error is because map uses a queue, and the queue requires objects to be picklable.

Final try, let's just blast out a bunch of processes:

from pyserini.collection import pycollection
from pyserini.index import pygenerator

collection = pycollection.Collection('HtmlCollection', 'collection/')

def do_something(fs):
    generator = pygenerator.Generator('JsoupGenerator')
    for (j, doc) in enumerate(fs):
        parsed = generator.create_document(doc)
        docid = parsed.get('id')            # FIELD_ID
        raw = parsed.get('raw')             # FIELD_RAW
        contents = parsed.get('contents')   # FIELD_BODY
        print('{} {} {}...'.format(j, docid, contents.strip().replace('\n', ' ')[:50]))

from multiprocessing import Process

for (i, fs) in enumerate(collection):
    proc = Process(target=do_something, args=(fs,))
    proc.start()

The result is a whole bunch of JVM errors from jnius...

I think we've reached the end of the road here; this seems like a dead end without really digging deep into pyjnius to figure out what's going on.


kevinmartinjos commented on July 28, 2024

@lintool
Here's a script to measure multiprocessing performance. On a server, with a pool of 8 processes, reading Robust04 finished in 24 seconds. The threaded script you used earlier took 72 seconds with max_workers=1 and 288 seconds with max_workers=8 (confirming what you had noted about the GIL earlier). Note that pyserini is imported inside the worker function, so each child process creates its own JVM only after the fork.

import os
from multiprocessing import Manager, Process, Pool
import time

def spawn_child_process_to_read_docs(data):

    from pyserini.index import pygenerator
    from pyserini.collection import pycollection
    path = data["rootdir"]
    ctype = data["ctype"]

    start = time.time()
    collection = pycollection.Collection(ctype, path)
    # Per-process dict; this script only benchmarks reading, so nothing is returned.
    local_dict = {}
    generator = pygenerator.Generator("JsoupGenerator")
    for i, file_segment in enumerate(collection):
        doc_ids, doc_contents = read_file_segment(file_segment, generator)
        for i, doc_id in enumerate(doc_ids):
            local_dict[doc_id] = doc_contents[i]
 


def read_file_segment(file_segment, generator):
    doc_ids = []
    docs = []
    for j, doc in enumerate(file_segment):
        if doc.id is not None:
            parsed = generator.create_document(doc)
            if parsed is None:
                continue
            current_doc_id = parsed.get("id")
            contents = parsed.get("contents")    
            doc_ids.append(current_doc_id)
            docs.append(contents)

    return doc_ids, docs

if __name__ == "__main__":
    ctype = "TrecCollection"
    path = "/GW/NeuralIR/nobackup/Aquaint-TREC-3-4/NEWS_data"
    start_time = time.time()

    args_list = []
    for subdir in os.listdir(path):
        if os.path.isdir(path + "/" + subdir):
            args_list.append({"rootdir": path + "/" + subdir, "ctype": ctype})


    pool = Pool(processes=8)
    pool.map(spawn_child_process_to_read_docs, args_list)
    print("Getting all documents from disk took: {0}".format(time.time() - start_time))


lintool commented on July 28, 2024

Had another idea for this, tried:

from pyserini.collection import pycollection
from pyserini.index import pygenerator

collection = pycollection.Collection('TrecCollection', '/tuna1/collections/newswire/disk45/')
paths = [path.toString() for path in collection.get_segment_paths().toArray()]

def do_something(p):
    segment_raw = collection.create_file_segment(pycollection.JPaths.get(p))
    iter = segment_raw.iterator()
    print(f'segment: {p}')

    while iter.hasNext():
        doc = iter.next()

import concurrent.futures
import time
import sys

tic = time.clock()
with concurrent.futures.ThreadPoolExecutor(max_workers=int(sys.argv[1])) as executor:
    executor.map(do_something, paths)
toc = time.clock()
print('{} threads, total time: {} seconds'.format(sys.argv[1], toc-tic))

I've tried to eliminate as many shared Java objects as possible; the only dependence above is on the Collection object, which is unavoidable because we need it to create the segments.
Still doesn't work:

1 threads, total time: ~38 seconds
2 threads, total time: ~42 seconds
4 threads, total time: ~54 seconds
4 threads, total time: ~67 seconds

