Git Product home page Git Product logo

connected-component's Introduction

Connected component using Map-reduce on Apache Spark

Description

Computing Connected Components of a graph is a well studied problem in Graph Theory and there have been many state of the art algorithms that perform pretty well in a single machine environment. But many of these algorithms perform poorly when we apply them in a distributed setting with hundreds of billions of nodes and edges. Our first choice was to use, GraphX, the graph computing engine that comes with Apache Spark. Although, GraphX implementation of the algorithm works reasonably well on smaller graphs (we tested up to ~10 million nodes and ~100 million edges), but its performance quickly degraded as we tried to scale to higher numbers.

We implemented connected component algorithm described in the paper Connected Components in Map Reduce and Beyond. We liked its approach for two reasons - (1) the algorithm was well suited for our technical stack (Apache Spark on HDFS) (2) more over, other than typical computational complexity, the algorithm also took communication complexity and data skew into account. The proposed algorithm is iterative but in practice with our dataset and scale, it was able to converge pretty fast with less than ten iterations.


Implementation

We implemented the algorithm on Apache Spark on HDFS using Scala. We also provide a sample graph generator and a driver program. You can tune the parameters of this generator to change the characteristics of the generated graph. The generator saves the generated graph on HDFS. You can use the driver program to read the generated graph and run the algorithm. The results of the algorithm is also stored on HDFS. Alternatively, you can call directly call the API to run the algorithm.

In the implementation, we represent a node by a unique Long number. Input to the algorithm is a List of Cliques. A Clique is a list of nodes that are connected together. For example, the cliques can be:

1:	List(1L, 2L, 3L)
2:	List(3L, 4L)
3:	List(1L, 5L)
4:	List(2L)
5:	List(6L)
6:	List(7L, 8L)
7:	List(6L, 8L)
8:	List(9L)

In this case, we have 8 cliques as the input. As you can see that cliques 1, 2, 3, 4 form one connected component, cliques 5, 6, 7 form the second connected component, and clique 8 forms the third connected component.

The main API to drive the algorithm is

ConnectedComponent.run(cliques:RDD[List[Long]], maxIterationCount: Int): (RDD([Long, Long)], Boolean, Int)

The API expects you to provide RDD of cliques and maximum number of iterations. It returns RDD[(Long, Long)] i.e. a RDD of 2-tuple. The second element of the tuple is the minimum node in a connected component and the first element is another node in the same component.

We first build a List of nodePairs (RDD[(Long, Long)]), from the list of given cliques. We then apply the Large Star and Small Star operations on the list of node pairs.

We implemented the Large Star algorithm as follows:

LargeStar 
Input: List of nodePair(a, b)
Output: List of new nodePairs and change in totalConnectivityChangeCount

1: For every nodePair(a, b) emit nodePair(a, b) and nodePair(b, a).  We call the first element of the tuple-2 as self and the second element as its neighbor
2: Reduce on self to get a list of its neighbors.
3: For every self, apply Large Star operation on its neighbors.  The operation results in a list of new nodePairs.
4: Count the change in connectivity, connectivtyChangeCount, by subtracting the length of the list of neighbors in step 3 from the new list of neighbors in step 4
5: Sum this change for every self to get total change in connectivity, totalConnectivityChangeCount
6: Return the list of new nodePairs and totalConnectivityChangeCount

We implemented the Small Star algorithm as follows:

SmallStar 
Input: List of nodePair(a, b)
Output: List of new nodePairs and change in totalConnectivityChangeCount

1: For every nodePair(a, b) emit nodePair(a, b) if a > b else emit nodePair(b, a)
2: Rest of the steps are same as that of Large Star.

We call the Large Star and Small Star alternatively till the sum of the totalConnectivityChangeCount becomes zero. The outputs are RDD of nodePairs, a flag to indicate whether the algorithm converged within the given number of iterations, and count of iterations it took the algorithm to converge. In our experiments with various datasets, we observed that the algorithm was able to converge within 5 iterations.

The second element of the resultant nodePair is the minimum node in the connected component. To get all the nodes in a components, you will need to run reduce operation with second element as the key. For example, to get all the connected components, you may use the following:

val (cc, didConverge, iterCount) = ConnectedComponent.run(cliques, maxIterCount)
If (didConverge) {
	val allComponents = cc.map(x => {
   		val minNode = x._2
		val otherNode = x._1
		(minNode, List(otherNode))
	}).reduceByKey((a, b) => b ::: a)
}

Conclusion

We tested our implementation on various data sizes - scaling up to ~100 billion nodes and ~800 billion edges. In all the cases, the algorithm converged in no more than 6 iterations. We indeed had to to try various Spark related configurations, including executor memory size, driver memory size, yarn memory overhead, network timeout, and number of partitions to successfully run the implementation.

We would love to hear your feedback. Please drop us a note at [email protected].

connected-component's People

Contributors

kumarshirish avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

connected-component's Issues

function largerStar might go wrong

In paper "connected component in mapreduce and beyond", while doing largerStar, it usually connect all strictly larger neighbors to the min neighbor including self.
But in the implementation, as shown in code snippet below:
((neighbor > self && neighbor != minNode) || (neighbor == self))
it will emit edge from self to minNode.

Test below easily demonstrate this:
before largeStar:
scala> nodePairs.collect.foreach(print _)
(1,8)(9,8)(8,5)(7,3)(3,2)(6,3)(6,4)(8,7)
after that:
scala> largeStar(nodePairs)._1.collect.foreach(print _)
(1,1)(8,1)(2,2)(3,2)(3,2)(6,2)(7,2)(4,4)(6,4)(5,5)(8,5)(6,3)(7,3)(8,3)(8,1)(9,1)(9,8)
which is obviously wrong.

Why is the input a list of cliques?

Thank you for open sourcing this code. First, I'd like to note that I was not able to send email to the address given in the README ([email protected]): the error was mail delivery failure.

My main issue is that the README states that the input to your algorithm is a list of cliques. However, finding all cliques in a graph is a very hard problem (NP-complete I believe) but a basic connected components algorithm is doable in at least linear time, so why does your code require finding all cliques first? Are you using a different definition of "clique"? The paper cited in the README does not mention cliques at all.

Documentation request for the proper spark related settings

On the README.md the following is stated:

We indeed had to to try various Spark related configurations, including executor memory size, driver memory size, yarn memory overhead, network timeout, and number of partitions to successfully run the implementation.

Would you be able to post some of those settings - and if you have the time also some note about why/under what conditions they were chosen? thanks!

Is there anyone meets "central nodes/paths" problem?

After I got the result of conencted-component, the connected graph is huge...
I wanna remove some central nodes/paths,which means the most frequently nodes connected isolated subgraphs.
I wanna remove those central nodes, so that huge graph can be splitted into two isolated subgraphs, which be better for analysis.

Is there any solution to solve this?

Thank you !

License

There is no LICENSE file as a part of this project. Under what license is it distributed?

Performance and input - questions

Hi Kumar,

First I would like to say thanks for having this code, it is really good comparing to GraphX performance, also the API is much easier to understand.

Our use case is to find a connected component in large graph. we are creating the graph based on other input and we can manipulate the way we are creating the edges (i.e creating them as a star (partially) or like a "linked list" and we are trying to find the best way to optimize the input in order to speed the connected component part.

We did some performance testing (with 30 spark executor where each executor has 12 cores) and we found that:
for Graph1:
vertex count: 1458232
edge count: 794924
connected components: 795098
It took 5 minutes to find connected components

In Graph2:
vertex count: 46165451
edge count: 37572770
connected components: 17908024
it took 8 hours to find connected components.

Several questions:

  1. what is the best "input" (graph shape/edges shape) that makes the code to run faster per connected component? star? linked-list?
    is it correct that having the maximum needed edges as an input will perform better than having the minimum needed edges to achieve the same connected components? we tried to save edges but we saw it gave us bad result comparing to other run with more edges.
    Examples:
    Graph (each line is clique):
    a,b,c
    a,b
    d,e,f

edges in scenario 1 (better results): (a,b)(a,c)(b,c)(d,e)(d,f)(e,f)
edges in scenario 2 : (a,b)(a,c)(d,e)(d,f)
Are there any differences in terms of performance? we can manipulate our input to achieve the same connected component with different edges, this is why we are asking.

  1. if I compare graph1 to graph2, how come Graph2 runs 80 times slower than Graph1? I did the following computation, let me know if I'm wrong:

Graph2 edges/Graph1 edges = ~47
Graph2 vertices/Graph1 vertices = ~31

31 + 47 = 78 which is close to 80, could this be the explanation how come it runs 80 times slower?

I couldn't find other way to contact with you to ask questions....
if you can send me an email to [email protected] we can discuss more.. please feel free to close this issue.

thanks!

ConnectivityChangeCount doesn't mean convergence

Hi,

I'm interested in this project. After I read the code, I think ConnectivityChangeCount only count the change in number of edges, not change of edges. This may lead to mistake.
Here is a counterexample:

val g = sc.parallelize(Array(List(1L,4L), List(2L,4L),List(2L,5L), 
                             List(3L,5L),List(1L,6L),List(3L,6L)))

Which build a graph:

  6
/    \
1  2  3
\ /\ /
 4  5

Then I run

val (a,b,c) = ConnectedComponent.run(sc,g,1000)
a.collect

Spark gave this:

Array[(Long, Long)] = Array((1,1), (2,2), (3,3), (4,1), (2,1), (5,2), (3,2), (6,1), (3,1))

Please check this case. Thank you!

Possible bug in algorithm?

Given cliques like: (3, 4) (5, 6) (1, 2) (9, 10) (8, 9) (7, 8) (6, 7)
The CC result is like: (1,1),(2,1),(3,3),(4,3),(5,5),(6,5),(7,5),(8,5),(6,5),(9,5),(6,5),(7,5),(10,6),(8,6)

It's quite weird that, the result shows "(5,5),(6,5),(7,5),(8,5),(6,5),(9,5),(6,5),(7,5)" and "(10,6),(8,6)" are 2 seperated subgraphs.

We're using quite old spark configurations. Spark verion is 2.3.0 and scala is 2.11 .

Benchmark details

Hi,

I'm interested in using this approach to connected-components.

Can you give me some details on the performance of this approach?

eg How many cores, how much RAM and how much time it took to process the 100billion nodes?

Thanks,
Jatinder Sangha

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    ๐Ÿ–– Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. ๐Ÿ“Š๐Ÿ“ˆ๐ŸŽ‰

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google โค๏ธ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.