colinmarc / hdfs
A native Go client for HDFS
License: MIT License
I have a command like this:
hadoop fs -Dfs.default.name=hdfs://abc.com:8080 -Dhadoop.job.ugi=admin,supergroup -ls /foo/bar/
but I couldn't find any ugi setting in the code. Any hint?
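A minimal sketch, assuming simple (non-Kerberos) authentication: the NewForUser constructor, which appears elsewhere in these issues, plays roughly the role of the hadoop.job.ugi user setting.
client, err := hdfs.NewForUser("abc.com:8080", "admin") // subsequent requests run as "admin"
if err != nil {
    log.Fatal(err)
}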
Tools are picking up the latest release tag, which is Feb 2015.
Could you generate a new release tag?
I have an HDFS cluster with two HA namenodes and use HDPCLUSTER as the fs.defaultFS. How can I create a client using HDPCLUSTER? I tried, but it failed with a 'missing port' error.
Line 9 in 69e5098
As it currently stands, the completion of "hdfs l" is "hdfs ls". The completion of "hdfs l /v" is "hdfs l /var/". But moving the cursor back to "l" in "hdfs l /var/" does not complete "l" to "ls".
Also, any directory that has thousands of files (in my case /var/) will hang in autocomplete.
I need to interact with HDFS inside a private subnet in AWS over a secure jumphost, and am currently doing that over a SOCKS5 proxy.
I submitted #62 to be able to set up the namenode connection and pass it to the client. This works for the cases where I only need to talk to the metadata server.
However, if I want to write to a file, the BlockWriter opens a non-proxied connection, which does not work for me:
func (bw *BlockWriter) connectNext() error {
    address := getDatanodeAddress(bw.currentPipeline()[0])
    conn, err := net.DialTimeout("tcp", address, connectTimeout)
    if err != nil {
        return err
    }
    ...
Here I need to set up the proxy again:
dialer, err := proxy.SOCKS5("tcp", "localhost:8157", nil, proxy.Direct)
if err != nil {
    panic(err)
}
conn, err := dialer.Dial("tcp", address)
if err != nil {
    return err
}
I'd be happy to submit a patch, but I'm not sure what interface would be best here.
The connections in both rpc.NewNamenodeConnection and rpc.BlockWriter.connectNext() are made with
conn, err := net.DialTimeout("tcp", address, connectTimeout)
so this could be generalized to use a
type DialerTimeout interface {
    DialTimeout(network, address string, timeout time.Duration) (net.Conn, error)
}
or a
type DialerTimeoutFunc func(network, address string, timeout time.Duration) (net.Conn, error)
but then it could no longer be set on NewNamenodeConnection before making the connection.
Or I could add this field to NamenodeConnection:
type NamenodeConnection struct {
    ...
    BlockWriterDialTimeout func(network, address string, timeout time.Duration) (net.Conn, error)
    ...
}
and rpc.BlockWriter could use it when it is not nil. Or do you have a better suggestion?
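For reference, a minimal sketch of how the proposed DialerTimeoutFunc could wrap a SOCKS5 proxy. The type follows the proposal above, and proxy.SOCKS5 is the real golang.org/x/net/proxy API, but nothing here exists in the library itself.
package main

import (
    "net"
    "time"

    "golang.org/x/net/proxy"
)

// DialerTimeoutFunc matches the shape proposed above.
type DialerTimeoutFunc func(network, address string, timeout time.Duration) (net.Conn, error)

// socks5DialTimeout adapts a SOCKS5 proxy to the proposed signature.
func socks5DialTimeout(proxyAddr string) DialerTimeoutFunc {
    return func(network, address string, timeout time.Duration) (net.Conn, error) {
        // Build the proxy dialer per call so the timeout is honored by the
        // forward dialer that makes the actual TCP connection.
        dialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, &net.Dialer{Timeout: timeout})
        if err != nil {
            return nil, err
        }
        return dialer.Dial(network, address)
    }
}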
Hi,
Are there any plans to add an fsck command? Similar functionality is provided by the hadoop client; I suspect this would be faster than going through the Java client.
@colinmarc I added this to my fork crozzy@ee32319; would you be interested in a PR?
Hi @colinmarc
It looks like the Google Code path has moved completely to https://github.com/golang/protobuf:
package github.com/colinmarc/hdfs
imports code.google.com/p/goprotobuf/proto: unable to detect version control system for code.google.com/ path
Would you mind changing the import path? Thanks!
Howdy,
First of all, thanks for writing and maintaining this library, it's a boon. :)
I'm trying to design a unit test for a pile of code that uses hdfs.Client, such that it can run in an environment without a Hadoop cluster (Travis, Jenkins, a local laptop), and I'm curious about your thoughts on how to do this, because I'm running into trouble due to the lack of interface definitions within the library. I can almost get around this by defining an interface which hdfs.Client adheres to and then pointing gomock at that, but there are certain things that Go's type system simply won't let me mock.
For example, if I define an interface which hdfs.FileReader implements, I cannot use mocks implementing that interface as the return values of client.Open(), since that returns the struct directly rather than an interface which the struct implements:
func TestFetchFile(t *testing.T) {
    assert := assert.New(t)
    ctrl := gomock.NewController(t)
    mock := client.New(t) // mock implementing the same interface as hdfs.Client
    client := mock.Build()
    // client.Open(url) should return an hdfs.FileReader, which I need a mock for, I think, to
    // capture the calls within *that*
    // hdfs.FileReader implements hdfsl.FileReaderIface
    mock.Open().Stub(func(name string) (hdfsl.FileReaderIface, error) {
        return mock_hdfs.NewMockFileReaderIface(ctrl), nil
    })
}
This fails to compile as:
hdfs_test.go:24:19: cannot use func literal (type func(string) ("[redacted gopath]/hdfs".FileReaderIface, error)) as type client.openStub in argument to mock.Open().Stub
This is because openStub is declared as type openStub func(arg0 string) (*hdfs.FileReader, error).
And so, I don't know how to generate mocks for this thing such that I can have a mock FileReader from which I can stub the read/write/etc. calls. Is there a solution within my own code to create mocks that satisfy the Go type system (my initial assumption is "no"), and if not, is it possible to define interfaces for the return types within the library and refer to them instead of the struct implementations?
The namenode will return an error because the client doesn't renew its lease.
Are there currently any plans to implement a WriteAt func? I think this would be a great addition to this already awesome library. It seems like this could be accomplished behind the scenes through the use of the truncate feature supported by hdfs.
Is it possible to write to a sequence file using this lib? I looked through file_writer, but it wasn't obvious where this might be taking place.
Hey,
great library! Looks very promising. I can see there is a CreateEmptyFile method, but I can't see any other methods to write files to HDFS. Are there plans to add support for that, and also for the put/copyFromLocal/moveFromLocal/etc. CLI commands?
Sorry if I missed the note about that.
Thanks
In order to transfer a large file from my HDFS, I cut the file into multiple blocks, but the file has only one shared BlockReader, so when one block finishes it closes the BlockReader even though the other blocks still need it. This confuses me.
Does it support concurrency?
In my test, the data stored locally by concurrent CopyToLocal calls is partially wrong.
According to the io.ReaderAt documentation (https://golang.org/pkg/io/#ReaderAt), for the signature
ReadAt(p []byte, off int64) (n int, err error)
“ReadAt reads len(p) bytes into p starting at offset off in the underlying input source. It returns the number of bytes read (0 <= n <= len(p)) and any error encountered.
When ReadAt returns n < len(p), it returns a non-nil error explaining why more bytes were not returned. In this respect, ReadAt is stricter than Read.”
If you simply try playing with a file, you'll quickly observe that this is not followed. Taking a real-world example: I have a file of size 280689401 and pass in a p of length 227743, but only 214 bytes are read, and the err is nil.
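Until this is fixed, a minimal workaround sketch (import "io"), assuming only the short-read behavior described above: loop until the buffer is full or a real error occurs, restoring the io.ReaderAt contract for callers that depend on it.
// readAtFull keeps calling ReadAt until p is full, so that n < len(p)
// implies a non-nil error, as the io.ReaderAt documentation requires.
func readAtFull(r io.ReaderAt, p []byte, off int64) (int, error) {
    n := 0
    for n < len(p) {
        m, err := r.ReadAt(p[n:], off+int64(n))
        n += m
        if err != nil {
            return n, err
        }
        if m == 0 {
            return n, io.ErrUnexpectedEOF
        }
    }
    return n, nil
}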
On "block_reader.go" when block is partially read error is returned as nil and ignored (line 83).
Error should be returned in order of further treatments. It is also returned as nil on "file_reader.go"
Hi, we hit a problem in block_writer: if the local datanode fails, the write operation fails with a connection failure too.
I tried to implement a failover for block writes similar to the one for block reads. The blockWriter can find the next available datanode, but it always fails with Error from datanode: ERROR (), which comes from readBlockOpResponse.
Can you give me some suggestions on how to fix this? Or did I miss something in writeBlockWriteRequest, so that the client cannot respond correctly?
Thanks
I am using the hdfs library in combination with Windows and Hadoop 2.7.3.
In Go I am using the below code:
nodeaddress := "USERNAME:59351" // node address from the node manager's cluster overview; the port changes on each new run of Hadoop YARN
fmt.Println("Nodeaddress: ", nodeaddress)
client, err := hdfs.New(nodeaddress)
checkErr(err)
file, err := client.Open("/input/file1.txt")
fmt.Println("Open:", file)
checkErr(err)
When I execute the script I get the below error:
Error: open /input/file1.txt: permission denied
I already tried to resolve it by adding:
<property> <name>dfs.permissions</name> <value>false</value> </property>
to hdfs-site.xml, but this didn't resolve the issue.
Any ideas how to overcome the permissions error? Thanks
When the namenode has HA enabled and we try to connect to the namenode which is now standby, an org.apache.hadoop.ipc.StandbyException gets issued. In the Hadoop configuration, one can specify more than one namenode. This is, however, not possible here. Am I missing something?
I might be missing something, but it seems that the convenience functions on Client, like ReadFile(), don't call Close() on opened resources. Is this intentional or a bug?
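In the meantime, a minimal sketch of the explicit pattern, using only Open and Close from this library, which guarantees the reader and its connections are closed:
func readFileClosed(client *hdfs.Client, name string) ([]byte, error) {
    f, err := client.Open(name)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    return ioutil.ReadAll(f) // io/ioutil
}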
How can I authenticate my application if my cluster is secured with Kerberos protocol?
When there is an RPC failure, the whole connection is closed, rendering the user unable to make any more calls until they make another call to hdfs.New().
This issue is actually mentioned with a TODO in the code itself: rpc/namenode.go:121
What would the desired behavior be? Keep the connection open? Recreate the connection? What happens if the reconnection fails?
I pulled the code; the git version is d961456.
When I use the append API like this:
w, err := client.Append(*hdfsPath)
if err != nil {
    log.Println("append err", err)
    return
}
_, err = w.Write([]byte(mockStr))
log.Println("write err", err)
w.Close()
then I got this error:
addBlock call failed with ERROR_APPLICATION (org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException
The namenode error is:
org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException: Not replicated yet: /usr/hadoop/test2222.txt
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3382)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3169)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2033)
But when I use the hdfs CLI client, it works:
./hdfs dfs -appendToFile ~/test.111 /usr/hadoop/test2223.txt
The Hadoop version is 2.6.0.
This issue is the same as #61.
Any advice? Thank you!
See #82. Without this fix, files are uploaded 512 bytes at a time, which prevents us from using this library due to unacceptable performance.
What is the correct way to retry when an error such as EOF or a broken pipe is encountered while writing?
I saw the comment in the code that when a failure is encountered, the stream is not currently recoverable by itself. So should I close this stream and invoke client.Append() to get a new one?
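A hedged sketch of the close-and-reopen approach, assuming the stream really is unrecoverable after a failure. Note that if the failed Write already flushed some bytes, a retry can duplicate data, so this is only safe for idempotent or checkpointed writers.
func appendWithRetry(client *hdfs.Client, name string, p []byte, retries int) error {
    var err error
    for i := 0; i <= retries; i++ {
        var w *hdfs.FileWriter
        w, err = client.Append(name)
        if err != nil {
            continue // e.g. a lingering lease; see the lease issues below
        }
        _, err = w.Write(p)
        if cerr := w.Close(); err == nil {
            err = cerr
        }
        if err == nil {
            return nil
        }
    }
    return err
}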
Setting the HADOOP_USER_NAME variable isn't documented in the README. I'll open a PR momentarily.
Hi, we need a function to report HDFS usage, and it is in junjieqian@1d47001 (using GetFsStatusRequestProto). However, one problem I have is how to test this with Travis: it seems impossible, as the reported storage size changes from run to run in the Travis tests.
Can you give me some hints or suggestions on this issue?
Hi colinmarc,
thank you for open-sourcing this. I have a problem when appending content to HDFS.
Example:
writer, err := client.Create(fileName)
if err != nil {
    log.Fatal(err)
}
writer.Close()
for i := 0; i < 10000; i++ {
    writer, err = client.Append(fileName)
    if err != nil {
        log.Fatal(err)
    }
    _, _ = writer.Write([]byte("\nbar")) // write error elided in the original report
    writer.Close()
}
Then the namenode logs this error:
2016-12-01 01:42:20,828 INFO org.apache.hadoop.ipc.Server: IPC Server handler 7 on 9000,
call org.apache.hadoop.hdfs.protocol.ClientProtocol.append from 192.168.10.209:60494
Call#2 Retry#-1: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException:
Failed to APPEND_FILE /tmp/x/15.txt for go-hdfs-MHz41XTUfY3nheHd on 192.168.10.209 because this file lease is currently owned
by go-hdfs-sm0fZSGAQA0uvyqR on 192.168.10.209
and golang error:
file_writer_test.go:323: err: append /tmp/ab/15.txt: append call failed with
ERROR_APPLICATION (org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException)
Hadoop version: 2.7.3
Single Node Cluster
hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.webhdfs.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.permissions.enabled</name>
<value>false</value>
</property>
</configuration>
When I use this script:
#!/bin/bash
for i in {1..10000}
do
    hdfs="hdfs dfs -appendToFile ~/tmp/1 /data/2016-12-01/2016-12-01.tmp1"
    eval ${hdfs}
    echo "Welcome $i times"
done
this way works with no errors.
I don't know how to solve this problem. Do you have any idea?
Thanks
The timeouts for RPC do not seem to be configurable. From rpc.go:
var (
    connectTimeout  = 1 * time.Second
    namenodeTimeout = 3 * time.Second
    datanodeTimeout = 3 * time.Second
)
These values are rather low (especially the connection timeout), and I have been experiencing quite a lot of timeouts.
Being able to configure these would be very helpful. This does not have to be per client -- a global setting would be fine too.
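For illustration only, a hypothetical sketch of what a global override could look like if these variables were exported; none of these exported names exist in the library today.
// Hypothetical: assumes the rpc package exported its timeout variables.
func init() {
    rpc.ConnectTimeout = 30 * time.Second
    rpc.NamenodeTimeout = 2 * time.Minute
    rpc.DatanodeTimeout = 2 * time.Minute
}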
I've got the following error when writing to hdfs:
wwrite file error read tcp 192.168.0.38:39424->192.168.0.39:50010: read: connection reset by peer. filename:hdfs://192.168.0.39:9000/user/am/scan_task/2017-08-14/192.168.0.38_audience_f/user-bak201-20170814115110.log
write file error EOF. filename:hdfs://192.168.0.39:9000/user/am/scan_task/2017-08-14/192.168.0.38_audience_f/user-bak241-20170814115110.log
Could you please help to investigate? I saw no errors in the datanode log, and not every write attempt raises this error.
Line 32 in 1a41554
$ hdfs 'complete'
ls rm mv mkdir touch chmod chown cat head tail du checksum get getmerge
$ hdfs 'complete' 'l'
ls rm mv mkdir touch chmod chown cat head tail du checksum get getmerge
$ hdfs 'complete' 'l l'
ls rm mv mkdir touch chmod chown cat head tail du checksum get getmerge
$ hdfs 'complete' 'l /v'
ls rm mv mkdir touch chmod chown cat head tail du checksum get getmerge
$ hdfs 'complete' 'l /v' /v
ls rm mv mkdir touch chmod chown cat head tail du checksum get getmerge
$ hdfs 'complete' 'l l /v'
/var/
Directories are currently created with a mode which makes them unusable:
cmd/hdfs/get.go: err = os.Mkdir(fullDest, 0644)
This results in drw-r--r-- perms, which are good for basically nothing :)
Please update it to 0755, or better yet, base it on the umask setting.
Cheers!
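For clarity, the suggested one-line change (0755 keeps the directories traversable; deriving the mode from the source directory or the umask would be better still):
cmd/hdfs/get.go: err = os.Mkdir(fullDest, 0755)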
In my case I use a SOCKS proxy to access HDFS. I can already use rpc.WrapNamenodeConnection to create an *rpc.NamenodeConnection, but I cannot use it for the client.
This would work:
func NewForNamenode(namenode *rpc.NamenodeConnection) *Client {
    return &Client{namenode: namenode}
}
If it fits the project, should I submit a PR?
I found "CopyToRemote()" will call Create, and get an error return telling that the file is already exist, I recommand to add a function 'CreateAnyway()', it will firstly remove the existed file, and create a new for operating. We can call it in "CopyToRemote()" instead of calling "Create()"
func (c *Client) CreateAnyway(name string) (*FileWriter, error) {
    _, err := c.getFileInfo(name)
    if err == nil {
        if err = c.Remove(name); err != nil {
            return nil, err
        }
    } else if !os.IsNotExist(err) {
        return nil, &os.PathError{Op: "createAnyway", Path: name, Err: err}
    }
    defaults, err := c.fetchDefaults()
    if err != nil {
        return nil, err
    }
    replication := int(defaults.GetReplication())
    blockSize := int64(defaults.GetBlockSize())
    return c.CreateFile(name, replication, blockSize, 0644)
}
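If something like this were adopted, usage from CopyToRemote might look like the sketch below (dst and localFile stand in for the real arguments). Note that the stat-then-remove above is racy if another client recreates the file in between; an overwrite flag on the create RPC would be more robust.
w, err := c.CreateAnyway(dst)
if err != nil {
    return err
}
defer w.Close() // Close error ignored here for brevity
_, err = io.Copy(w, localFile)
return err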
package main
import (
    "fmt"

    "github.com/colinmarc/hdfs"
)

func main() {
    client, err := hdfs.New("192.168.0.38:50070")
    fs, err := client.ReadDir("/")
    fmt.Println(err)
    fmt.Println(fs)
}
The err is unexpected EOF, and I found that the error occurs in
func (c *NamenodeConnection) readResponse(method string, resp proto.Message) error {
    ...
    _, err = io.ReadFull(c.conn, packet)
    ...
}
I'm using hadoop 2.7.3
It would be useful to be able to put from stdin.
For example:
tar -cz path/ | hdfs put - /path/tar.gz
The way cmd/hdfs/put.go is structured makes this a little tricky, but I think I can refactor it.
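A rough sketch of what the stdin special case could look like; the helper name is illustrative, not the actual structure of cmd/hdfs/put.go.
// putFrom streams source into dest; "-" means read from stdin.
func putFrom(client *hdfs.Client, source, dest string) error {
    var src io.Reader = os.Stdin
    if source != "-" {
        f, err := os.Open(source)
        if err != nil {
            return err
        }
        defer f.Close()
        src = f
    }
    w, err := client.Create(dest)
    if err != nil {
        return err
    }
    if _, err := io.Copy(w, src); err != nil {
        w.Close()
        return err
    }
    return w.Close()
}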
Glob support, i.e. hdfs ls /path/to/file* and hdfs get /path/to/file* /local/dir/, would be amazing.
I am trying to upload a ~500MB file to HDFS running on another node. This operation seems to take more than 10-15 minutes. Is this expected?
Hey @colinmarc
Thank you for your great library. Our Hadoop cluster uses HA; when I use the client like this:
client, err := hdfs.NewForUser("AutoCluster:8020", "user")
the error is:
dial tcp 220.194.103.40:8020: getsockopt: connection timed out
Please help me.
Are you supporting Kerberos? If not, is there any plan to introduce this functionality?
d.
Hi, we would like to have a random-write feature with HDFS, so truncate is needed. I implemented the truncate feature, and the tests pass on my local machine. But it fails on Travis CI with the error truncate call failed with ERROR_NO_SUCH_METHOD (org.apache.hadoop.ipc.RpcNoSuchMethodException). Could anyone give me a hint on this?
Symptom: "too many open files" error
Repro steps: open a large file and do a bunch of ReadAt() calls at random positions in a loop.
Details: the block reader isn't closed properly, so active TCP connections can grow at a high rate. The Go garbage collector doesn't "feel" fd pressure, so it doesn't clean them up in time, quickly exhausting the available fds.
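A minimal repro sketch of that pattern, assuming only Open and ReadAt from this library; size is the file size in bytes.
func hammerReadAt(client *hdfs.Client, name string, size int64) error {
    f, err := client.Open(name)
    if err != nil {
        return err
    }
    defer f.Close()
    buf := make([]byte, 64*1024)
    for i := 0; i < 100000; i++ {
        off := rand.Int63n(size - int64(len(buf))) // math/rand
        if _, err := f.ReadAt(buf, off); err != nil {
            return err // with the leak, eventually "too many open files"
        }
    }
    return nil
}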
#12 says it was added
Is there a minimum supported Hadoop version? When I run against Hadoop 1.0.3 in pseudo-distributed mode on CentOS 6.3, hdfs gives the following error:
$ HADOOP_NAMENODE=127.0.0.1:9000 hdfs ls /
stat /: write tcp 127.0.0.1:33202->127.0.0.1:9000: write: broken pipe
or
$ HADOOP_NAMENODE=localhost:9000 hdfs ls /
stat /: EOF
However, both hadoop dfs -ls hdfs://localhost:9000/ and hadoop dfs -ls hdfs://127.0.0.1:9000/ work.
Hi All,
I see that when one opens a file to append, we do an addNewBlock. This does not append to the old block at all. It would be better to append to the old block and move on to a new block only when it fills up.
Thanks,
Murali
How could I write streams to HDFS?
Is there any way I can have an equivalent to hdfs dfs -cp <src-url> <dest-url> (as found here) in the library?
As noted in that command's documentation:
It is a tool used for large inter/intra-cluster copying
As of now, I did not find any way to implement it using the library without having to pull the data down to my local environment.
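In the meantime, a hedged sketch of a remote-to-remote copy using only this library's Open and Create; unlike distcp the bytes stream through the local process, but nothing is staged on local disk.
func remoteCopy(srcClient, dstClient *hdfs.Client, src, dst string) error {
    r, err := srcClient.Open(src)
    if err != nil {
        return err
    }
    defer r.Close()
    w, err := dstClient.Create(dst)
    if err != nil {
        return err
    }
    if _, err := io.Copy(w, r); err != nil {
        w.Close()
        return err
    }
    return w.Close()
}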
Hi,
the source tree doesn't contain a LICENSE or COPYING file, and none of the files have a copyright header. Who owns the copyright on this code, and under what license is it being published?
Thanks :)