netflix / suro
Netflix's distributed Data Pipeline
License: Apache License 2.0
Create abstraction for MessageQueue so we can try various implementations instead of just LinkedBlockingQueue.
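A minimal sketch of what that abstraction could look like, assuming illustrative names (MessageQueue and BlockingMessageQueue here are hypothetical, not Suro's actual classes):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical queue abstraction; a Disruptor- or file-backed queue
// could implement the same interface.
interface MessageQueue<E> {
    boolean offer(E msg);                                  // non-blocking enqueue
    E poll(long timeout, TimeUnit unit) throws InterruptedException;
    int size();
}

// Default implementation backed by LinkedBlockingQueue.
class BlockingMessageQueue<E> implements MessageQueue<E> {
    private final LinkedBlockingQueue<E> queue;

    BlockingMessageQueue(int capacity) {
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    @Override public boolean offer(E msg) { return queue.offer(msg); }
    @Override public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return queue.poll(timeout, unit);
    }
    @Override public int size() { return queue.size(); }
}
```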
"/healthcheck" is too common and conflicts with potential Annotation scan of URI template "/healthcheck(/.*)
Let's change "/healthcheck" to "/surohealthcheck" and "/sinkstat" to "/surosinkstat"
Implement a Disruptor-based MessageQueue. Part of this work is also to determine whether the Disruptor actually makes sense here.
After building suro-client and starting the server following the instructions on the wiki, we're getting the following error:
ERROR org.apache.thrift.server.TNonblockingServer - Read an invalid frame size of 0. Are you using TFramedTransport on the client side?
Adding a row id would be helpful for de-duplication and for finding lost messages.
The Suro client can set the row id explicitly; otherwise, an MD5 hash of the byte[] payload would be added automatically.
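As an illustration of the automatic fallback, a sketch of MD5-based row-id generation (the class and method names are hypothetical, not Suro's API):

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper: derive a row id from the payload bytes when the
// client did not set one explicitly.
class RowId {
    static String fromPayload(byte[] payload) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5").digest(payload);
            StringBuilder hex = new StringBuilder(32);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));   // lowercase hex per byte
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new AssertionError("MD5 is required by the JVM spec", e);
        }
    }
}
```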
Right now, Suro supports logging from Java only. A Python client should be developed.
Both SuroService and ThriftServer call System.exit(). They shouldn't; only the Suro driver should decide whether or not to call System.exit().
'''
[root@localhost suro]# ./gradlew clean build

FAILURE: Build failed with an exception.

* What went wrong:
Could not resolve all dependencies for configuration ':classpath'.
Artifact 'com.google.code.findbugs:jsr305:1.3.9@jar' not found.

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 5.438 secs
[root@localhost suro]#
'''
FileQueue4Sink is used in QueuedSink, whose run() method calls drainTo and checks whether the message list is full or expired. FileQueue4Sink doesn't auto-commit by default, so this behavior drains the same messages multiple times.
The following fields should not be part of TagKey.java. Maybe we should put all the counters into a single utility class along with all the convenience methods.
public static final String SENT_COUNT = "sentMessageCount";
public static final String RECV_COUNT = "receivedMessageCount";
public static final String LOST_COUNT = "lostMessageCount";
public static final String RESTORED_COUNT = "restoredMessageCount";
public static final String RETRIED_COUNT = "retriedCount";
A ConcurrentModificationException can be thrown in rare error situations where the ConnectionPool is refreshed very frequently due to many malfunctioning Suro servers.
public class SomeService {
    private final MessageProducer producer;

    @Inject
    public SomeService(MessageProducer producer) {
        this.producer = producer;
    }

    public void doSomething() throws Exception {
        Future<Boolean> future = producer.produce(OutgoingMessage.builder().withBody("blah blah").build());
        future.get();
    }
}
A Message is a simple container of a String routing key and a byte[] message payload. To filter a message, the payload must be deserialized, and the deserialized form can be cached for other filters. Also, some sinks need a specific deserialized format, such as a Map&lt;String, Object&gt; or a JSON string for ElasticSearch.
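A sketch of the caching idea, assuming a hypothetical Message class (here the cached form is a UTF-8 String; a Map or JSON tree would work the same way):

```java
import java.nio.charset.StandardCharsets;

// Hypothetical container: deserialize the payload once and cache it so
// multiple filters and sinks do not each pay the deserialization cost.
class Message {
    private final String routingKey;
    private final byte[] payload;
    private volatile String decoded;   // cached deserialized form

    Message(String routingKey, byte[] payload) {
        this.routingKey = routingKey;
        this.payload = payload;
    }

    String getRoutingKey() { return routingKey; }

    // First caller pays the decoding cost; later filters reuse the result.
    String getDecodedPayload() {
        String d = decoded;
        if (d == null) {
            d = new String(payload, StandardCharsets.UTF_8);
            decoded = d;
        }
        return d;
    }
}
```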
We didn't find the root cause of this unit test failure, but using curator-test instead of ZkClient's test ZooKeeper server looks better.
Move common code such as DefaultObjectMapper into a common core module
The master branch has an injectable DataConverter interface. Instead of the injectable DataConverter interface, we'd better use a Groovy script.
populationLatch.await(a few seconds, TimeUnit.SECONDS) should be...
Implement end-to-end message consumer - filter - route - produce using Rx.
Generic API for sending messages to SQS, Kafka, Suro, CassQueue, etc.
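A hypothetical shape for such a transport-agnostic API (interface and class names are illustrative; SQS, Kafka, Suro, and CassQueue would each provide an implementation):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

// Hypothetical producer contract shared by all transports.
interface MessageProducer {
    Future<Boolean> produce(String routingKey, byte[] payload);
}

// Trivial in-memory implementation, useful for tests.
class InMemoryProducer implements MessageProducer {
    final List<byte[]> sent = new CopyOnWriteArrayList<>();

    @Override
    public Future<Boolean> produce(String routingKey, byte[] payload) {
        sent.add(payload);
        return CompletableFuture.completedFuture(true);
    }
}
```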
A fork-join message queue as an alternative to the simple LinkedBlockingQueue. This should improve performance.
'''
May 23, 2014 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
SEVERE: The web application [] appears to have started a thread named [NFLoadBalancer-PingTimer-suroClient] but has failed to stop it. This is very likely to create a memory leak.
...
...
May 23, 2014 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
SEVERE: The web application [] appears to have started a thread named [AsyncSuroClientPoller-0] but has failed to stop it. This is very likely to create a memory leak.
May 23, 2014 4:11:40 PM org.apache.catalina.loader.WebappClassLoader clearReferencesThreads
...
...
INFO: Stopping Coyote HTTP/1.1 on http-10004
Exception in thread "NFLoadBalancer-PingTimer-suroClient" java.lang.NullPointerException
at com.netflix.loadbalancer.BaseLoadBalancer$PingTask.run(BaseLoadBalancer.java:583)
at java.util.TimerThread.mainLoop(Timer.java:512)
at java.util.TimerThread.run(Timer.java:462)
'''
class SomeService {
    private MessageConsumer consumer;

    // Optional injection of MessageConsumer to stop/start at will
    @Inject
    public SomeService(@Named("someconsumerid") MessageConsumer consumer) {
        this.consumer = consumer;
    }

    public void start() {
        consumer.start();
    }

    public void stop() {
        consumer.stop();
    }

    // This is how events will be consumed. Must ack() to delete from SQS.
    @Consume("someconsumerid")
    public void consume(IncomingMessage message) {
        // ... do something
        message.ack();
    }
}
The Suro message uses Hadoop for its Writable implementation, but this looks like an extra dependency because the Writable implementation is used only for writing to a Hadoop sequence file. Let's remove it, though doing so may break backward compatibility because the value class of the Hadoop sequence file changes.
LifecycleManager.close() should call SinkManager.shutdown() for automatic resource management.
We have a requirement to route messages with a different topic name. An 'alias' property will be added to the routing map configuration. If that property is valid, messages will be routed with that 'alias' instead of the original routingKey.
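The resolution rule can be sketched as follows (a hypothetical helper, not Suro's actual implementation):

```java
// Hypothetical alias resolution: route with the alias when it is set,
// otherwise fall back to the original routingKey.
class RouteResolver {
    static String effectiveKey(String routingKey, String alias) {
        return (alias != null && !alias.isEmpty()) ? alias : routingKey;
    }
}
```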
API for creating a consumer to SQS, CassQueue, Kafka, etc...
Whenever QueuedSink fails to write data successfully, it should retry the messages so as not to lose them. When the queue is full, messages will be dropped, and this should be reported.
Steps:
sudo yum -y update
sudo yum -y install git
git clone https://github.com/Netflix/suro.git
cd suro
./gradlew build
com.netflix.suro.server.TestSuroControl > testOnlyExitCommandWorks FAILED
java.net.ConnectException at TestSuroControl.java:67
This error is intermittent; retrying the build sometimes succeeds. This may be because the findAvailablePort method returns a port that is already in use.
The source line is:
Socket client = new Socket("127.0.0.1", port);
com.netflix.suro.connection.TestConnectionOutPool > testOutPool FAILED
java.lang.AssertionError at TestConnectionOutPool.java:67
Source:
final ConnectionPool pool = injector.getInstance(ConnectionPool.class);
assertEquals(pool.getPoolSize(), 1);
The EventBus filter is part of Netflix Common now. Remove the EventBus filter source code and depend on Netflix Common instead.
The Suro-server-side sink should be reusable, but its dependencies are a little heavy, including jetty, jersey, the aws sdk, and jets3t. Let's break these dependencies down into suro-file and suro-s3.
Define the reserved keywords first. Users can combine them to generate the remote prefix.
'date' takes a date format string.
'static' is a static string value.
'property' is replaced by the property value from ConfigurationManager.getConfigInstance().getProperty().
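A sketch of how the three keywords could be combined into a prefix (the PrefixFormatter class is hypothetical, and java.util.Properties stands in here for ConfigurationManager):

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

// Hypothetical formatter for the three reserved keywords described above.
class PrefixFormatter {
    // 'date': formatted with the given date format string
    static String date(String format, Date now) {
        return new SimpleDateFormat(format).format(now);
    }

    // 'static': emitted as-is
    static String staticPart(String value) {
        return value;
    }

    // 'property': looked up from configuration (Properties used as a stand-in)
    static String property(Properties config, String key) {
        return config.getProperty(key, "");
    }
}
```

For example, combining static + property + date could yield a prefix like `logs/prod/20140530`.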
Currently, Suro's only input is the Thrift server. We need to support various inputs, including at least a Kafka consumer.
There will be one more dynamically configurable property, SuroServer.inputConfig, in JSON format.
If one of the inputConfig elements is 'thrift', all of the Suro server's configuration should match the previous configuration keys that start with SuroServer.
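A hypothetical shape for the SuroServer.inputConfig value (only the 'thrift' type comes from the proposal above; the field names shown are illustrative assumptions):

```json
[
  { "type": "thrift", "port": 7101 },
  { "type": "kafka", "topic": "suro_input", "zkConnect": "localhost:2181" }
]
```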
Break out Kafka into a separate sub-project
Treat all Sink and source types as independent modules that are enabled by adding them to the withModules of the injector creation step. Various implementations will be made available using guice mapbinder.
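With Guice, this wiring would use MapBinder.newMapBinder(binder(), String.class, Sink.class) inside each module. As a dependency-free sketch of the same registration-and-lookup pattern (all names here are illustrative):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical sink contract.
interface Sink {
    void writeTo(String message);
}

// Each module contributes a named factory; the server looks sinks up by type,
// which is the pattern Guice's MapBinder provides declaratively.
class SinkRegistry {
    private final Map<String, Supplier<Sink>> factories = new HashMap<>();

    void register(String type, Supplier<Sink> factory) {
        factories.put(type, factory);
    }

    Sink create(String type) {
        Supplier<Sink> f = factories.get(type);
        if (f == null) {
            throw new IllegalArgumentException("unknown sink type: " + type);
        }
        return f.get();
    }
}
```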
The current version of Suro uses an auto-incremented long variable as the partition key, so messages are partitioned in a round-robin fashion by default. To support wider use cases, Suro must have an extension point for custom partitioning logic.
See https://groups.google.com/forum/#!topic/suro-users/6wz7ZDpbjFU for more details.
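One possible shape for that extension point (the interface and both implementations are illustrative; RoundRobinPartitioner mirrors the current auto-incremented-long behavior, and KeyHashPartitioner is one example of a custom strategy):

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical partitioning extension point.
interface Partitioner {
    int partition(String routingKey, int numPartitions);
}

// Mirrors today's default: an auto-incremented counter, i.e. round-robin.
class RoundRobinPartitioner implements Partitioner {
    private final AtomicLong counter = new AtomicLong();

    @Override
    public int partition(String routingKey, int numPartitions) {
        return (int) (counter.getAndIncrement() % numPartitions);
    }
}

// Example custom strategy: keep all messages with the same key together.
class KeyHashPartitioner implements Partitioner {
    @Override
    public int partition(String routingKey, int numPartitions) {
        return (routingKey.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
```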
[2014-05-30 05:05:02.929] [Thread-5] WARN org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:542) - Got an IOException in internalRead!
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method) ~[na:1.8.0_05]
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39) ~[na:1.8.0_05]
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223) ~[na:1.8.0_05]
at sun.nio.ch.IOUtil.read(IOUtil.java:197) ~[na:1.8.0_05]
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:375) ~[na:1.8.0_05]
at org.apache.thrift.transport.TNonblockingSocket.read(TNonblockingSocket.java:141) ~[libthrift-0.9.1.jar:0.9.1]
at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.internalRead(AbstractNonblockingServer.java:537) [libthrift-0.9.1.jar:0.9.1]
at org.apache.thrift.server.AbstractNonblockingServer$FrameBuffer.read(AbstractNonblockingServer.java:338) [libthrift-0.9.1.jar:0.9.1]
at org.apache.thrift.server.AbstractNonblockingServer$AbstractSelectThread.handleRead(AbstractNonblockingServer.java:203) [libthrift-0.9.1.jar:0.9.1]
at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.select(TNonblockingServer.java:202) [libthrift-0.9.1.jar:0.9.1]
at org.apache.thrift.server.TNonblockingServer$SelectAcceptThread.run(TNonblockingServer.java:158) [libthrift-0.9.1.jar:0.9.1]
(The identical "Connection reset by peer" warning and stack trace then repeat continuously.)
Javadoc generation failed.