storm-hdfs's Introduction

Storm HDFS

Storm components for interacting with HDFS file systems

Usage

The following example writes pipe ("|")-delimited files to the HDFS path hdfs://localhost:54310/foo. After every 1,000 tuples it syncs the filesystem, making that data visible to other HDFS clients. It rotates files when they reach 5 megabytes in size.

// use "|" instead of "," for field delimiter
RecordFormat format = new DelimitedRecordFormat()
        .withFieldDelimiter("|");

// sync the filesystem after every 1k tuples
SyncPolicy syncPolicy = new CountSyncPolicy(1000);

// rotate files when they reach 5MB
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

FileNameFormat fileNameFormat = new DefaultFileNameFormat()
        .withPath("/foo/");

HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://localhost:54310")
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy);

Packaging a Topology

When packaging your topology, it's important that you use the maven-shade-plugin as opposed to the maven-assembly-plugin.

The shade plugin provides facilities for merging JAR manifest entries, which the hadoop client leverages for URL scheme resolution.

If you experience errors such as the following:

java.lang.RuntimeException: Error preparing HdfsBolt: No FileSystem for scheme: hdfs

it's an indication that your topology jar file isn't packaged properly.

If you are using Maven to create your topology jar, use the following maven-shade-plugin configuration:

<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>1.4</version>
    <configuration>
        <createDependencyReducedPom>true</createDependencyReducedPom>
    </configuration>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
            <configuration>
                <transformers>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                    <transformer
                            implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                        <mainClass></mainClass>
                    </transformer>
                </transformers>
            </configuration>
        </execution>
    </executions>
</plugin>

Specifying a Hadoop Version

By default, storm-hdfs uses the following Hadoop dependencies:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.2.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

If you are using a different version of Hadoop, you should exclude the Hadoop libraries from the storm-hdfs dependency and add the dependencies for your preferred version in your pom.

Hadoop client version incompatibilities can manifest as errors like:

com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

Customization

Record Formats

Record format can be controlled by providing an implementation of the org.apache.storm.hdfs.bolt.format.RecordFormat interface:

public interface RecordFormat extends Serializable {
    byte[] format(Tuple tuple);
}

The provided org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat is capable of producing formats such as CSV and tab-delimited files.

File Naming

File naming can be controlled by providing an implementation of the org.apache.storm.hdfs.bolt.format.FileNameFormat interface:

public interface FileNameFormat extends Serializable {
    void prepare(Map conf, TopologyContext topologyContext);
    String getName(long rotation, long timeStamp);
    String getPath();
}

The provided org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat will create file names with the following format:

 {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}

For example:

 MyBolt-5-7-1390579837830.txt

By default, prefix is empty and extension is ".txt".
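
The naming scheme above can be sketched in plain Java. This is a minimal illustration, not the DefaultFileNameFormat source: the class FileNameSketch and its name method are hypothetical, and the component ID and task ID are passed in directly instead of being read from a TopologyContext.

```java
// Hypothetical helper illustrating the naming scheme described above.
// It is not part of storm-hdfs.
final class FileNameSketch {
    private FileNameSketch() {}

    // Builds {prefix}{componentId}-{taskId}-{rotationNum}-{timestamp}{extension}.
    static String name(String prefix, String componentId, int taskId,
                       long rotation, long timeStamp, String extension) {
        return prefix + componentId + "-" + taskId + "-" + rotation
                + "-" + timeStamp + extension;
    }
}
```

Calling FileNameSketch.name("", "MyBolt", 5, 7, 1390579837830L, ".txt") reproduces the MyBolt-5-7-1390579837830.txt example shown above.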

Sync Policies

Sync policies allow you to control when buffered data is flushed to the underlying filesystem (thus making it available to clients reading the data) by implementing the org.apache.storm.hdfs.sync.SyncPolicy interface:

public interface SyncPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

The HdfsBolt will call the mark() method for every tuple it processes. Returning true will trigger the HdfsBolt to perform a sync/flush, after which it will call the reset() method.

The org.apache.storm.hdfs.sync.CountSyncPolicy class simply triggers a sync after the specified number of tuples have been processed.
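
As an illustration of the mark()/reset() contract, a count-based policy can be sketched as follows. To keep the block self-contained, the tuple parameter is typed as Object rather than Storm's Tuple, and CountingPolicySketch is a hypothetical name; the actual CountSyncPolicy may be implemented differently.

```java
// Hypothetical stand-in showing the mark()/reset() contract of a count-based policy.
// A real SyncPolicy takes a Storm Tuple; Object is used here so the sketch compiles alone.
final class CountingPolicySketch {
    private final int count;      // number of tuples between syncs
    private int executeCount = 0; // tuples seen since the last reset

    CountingPolicySketch(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        this.count = count;
    }

    // Called once per tuple; returns true when a sync should be triggered.
    boolean mark(Object tuple, long offset) {
        executeCount++;
        return executeCount >= count;
    }

    // Called after the sync has been performed, to start counting again.
    void reset() {
        executeCount = 0;
    }
}
```

With a count of 3, the first two calls to mark() return false and the third returns true; after reset() the cycle starts over.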

File Rotation Policies

Similar to sync policies, file rotation policies allow you to control when data files are rotated by providing an implementation of the org.apache.storm.hdfs.rotation.FileRotationPolicy interface:

public interface FileRotationPolicy extends Serializable {
    boolean mark(Tuple tuple, long offset);
    void reset();
}

The org.apache.storm.hdfs.rotation.FileSizeRotationPolicy implementation allows you to trigger file rotation when data files reach a specific file size:

FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);
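
The size check behind such a policy can be sketched as below. This is an illustration under stated assumptions, not the FileSizeRotationPolicy source: SizeRotationSketch is a hypothetical name, the tuple is typed as Object to avoid a Storm dependency, and one megabyte is assumed to mean 2^20 bytes.

```java
// Hypothetical sketch of a size-based rotation check; not part of storm-hdfs.
final class SizeRotationSketch {
    // Assumption: MB means 2^20 bytes.
    static final long BYTES_PER_MB = 1024L * 1024L;

    private final long maxBytes;

    SizeRotationSketch(float megabytes) {
        this.maxBytes = (long) (megabytes * BYTES_PER_MB);
    }

    // offset is the number of bytes written to the current file so far.
    boolean mark(Object tuple, long offset) {
        return offset >= maxBytes;
    }

    // No internal state to clear: the decision depends only on the offset
    // passed in, which the caller restarts for each new file.
    void reset() {
    }
}
```

Under these assumptions, new SizeRotationSketch(5.0f) signals rotation once the offset reaches 5,242,880 bytes.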

File Rotation Actions

Both the HDFS bolt and the Trident State implementation allow you to register any number of RotationActions. A RotationAction provides a hook that lets you perform some action right after a file is rotated, such as moving the file to a different location or renaming it.

public interface RotationAction extends Serializable {
    void execute(FileSystem fileSystem, Path filePath) throws IOException;
}

Storm-HDFS includes a simple action that will move a file after rotation:

public class MoveFileAction implements RotationAction {
    private static final Logger LOG = LoggerFactory.getLogger(MoveFileAction.class);

    private String destination;

    public MoveFileAction withDestination(String destDir){
        destination = destDir;
        return this;
    }

    @Override
    public void execute(FileSystem fileSystem, Path filePath) throws IOException {
        Path destPath = new Path(destination, filePath.getName());
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fileSystem.rename(filePath, destPath);
        return;
    }
}

If you are using Trident and sequence files you can do something like this:

        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));

Support for HDFS Sequence Files

The org.apache.storm.hdfs.bolt.SequenceFileBolt class allows you to write storm data to HDFS sequence files:

        // sync the filesystem after every 1k tuples
        SyncPolicy syncPolicy = new CountSyncPolicy(1000);

        // rotate files when they reach 5MB
        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, Units.MB);

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withExtension(".seq")
                .withPath("/data/");

        // create sequence format instance.
        DefaultSequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

        SequenceFileBolt bolt = new SequenceFileBolt()
                .withFsUrl("hdfs://localhost:54310")
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(format)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .withCompressionType(SequenceFile.CompressionType.RECORD)
                .withCompressionCodec("deflate");

The SequenceFileBolt requires that you provide a org.apache.storm.hdfs.bolt.format.SequenceFormat that maps tuples to key/value pairs:

public interface SequenceFormat extends Serializable {
    Class keyClass();
    Class valueClass();

    Writable key(Tuple tuple);
    Writable value(Tuple tuple);
}

Trident API

storm-hdfs also includes a Trident state implementation for writing data to HDFS, with an API that closely mirrors that of the bolts.

        Fields hdfsFields = new Fields("field1", "field2");

        FileNameFormat fileNameFormat = new DefaultFileNameFormat()
                .withPath("/trident")
                .withPrefix("trident")
                .withExtension(".txt");

        RecordFormat recordFormat = new DelimitedRecordFormat()
                .withFields(hdfsFields);

        FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(5.0f, FileSizeRotationPolicy.Units.MB);

        HdfsState.Options options = new HdfsState.HdfsFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310");

        StateFactory factory = new HdfsStateFactory().withOptions(options);

        TridentState state = stream
                .partitionPersist(factory, hdfsFields, new HdfsUpdater(), new Fields());

To use the sequence file State implementation, use the HdfsState.SequenceFileOptions:

        HdfsState.Options seqOpts = new HdfsState.SequenceFileOptions()
                .withFileNameFormat(fileNameFormat)
                .withSequenceFormat(new DefaultSequenceFormat("key", "data"))
                .withRotationPolicy(rotationPolicy)
                .withFsUrl("hdfs://localhost:54310")
                .addRotationAction(new MoveFileAction().withDestination("/dest2/"));

License

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

storm-hdfs's People

Contributors

ptgoetz

storm-hdfs's Issues

Implementing an opaque transactional HdfsState

Thanks much for this contrib.

First, could you confirm that the current HdfsState implementation is non-transactional state and so there is no guarantee that data gets written to HDFS exactly once?

Second, wanted your opinion on implementing an opaque transactional state for writes to HDFS:

A naive implementation of maintaining the state of the file as of previous batch separate from the current file will likely be expensive to implement without the support of file appends. For instance, in a sample implementation, every batch of writes will end up in its own file with no batching efficiencies for downstream consumers.

An alternative implementation could be to have the file f and the prev batch b as 2 separate files where f is always in an open state while b is written afresh and closed for every batch. The name of the file to store b could itself be the "previous tx id". When the current write happens with a different tx id, the execute() function reads b and writes to f. It then deletes b. A new file with the current tx id is created and this stores the current b.

At the time of rotation, b is read and written to f which is rotated away. File storing b is emptied since we will need the tx id for the next write.

When the current write happens with the same tx id as the previous attempt, then b is overwritten with the current batch's data. In particular, f is untouched.
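
The write path of this proposal can be modelled in memory to check its replay behaviour. In the sketch below, committed plays the role of the open file f and pending the role of the per-batch file b. The class and method names are hypothetical, no HDFS calls are made, and rotation is left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the proposed scheme; not part of storm-hdfs.
final class OpaqueBatchSketch {
    private final List<String> committed = new ArrayList<>(); // stands in for file f
    private List<String> pending = new ArrayList<>();         // stands in for file b
    private Long pendingTxId = null;                          // tx id that produced b

    // Applies one batch. A different tx id first folds b into f;
    // a replay of the same tx id overwrites b and leaves f untouched.
    void write(long txId, List<String> batch) {
        if (pendingTxId != null && pendingTxId.longValue() != txId) {
            committed.addAll(pending);
        }
        pending = new ArrayList<>(batch);
        pendingTxId = txId;
    }

    // Defensive copies so callers cannot modify the model's state.
    List<String> committed() {
        return new ArrayList<>(committed);
    }

    List<String> pending() {
        return new ArrayList<>(pending);
    }
}
```

Replaying tx 1 with different contents replaces the pending batch only; the data reaches committed when tx 2 arrives.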

Appreciate your feedback
Thanks
-Ranga

DelimitedRecordFormat fails to write appropriate fields to hdfs

DelimitedRecordFormat works only when all fields in the tuple are written in the order they are emitted (or when only the first field is written to HDFS). Writing a selected subset of the tuple's fields does not always work.

A patch is attached:

diff --git a/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java b/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
index e7137e5..2bd3abb 100644
--- a/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
+++ b/src/main/java/org/apache/storm/hdfs/bolt/format/DelimitedRecordFormat.java
@@ -77,7 +77,7 @@
         Fields fields = this.fields == null ? tuple.getFields() : this.fields;
         int size = fields.size();
         for(int i = 0; i < size; i++){
-            sb.append(tuple.getValue(i));
+            sb.append(tuple.getValueByField(fields.get(i)));
             if(i != size - 1){
                 sb.append(this.fieldDelimiter);
             }
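
The effect of the one-line change can be reproduced without Storm. In the sketch below a tuple is modelled as a list of field names plus a parallel list of values; FieldSelectionSketch and its methods are hypothetical stand-ins for the positional getValue(i) lookup and the getValueByField(name) lookup.

```java
import java.util.List;

// Hypothetical model of the bug and the fix; not part of storm-hdfs.
final class FieldSelectionSketch {
    private FieldSelectionSketch() {}

    // Old behaviour: index i of the selected fields is used as a tuple position,
    // so the first N tuple values are written regardless of which fields were chosen.
    static String byPosition(List<Object> values, List<String> selected, String delimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < selected.size(); i++) {
            sb.append(values.get(i));
            if (i != selected.size() - 1) {
                sb.append(delimiter);
            }
        }
        return sb.toString();
    }

    // Patched behaviour: each selected field is looked up by name.
    // A field name that is missing from tupleFields causes an IndexOutOfBoundsException.
    static String byName(List<String> tupleFields, List<Object> values,
                         List<String> selected, String delimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < selected.size(); i++) {
            sb.append(values.get(tupleFields.indexOf(selected.get(i))));
            if (i != selected.size() - 1) {
                sb.append(delimiter);
            }
        }
        return sb.toString();
    }
}
```

For a tuple with fields (id, name, city) and values (1, alice, paris), selecting (city, id) gives "1|alice" by position but "paris|1" by name.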
    

storm-core.jar 0.9.1-incubating-SNAPSHOT is missing in apache maven repo?

Got the latest version (e7adf3f), unable to compile it with maven:
The POM for org.apache.storm:storm-core:jar:0.9.1-incubating-SNAPSHOT is missing, no dependency information available ... [ERROR] Failed to execute goal on project storm-hdfs: Could not resolve dependencies for project org.apache.storm:storm-hdfs:jar:0.0.2-SNAPSHOT: Could not find artifact org.apache.storm:storm-core:jar:0.9.1-incubating-SNAPSHOT

Same happens to 0.9.0.1:
[ERROR] Failed to execute goal on project storm-hdfs: Could not resolve dependencies for project org.apache.storm:storm-hdfs:jar:0.0.2-SNAPSHOT: Could not find artifact org.apache.storm:storm-core:jar:0.9.0.1 in central (http://repo.maven.apache.org/maven2)

Am I doing something wrong, or is storm-core.jar not yet in the repo?

I have already checked http://mvnrepository.com/search.html?query=storm-core and http://search.maven.org/
I found 0.9.0.1 at https://clojars.org/storm/storm-core, but it uses the old group id: <groupId>storm</groupId>

MapReduce reads a file currently being written to by storm-hdfs

While the file is open for appending by the bolt, you can run "hadoop fs -cat /file" to see the content whenever hsync is called by the bolt, but the size of the file shows as "0".

When you run a MapReduce job or Hive query on the file, the file is not visible to the Hadoop job until it is closed (when the bolt does the rotation).

What could be done to make it available to a Hadoop job while it is being appended to?

Storm-HDFS is not working in HDP2.2

I am getting the following exception while executing a sample storm hdfs code in local mode

20311 [Thread-9-hdfs_bolt] WARN org.apache.hadoop.ipc.Client - Exception encountered while connecting to the server : java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
20328 [Thread-15-test1 spout] INFO backtype.storm.daemon.task - Emitting: test1 spout default [i am at two with nature]
20345 [Thread-11-hdfs_bolt] WARN org.apache.hadoop.ipc.Client - Exception encountered while connecting to the server : java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name
20365 [Thread-9-hdfs_bolt] ERROR backtype.storm.util - Async loop died!
java.lang.RuntimeException: Error preparing HdfsBolt: Failed on local exception: java.io.IOException: java.lang.IllegalArgumentException: Failed to specify server's Kerberos principal name; Host Details : local host is: "xxxx.yyyy.com/00.00.00.001"; destination host is: "abc.yyyy.com":8020;
at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:96) ~[storm-hdfs-0.1.2.jar:na]
at backtype.storm.daemon.executor$fn__3441$fn__3453.invoke(executor.clj:692) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:461) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Thread.java:744) [na:1.7.0_45]

    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.7.0_45]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) ~[na:1.7.0_45]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.7.0_45]
    at java.lang.reflect.Method.invoke(Method.java:606) ~[na:1.7.0_45]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at com.sun.proxy.$Proxy15.create(Unknown Source) ~[na:na]
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1726) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1668) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1593) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337) ~[hadoop-hdfs-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775) ~[hadoop-common-2.6.0.2.2.0.0-2041.jar:na]
    at org.apache.storm.hdfs.bolt.HdfsBolt.createOutputFile(HdfsBolt.java:126) ~[storm-hdfs-0.1.2.jar:na]
    at org.apache.storm.hdfs.bolt.AbstractHdfsBolt.prepare(AbstractHdfsBolt.java:93) ~[storm-hdfs-0.1.2.jar:na]
    ... 4 common frames omitted

I was trying to run it as a Java jar with the following command:
java -cp storm-.1.jar:lib/*:lib/log4j.properties promo.storm.sample.StormSampleTopology

My lib folder has the following jar files-

1

java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

2014-02-04 11:28:27 b.s.util [ERROR] Async loop died!
java.lang.RuntimeException: Error preparing HdfsBolt: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero).; Host Details : local host is: "xxxx"; destination host is: "xxxx":8020;
at org.apache.storm.hdfs.trident.HdfsState.prepare(HdfsState.java:92) ~[stormjar.jar:na]
at org.apache.storm.hdfs.trident.HdfsStateFactory.makeState(HdfsStateFactory.java:27) ~[stormjar.jar:na]
at storm.trident.planner.SubtopologyBolt.prepare(SubtopologyBolt.java:52) ~[storm-core-0.9.0.1.jar:na]
at storm.trident.topology.TridentBoltExecutor.prepare(TridentBoltExecutor.java:214) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.daemon.executor$fn__3498$fn__3510.invoke(executor.clj:674) ~[storm-core-0.9.0.1.jar:na]
at backtype.storm.util$async_loop$fn__444.invoke(util.clj:401) ~[storm-core-0.9.0.1.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]

The error is thrown here. Any ideas?

try {
    Configuration hdfsConfig = new Configuration();
    this.fs = FileSystem.get(URI.create(this.options.fsUrl), hdfsConfig);
    out = this.fs.create(new Path(this.options.path, this.options.fileNameFormat.getName(this.rotation, System.currentTimeMillis())));
} catch (Exception e) {
    throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e);
}

Unable to set directory or filename dynamically based on the information in a tuple?

Hi there,
Thanks much for this contrib.
We are designing a log aggregation system that collects logs from multiple sources and puts all the log lines into a single Kafka topic. With the help of storm-kafka we can already consume each log line, but we hit a problem when writing the lines to HDFS files.

It looks like storm-hdfs can only specify the directory and file name once, at setup time. We cannot route log lines from different log sources to different HDFS files.

By the way, we rewrote a whole framework similar to yours to bypass this problem, but ran into a performance issue when frequently appending to and closing HDFS files, which made us give up.

Are there any plans for storm-hdfs to support this scenario in the future? Thanks!

Can you add a FileTimeRotationPolicy(3600, Units.SECONDS);

I want to rotate the file every 10 minutes regardless of the size. The rotated file will land in a HIVE table directory. This way I can have an SLA for data liveness when someone queries a hive table.

FileTimeRotationPolicy(600, Units.SECONDS); //For 10 minutes
FileTimeRotationPolicy(60, Units.MINUTES); //For One Hour
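
A time-based policy along these lines could follow the same mark()/reset() contract as FileRotationPolicy. The sketch below is hypothetical (this README does not describe a FileTimeRotationPolicy): the tuple is typed as Object to avoid a Storm dependency, and the clock is injected so the behaviour can be checked without waiting. Because mark() only runs when a tuple arrives, an idle stream would not rotate until the next tuple.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

// Hypothetical time-based rotation check; not part of storm-hdfs.
final class TimeRotationSketch {
    private final long intervalMillis;
    private final LongSupplier clock; // returns the current time in milliseconds
    private long windowStart;         // time at which the current file was started

    TimeRotationSketch(long interval, TimeUnit unit, LongSupplier clock) {
        this.intervalMillis = unit.toMillis(interval);
        this.clock = clock;
        this.windowStart = clock.getAsLong();
    }

    // Returns true once the configured interval has elapsed since the last reset.
    boolean mark(Object tuple, long offset) {
        return clock.getAsLong() - windowStart >= intervalMillis;
    }

    // Starts a new interval; intended to be called after the file has been rotated.
    void reset() {
        windowStart = clock.getAsLong();
    }
}
```

For the 10-minute case, this could be constructed as new TimeRotationSketch(10, TimeUnit.MINUTES, System::currentTimeMillis).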

Unable to write in CDH hadoop distribution

Hi, using the GitHub project we are able to write to Apache Hadoop. But when we try to write to Cloudera Hadoop HDFS, it fails. It creates the file, but with zero bytes, and we are not able to write anything to that file.
Exception :
2015-02-24 16:57:27 o.a.h.h.DFSClient [INFO] Exception in createBlockOutputStream
java.net.ConnectException: Connection timed out
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.6.0_31]
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:567) ~[na:1.6.0_31]
at org.apache.hadoop.net.SocketIOWithTimeout.connect(SocketIOWithTimeout.java:206) ~[hadoop-common-2.5.1.jar:na]
at org.apache.hadoop.net.NetUtils.connect(NetUtils.java:529) ~[hadoop-common-2.5.1.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java:1515) ~[hadoop-hdfs-2.3.0-cdh5.1.2.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1318) [hadoop-hdfs-2.3.0-cdh5.1.2.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1271) [hadoop-hdfs-2.3.0-cdh5.1.2.jar:na]
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:525) [hadoop-hdfs-2.3.0-cdh5.1.2.jar:na]
2015-02-24 16:57:27 o.a.h.h.DFSClient [INFO] Abandoning BP-284372577-192.168.192.1-1420735649037:blk_1073915189_174366
2015-02-24 16:57:27 o.a.h.h.DFSClient [INFO] Excluding datanode 192.168.192.6:50010

Add Trident MapState implementation

Question:
I'm confused. Trident is supposed to have three types of state (non-transactional, transactional, and opaque transactional) that work with the Trident spouts to achieve exactly-once semantics. Which one is this?

Am I going to have duplicates in my state updates? If so, how should I implement the transactional variant? And opaque transactional doesn't apply in this case, right?

The yaml config file

The HdfsFileTopology needs a YAML config file to run on a local cluster. What is this YAML config file?
For trial purposes (just to see whether I'm passing the arguments correctly), I also tried running the topology as
HdfsFileTopology test-topology something.yaml
and it just exits and prints the expected usage:

Usage: HdfsFileTopology [topology name] yaml config file

Getting build failure while building with storm-core 0.9.3

[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 23.296 s
[INFO] Finished at: 2016-12-19T14:13:57+05:30
[INFO] Final Memory: 8M/153M
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal on project chapter1: Could not resolve dependencies for project storm.blueprints:chapter1:jar:1.0-SNAPSHOT: Failed to collect dependencies at org.apache.storm:storm-core:jar:0.9.3: Failed to read artifact descriptor for org.apache.storm:storm-core:jar:0.9.3: Could not transfer artifact org.apache.storm:storm-core:pom:0.9.3 from/to central (https://repo.maven.apache.org/maven2): Connect to repo.maven.apache.org:443 [repo.maven.apache.org/151.101.16.215] failed: Connection timed out: connect -> [Help 1]

I am using the below dependency

<dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>0.9.3</version>
</dependency>

A modified MoveFileAction that does not move zero-size files

MoveFileAction moves even 0-sized files. For non-continuous streams, and especially when microbatching, the number of 0-sized files can become problematic. A modification to MoveFileAction that deletes 0-sized files instead of moving them would be useful, probably as a configurable option.

E.g.:
public void execute(FileSystem fs, Path filePath) throws IOException {
    Path destPath = new Path(destination, filePath.getName());
    long size = fs.getFileStatus(filePath).getLen();
    if (size > 0) {
        LOG.info("Moving file {} to {}", filePath, destPath);
        boolean success = fs.rename(filePath, destPath);
        if (!success) {
            LOG.warn("Unable to move file {} to {}", filePath, destPath);
        }
    } else {
        LOG.info("Deleting 0-sized file {}", filePath);
        // delete(Path) is deprecated; the second argument disables recursive deletion
        boolean success = fs.delete(filePath, false);
        if (!success) {
            LOG.warn("Delete of file {} failed", filePath);
        }
    }
}

Enable the Hdfs Compression type in Hdfs Bolt

My requirement is to compress the data loaded into HDFS. I tried to implement this by extending HdfsBolt. The data is written compressed, but when I try to decompress it I get this error: java.lang.IllegalStateException: java.io.IOException: invalid block type. I think I am making a mistake while compressing the data. Could you please help me resolve this issue? My code is below for your review.

package com.eltyx.singlePart;

import java.io.IOException;
import java.util.EnumSet;
import java.util.Map;

import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.RecordFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;

public class HdfsCompressionBolt extends HdfsBolt {

/**
 * 
 */
private static final long serialVersionUID = 7655170056529495962L;
private static final Logger LOG = LoggerFactory
        .getLogger(HdfsCompressionBolt.class);
private CompressionCodec codec;
private String compression_class;
private transient FSDataOutputStream __out;
private transient CompressionOutputStream output;
private long offset = 0;

public HdfsCompressionBolt withCompressionClass(
        Class<? extends CompressionCodec> classz) {
    this.compression_class = classz.getName();
    return this;
}

@SuppressWarnings("rawtypes")
@Override
public void doPrepare(Map conf, TopologyContext topologyContext,
        OutputCollector collector) throws IOException {
    super.doPrepare(conf, topologyContext, collector);
    Configuration confs = new Configuration();
    CompressionCodecFactory factory = new CompressionCodecFactory(confs);
    this.codec = factory.getCodecByClassName(this.compression_class);
}

@Override
public void execute(Tuple tuple) {
    try {
        RecordFormat __format = (RecordFormat) FieldUtils.readField(this,
                "format", true);
        __out = (FSDataOutputStream) FieldUtils
                .readField(this, "out", true);
        byte[] bytes = __format.format(tuple);
        synchronized (this.writeLock) {
            output = codec.createOutputStream(__out,
                    codec.createCompressor());
            output.write(bytes);
            this.offset += bytes.length;

            if (this.syncPolicy.mark(tuple, this.offset)) {
                if (this.__out instanceof HdfsDataOutputStream) {
                    ((HdfsDataOutputStream) this.__out).hsync(EnumSet
                            .of(SyncFlag.UPDATE_LENGTH));
                    this.output.flush();
                } else {
                    this.__out.hsync();
                }
                this.syncPolicy.reset();
            }
        }

        this.collector.ack(tuple);

        if (this.rotationPolicy.mark(tuple, this.offset)) {
            // this.output.close();
            rotateOutputFile(); // synchronized
            this.offset = 0;
            this.rotationPolicy.reset();
        }
    } catch (IOException e) {
        LOG.warn("write/sync failed.", e);
        this.collector.fail(tuple);
    } catch (IllegalAccessException e) {
        e.printStackTrace();
    }
}

}

Write to HDFS hive partition.

In HdfsState.java, could you please add a configuration option for writing to a dynamic path?
I need this to write to an HDFS Hive partition by day.

For example:
drwxr-xr-x - hdfs tbuser 0 2013-11-18 23:20 /staging/anvil/processed/parsed/access/dc=2013-11-18
drwxr-xr-x - hdfs tbuser 0 2013-11-19 23:15 /staging/anvil/processed/parsed/access/dc=2013-11-19
drwxr-xr-x - hdfs tbuser 0 2013-11-20 11:31 /staging/anvil/processed/parsed/access/dc=2013-11-20

Thanks

HdfsState.Options options = new HdfsState.Options()
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(recordFormat)
        .withRotationPolicy(rotationPolicy)
        .withFsUrl("hdfs://localhost:54310")
        .withPath(path);

Graceful Handling of Topology Kill

I'm wondering if there are any good approaches to ensure that tuples are flushed to HDFS when the topology is killed. Under normal operation, the bolt flushes according to the sync policy, but it acks every tuple even if it hasn't yet been flushed. If the topology is shut down (Storm issues kill -9 during a normal shutdown) or if the bolt worker dies, the tuples left in its buffer are left untracked.

A Mahout problem

I have installed and configured Mahout and Hadoop. When I enter 'mahout' on the command line, it lists the Mahout algorithms. But when I run an example, for instance kmeans, it fails with the following problem:
tput.fileoutputformat.compress.type
14/12/09 16:08:03 INFO client.RMProxy: Connecting to ResourceManager at master/10.67.238.65:8080
Exception in thread "main" java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.; Host Details : local host is: "master/10.67.238.65"; destination host is: "master":8080;
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:764)
at org.apache.hadoop.ipc.Client.call(Client.java:1414)
at org.apache.hadoop.ipc.Client.call(Client.java:1363)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy18.getNewApplication(Unknown Source)
at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationClientProtocolPBClientImpl.getNewApplication(ApplicationClientProtocolPBClientImpl.java:193)
at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:190)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:103)
at com.sun.proxy.$Proxy19.getNewApplication(Unknown Source)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.getNewApplication(YarnClientImpl.java:165)
at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createApplication(YarnClientImpl.java:173)
at org.apache.hadoop.mapred.ResourceMgrDelegate.getNewJobID(ResourceMgrDelegate.java:179)
at org.apache.hadoop.mapred.YARNRunner.getNewJobID(YARNRunner.java:230)
at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:357)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1285)
at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1282)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at org.apache.hadoop.mapreduce.Job.submit(Job.java:1282)
at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1303)
at org.apache.mahout.fpm.pfpgrowth.PFPGrowth.startParallelCounting(PFPGrowth.java:309)
at org.apache.mahout.fpm.pfpgrowth.PFPGrowth.runPFPGrowth(PFPGrowth.java:214)
at org.apache.mahout.fpm.pfpgrowth.PFPGrowth.runPFPGrowth(PFPGrowth.java:242)
at org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver.run(FPGrowthDriver.java:137)
at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
at org.apache.mahout.fpm.pfpgrowth.FPGrowthDriver.main(FPGrowthDriver.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.ProgramDriver$ProgramDescription.invoke(ProgramDriver.java:72)
at org.apache.hadoop.util.ProgramDriver.run(ProgramDriver.java:145)
at org.apache.hadoop.util.ProgramDriver.driver(ProgramDriver.java:153)
at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:194)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message end-group tag did not match expected tag.
at com.google.protobuf.InvalidProtocolBufferException.invalidEndTag(InvalidProtocolBufferException.java:94)
at com.google.protobuf.CodedInputStream.checkLastTagWas(CodedInputStream.java:124)
at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:202)
at com.google.protobuf.AbstractParser.parsePartialDelimitedFrom(AbstractParser.java:241)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:253)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:259)
at com.google.protobuf.AbstractParser.parseDelimitedFrom(AbstractParser.java:49)
at org.apache.hadoop.ipc.protobuf.RpcHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcHeaderProtos.java:2364)
at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1055)
at org.apache.hadoop.ipc.Client$Connection.run(Client.java:949)
master:~ #

HDFS run job: org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for ttprivate/taskTracker/root/jobcache/job_201410161548_0035/jobToken

master:/home/hadoop-1.1.2 # hadoop jar /home/mahout/mahout-distribution-0.8/mahout-examples-0.8-job.jar org.apache.mahout.clustering.syntheticcontrol.kmeans.Job
Warning: $HADOOP_HOME is deprecated.

14/12/10 18:50:23 INFO kmeans.Job: Running with default arguments
14/12/10 18:50:23 INFO kmeans.Job: Preparing Input
14/12/10 18:50:24 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
14/12/10 18:50:24 INFO input.FileInputFormat: Total input paths to process : 1
14/12/10 18:50:24 INFO util.NativeCodeLoader: Loaded the native-hadoop library
14/12/10 18:50:24 WARN snappy.LoadSnappy: Snappy native library not loaded
14/12/10 18:50:25 INFO mapred.JobClient: Running job: job_201410161548_0035
14/12/10 18:50:26 INFO mapred.JobClient: map 0% reduce 0%
14/12/10 18:50:26 INFO mapred.JobClient: Task Id : attempt_201410161548_0035_m_000002_0, Status : FAILED
Error initializing attempt_201410161548_0035_m_000002_0:
org.apache.hadoop.util.DiskChecker$DiskErrorException: Could not find any valid local directory for ttprivate/taskTracker/root/jobcache/job_201410161548_0035/jobToken
at org.apache.hadoop.fs.LocalDirAllocator$AllocatorPerContext.getLocalPathForWrite(LocalDirAllocator.java:381)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:146)
at org.apache.hadoop.fs.LocalDirAllocator.getLocalPathForWrite(LocalDirAllocator.java:127)
at org.apache.hadoop.mapred.TaskTracker.localizeJobTokenFile(TaskTracker.java:4492)
at org.apache.hadoop.mapred.TaskTracker.initializeJob(TaskTracker.java:1285)
at org.apache.hadoop.mapred.TaskTracker.localizeJob(TaskTracker.java:1226)
at org.apache.hadoop.mapred.TaskTracker$5.run(TaskTracker.java:2603)
at java.lang.Thread.run(Thread.java:722)

14/12/10 18:50:26 WARN mapred.JobClient: Error reading task outputhttp://slave2:50060/tasklog?plaintext=true&attemptid=attempt_201410161548_0035_m_000002_0&filter=stdout
14/12/10 18:50:26 WARN mapred.JobClient: Error reading task outputhttp://slave2:50060/tasklog?plaintext=true&attemptid=attempt_201410161548_0035_m_000002_0&filter=stderr
