
kafka-connect-fs's People

Contributors

grantatspothero, kylecbrodie, mmolimar, schhor, scottypate, symbianx, zhangyuan


kafka-connect-fs's Issues

New Files not getting picked up when the connectors are running.

I tested running this connector on my local machine in distributed mode, with two REST endpoints configured on ports 8083 and 8084. The "fs.uris" setting points to a local file directory from which files should be picked up.
There are sub-directories from which the data should be read and posted to a Kafka topic.
When I created a new sub-directory, connector instance A did not pick up that folder; rather, it was read by connector instance B only after I stopped instance A.
Continuous polling is not happening.
Please advise if I am missing something.
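A guess rather than a confirmed diagnosis: judging by the policies used in other issues here, a one-shot policy such as SimplePolicy lists fs.uris only once, whereas SleepyPolicy re-scans on an interval. A minimal sketch of the properties that would enable periodic re-scanning:

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
# re-scan fs.uris (including new sub-directories) every 10 seconds
policy.sleepy.sleep=10000
policy.recursive=true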

Kafka with Kerberos

Is it possible to use kafka-connect-fs to read files from a Kerberized HDFS and write to a Kerberized Kafka cluster?

Specifying header names for files with no header row

I can specify header names in file_reader.delimited.settings.header_names, but it only works when file_reader.delimited.settings.header = true (i.e. when the file already has a header row).

For files with no header row (i.e. file_reader.delimited.settings.header = false), the file_reader.delimited.settings.header_names setting is ignored and the schema always uses column_1, column_2, ...

Expected behaviour:
I should be able to specify file_reader.delimited.settings.header = false and file_reader.delimited.settings.header_names=Foo,Bar,..., and the schema should use the header names Foo,Bar instead of column_1,column_2.

Batching Support

Hello,

We're using this to restore messages from S3 and it's working great for 100k messages. However, we noticed the connector will read all the S3 files before starting to send the messages to the topics. We're afraid of running into memory limitations when we reach higher numbers of messages.

I was wondering if it would be possible to add batching support to this connector?

Out of memory error when there are too many files

Hi all,

I get the following error when the connector tries to read files from a public FTP server that has many files (ca. 470), which are updated every 10 minutes.

java.lang.OutOfMemoryError: Java heap space
[2018-04-27 20:06:37,662] ERROR Uncaught exception in thread 'kafka-producer-network-thread | producer-3': (org.apache.kafka.common.utils.KafkaThread)

Is there any configuration option to control how many files are read in one poll?

Duplicate messages in topics for HDFS

I have a test setup (Landoop, Coyote, Hadoop Docker) with 2 messages written to HDFS, and I try to read them again with this configuration:

{
  "name": "hdfs-source",
  "config": {
    "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
    "tasks.max": "1",
    "fs.uris": "hdfs://hadoop:9000/topics/net.gutefrage.events/",
    "topic": "net.gutefrage.replayed-events",
    "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy",
    "policy.sleepy.sleep": "10000",
    "policy.recursive": "true",
    "policy.regexp": ".*\\.avro",
    "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader"
  }
}

Update: The problem does not occur with the Simple Policy.

It works, but instead of the expected 2 messages I see 22: 20 duplicates of the 2 messages. Is this a bug or a misconfiguration?

Masking on the fly

Hi,
I have a requirement where I need to stream files from HDFS and put them into Kafka topics. On the fly, I also need to mask certain fields in each record of the file.
If I have to do so, in which file and which method should I make the needed change?
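Not part of the connector itself, but one hedged option that avoids code changes is a Kafka Connect Single Message Transform applied to the source records; a sketch using the built-in MaskField transform, where the field names are placeholders:

transforms=mask
transforms.mask.type=org.apache.kafka.connect.transforms.MaskField$Value
# replace these placeholder names with the fields that must be masked
transforms.mask.fields=sensitive_field_1,sensitive_field_2

If fully custom masking logic is needed instead, the change would most likely sit in the file reader implementation that builds each record, but the SMT route keeps the connector untouched.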

Policy class interface is not a subclass

Hello Team,

I have successfully set up the connector with the config below:

{
"connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
"fs.uris": "file:////var/kafka/connect/log",
"policy.regexp": "connect-worker.log",
"tasks.max": "2",
"policy.class": "com.github.mmolimar.kafka.connect.fs.policy.Policy",
"name": "POC_FsSourceConnector",
"topic": "kafka.connectfilepulse.source.poc",
"file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.FileReader",
"policy.recursive": "true"
}

After that, when I checked the status, the connector was running but the tasks had failed with the error below:

"trace": "org.apache.kafka.connect.errors.ConnectException: Couldn't start FsSourceTask due to configuration error: Policy class interface com.github.mmolimar.kafka.connect.fs.policy.Policy is not a subclass of interface com.github.mmolimar.kafka.connect.fs.policy.Policy

Could you please look into the error and suggest a fix.

Thanks
Mac
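For what it's worth, the error message suggests the interfaces themselves were configured rather than concrete implementations. A hedged sketch of the two lines that would change, using implementation classes that appear in other issues in this list:

"policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
"file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader"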

Got UnsupportedFileSystemException when using SFTP

Below is my configuration

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=sftp://root:password@<MY_IP_ADDR>:22/home/test
topic=connect-fs-source
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=^.*\.txt$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader

But I got this:

[2017-12-18 17:21:28,093] ERROR Couldn't start FsSourceConnector: (com.github.mmolimar.kafka.connect.fs.FsSourceTask:57)
org.apache.hadoop.fs.UnsupportedFileSystemException: No FileSystem for scheme "sftp"
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3220)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3240)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3291)
        at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3265)
        at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:523)
        at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.configFs(AbstractPolicy.java:69)
        at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.<init>(AbstractPolicy.java:50)
        at com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy.<init>(SimplePolicy.java:11)
...
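A configuration used in a later issue in this list registers the SFTP filesystem implementation explicitly, which is likely what is missing here (hedged; whether it is needed depends on the bundled Hadoop version):

policy.fs.fs.sftp.impl=org.apache.hadoop.fs.sftp.SFTPFileSystem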

S3 source connector

Hi,

I'm trying to use kafka-connect-fs to source from AWS S3 and I have an issue.

Here is my configuration file:

name=AIH_SourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=s3://XXXX/XXXX/
policy.fs.fs.s3a.access.key=XXXX
policy.fs.fs.s3a.secret.key=XXXX
topic=AIH_SOURCE
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=false
policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=;
file_reader.delimited.header=true

but when I check the status of my connector with ./confluent status connectors, I have this response:

{  
   "name":"AIH_SourceConnector",
   "connector":{  
      "state":"RUNNING",
      "worker_id":"10.0.0.166:8083"
   },
   "tasks":[  
      {  
         "state":"FAILED",
         "trace":"org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration:org/apache/commons/lang/reflect/ConstructorUtils\n\tat com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:58)\n\tat org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat java.lang.Thread.run(Thread.java:748)\n",
         "id":0,
         "worker_id":"10.0.0.166:8083"
      }
   ],
   "type":"source"
}

Here is a focus on the stack trace:

org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration:org/apache/commons/lang/reflect/ConstructorUtils
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:58)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:157)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Am I missing something in my configuration?

Is there a way to better see what happens (logs)?

Additional information:

  • I packaged the plugin using mvn clean package from the master branch.
  • I installed the plugin by adding the generated jar to my Confluent Kafka platform, and it is listed as an installed connector.
  • I am using open source Confluent Kafka 4.0.0.
  • I have tried changing fs.uris=s3://XXXX/XXXX/ to fs.uris=s3a://XXXX/XXXX/, but with no improvement.
  • I am pretty new to Kafka.

Thanks
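A hedged observation on the two symptoms above: the stack trace points at a missing dependency (org.apache.commons.lang.reflect.ConstructorUtils lives in the commons-lang jar), so the packaged connector directory with all of its dependency jars needs to be on the worker's classpath or plugin path. Separately, the fs.s3a.* credential keys above belong to the s3a filesystem, so the matching URI scheme would look like this untested sketch:

fs.uris=s3a://XXXX/XXXX/
policy.fs.fs.s3a.access.key=XXXX
policy.fs.fs.s3a.secret.key=XXXX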

NullPointerException when using FTP uri

Trying to use the kafka-connect-fs connector to access CSV files on an FTP server, however I am receiving the following:

org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: ftp://user:password@ftpserver:21/csv/1.csv
at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:212)
at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:82)
at java.util.ArrayList.forEach(ArrayList.java:1249)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:79)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:163)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException

It looks like the "setWorkingDirectory" method in the FTPFileSystem class does not set the working directory and defaults to root "/". This results in the filter in AbstractPolicy.offer not returning the current FileSystem.

Kafka-connect-fs with Azure Data Lake

Hello,

I am currently trying to modify this connector to work with Azure Data Lake (which is based on HDFS). I did add the missing dependencies in pom.xml

<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-azure-datalake</artifactId>
  <version>${hadoop.version}</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-azure</artifactId>
  <version>${hadoop.version}</version>
</dependency>

My current configuration looks something like this

{
    "name": "uk-hdfs-se-search-metrics-source",
    "config": {
          "topic": "se.userprofile",
          "connector.class": "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
          "tasks.max": "1",
          "fs.uris": "adl://<datalakeinstance>.azuredatalakestore.net/<some_path/",
          "file_reader.class": "com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader",
          "file_reader.delimited.token": ",",
          "file_reader.delimited.header": true,
          "policy.class": "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
          "policy.recursive": "true",
          "files_2_read_in_a_record": 15,
          "policy.fs.fs.adl.impl": "org.apache.hadoop.fs.adl.AdlFileSystem",
          "hadoop.conf.dir": "/tmp/hdfs"
     }
}

I built this project and deployed it in my local Confluent Kafka deployment under /share/java/kafka-connect-fs/, as it is supposed to be done.

Did curl -X PUT -H "Content-Type: application/json" --fail --data "$config" http://localhost:8083/connectors/$connector/config to instantiate it.
But, I get the following error:

java.io.IOException: Password fs.adl.oauth2.client.id not found
	at org.apache.hadoop.fs.adl.AdlFileSystem.getPasswordString(AdlFileSystem.java:997)
	at org.apache.hadoop.fs.adl.AdlFileSystem.getConfCredentialBasedTokenProvider(AdlFileSystem.java:279)
	at org.apache.hadoop.fs.adl.AdlFileSystem.getAccessTokenProvider(AdlFileSystem.java:258)
	at org.apache.hadoop.fs.adl.AdlFileSystem.initialize(AdlFileSystem.java:163)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3242)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:121)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3291)
	at org.apache.hadoop.fs.FileSystem$Cache.getUnique(FileSystem.java:3265)
	at org.apache.hadoop.fs.FileSystem.newInstance(FileSystem.java:523)
	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.configFs(AbstractPolicy.java:73)
	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.<init>(AbstractPolicy.java:50)
	at com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy.<init>(SimplePolicy.java:11)

I did something similar a couple of months ago with the Confluent HDFS sink connector and that one had the property "hadoop.conf.dir": "/tmp/hdfs" where I defined the core-site.xml and hdfs-site.xml files with my credentials like it is specified in https://hadoop.apache.org/docs/current/hadoop-azure-datalake/index.html
Is there someway to define it for this connector also?

The major part of the credentials config is done in core-site.xml which looks like this

fs.adl.oauth2.access.token.provider.type = ClientCredential
fs.adl.oauth2.refresh.url = TOKEN ENDPOINT FROM STEP 7 ABOVE
fs.adl.oauth2.client.id = CLIENT ID FROM STEP 7 ABOVE
fs.adl.oauth2.credential = PASSWORD FROM STEP 7 ABOVE

Are these supposed to be defined as "fs.fs.adl.oauth2.access.token.provider.type": "some value" (etc) in my JSON?

Kind regards,
Tudor
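Not a confirmed answer, but going by how the S3 credentials are passed in other issues in this list (Hadoop properties prefixed with policy.fs. are forwarded to the filesystem configuration), the ADL credentials might be expressible directly in the connector JSON along these lines (placeholders, untested):

"policy.fs.fs.adl.oauth2.access.token.provider.type": "ClientCredential",
"policy.fs.fs.adl.oauth2.refresh.url": "<token endpoint>",
"policy.fs.fs.adl.oauth2.client.id": "<client id>",
"policy.fs.fs.adl.oauth2.credential": "<password>"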

Exception: java.lang.OutOfMemoryError due to a huge text file

Hi, I get the same error, "Exception: java.lang.OutOfMemoryError", when sending a 28 MB text file.
Please find all the details below:

[2018-05-17 15:07:44,292] INFO Processing records for file [path = file:/tmp/testFile/t.txt, length = 29288895, blocks = [[offset = 0, length = 29288895, corrupt = false]]] (com.github.mmolimar.kafka.connect.fs.FsSourceTask:73)

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "kafka-producer-network-thread | producer-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "pool-1-thread-1"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "org.eclipse.jetty.server.session.HashSessionManager@48ef6327Timer"

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "pool-3-thread-1"

I would like to know the file size limit. Thank you a lot.

Error while running command to get file permissions

Hi,

java.lang.RuntimeException: Error while running command to get file permissions : 
ExitCodeException exitCode=2: ls: cannot access /path/to/file: No such file or directory

I am encountering this issue where it seems that if a file is created in the directory and later removed, the above error is thrown from filesToProcess(). The connector then goes into a failed state and does not resume.
I am using the Sleepy policy with the text file reader. Is there any configuration to set a minimum file age, or to retry upon this particular error?

Thanks.

a question regarding offset management

Hi Mario,

I wonder: if the code crashes while ingesting a file, how does kafka-connect-fs know which line to restart from when it resumes? Will it write duplicated lines into Kafka?

Support ByteArrayConverter

ByteArrayConverter

My use case is to use TextFileReader to mirror files from S3, but unfortunately, if I set Kafka Connect's settings to

CONNECT_KEY_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter
CONNECT_VALUE_CONVERTER=org.apache.kafka.connect.converters.ByteArrayConverter

this connector would raise error

kafka-connect_1     | [2017-12-22 06:39:10,094] ERROR WorkerSourceTask{id=s3-source-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
kafka-connect_1     | org.apache.kafka.connect.errors.DataException: Invalid schema type for ByteArrayConverter: STRUCT
kafka-connect_1     | 	at org.apache.kafka.connect.converters.ByteArrayConverter.fromConnectData(ByteArrayConverter.java:39)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
kafka-connect_1     | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1     | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect_1     | 	at java.lang.Thread.run(Thread.java:745)

Just asking if this is going to be supported, or am I missing something here?

Unable to connect to remote host

Whenever I supply the "fs.uris" for the connector, I am limited to the local file system with a file URI of "file:///...", otherwise it throws an illegal argument exception:

java.lang.IllegalArgumentException: Wrong FS: file://hostname/path expected: file:///
...

Is there any way to supply a URI that points to an external file system? (e.g., remote host)

Have looked at similar posts:
https://stackoverflow.com/questions/32078441/wrong-fs-expected-file-when-trying-to-read-file-from-hdfs-in-java

Do I need to supply additional configuration as above in the class files?
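For reference, a hedged sketch of URIs that point at remote filesystems, borrowed from configurations in other issues in this list; Hadoop's local file:// scheme only addresses the local machine, hence the "expected: file:///" message:

fs.uris=sftp://user:password@remote-host:22/path/to/files
fs.uris=hdfs://namenode-host:9000/path/to/files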

java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212)

I am seeing the java.lang.NoSuchMethodError: javax.ws.rs.core.Application.getProperties()Ljava/util/Map; (org.eclipse.jetty.util.component.AbstractLifeCycle:212), when executing the below.

/usr/bin/connect-standalone /etc/schema-registry/connect-avro-standalone.properties /etc/kafka/kafka-connect-fs.properties

I am not sure if this could be the cause, but I checked the package and have v1.9 of both the jersey-core and jersey-client jars.

Thanks.

not able to use S3 source connector

Hi,
I am seeing the issue below when running the connector. Here is my configuration; I am not using any Hadoop configuration in my config, but it is still failing with a Hadoop error. Can someone please help me resolve this issue?

config -

   "name": 'test_s3_source',"config": {
        "connector.class" : "com.github.mmolimar.kafka.connect.fs.FsSourceConnector",
        "tasks.max":  1,
        "topic":  "test.s3.source",
        "fs.uris" : "s3://s3-bucket",
        "policy.class" : "com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy",
        "policy.regexp" : ".* \.json$",
        "file_reader.class" : "com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader"
       }

Error -

Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:68)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:36)
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makePolicy(ReflectionUtils.java:24)
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:61)
	... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration

Error reading file from FTP

PFB configuration file:-
name=ftp-source-connector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=sftp://<user_name>:<pass_word>@ip:<remote_directory>
topic=ftp_test
policy.fs.fs.sftp.impl=org.apache.hadoop.fs.sftp.SFTPFileSystem
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=100000
policy.recursive=false
policy.regexp=..csv$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=,
file_reader.delimited.header=true

PFB the error :-
ERROR Error reading file from FS: <sftp_location>. Keep going... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:79)
org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: sftp://username:password@ip:directory_path
at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:208)
at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:72)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:71)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.lambda$make$0(ReflectionUtils.java:28)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
at java.util.stream.ReferencePipeline.toArray(ReferencePipeline.java:438)
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:28)
at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:19)
at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:205)
... 13 more

multi Topics implementation

Hi Molimar

I am doing a POC.
Here is my setup: I am getting data from mysql -> kafkaconnect -> postgresql -> kafkaconnect -> elasticsearch -> kafkaconnect -> hdfs -> kafkaconnect -> postgresql.

I have 2 tables and thus 2 separate topics, one for each table. I wanted to try something like the below (marked in bold):
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=hdfs://localhost:9000//topics/eselastic-poctabbackup/partition=0/,hdfs://localhost:9000//topics/eselastic-poctab1backup/partition=0/
topic=hdfs-eselastic-poctabbackup,hdfs-eselastic-poctabbackup1

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=true
policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader

In short, I wanted to check whether multiple topics can be controlled from the same configuration file, as with other connectors.
Example: HDFS sink

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=eselastic-poctabbackup,eselastic-poctab1backup
hdfs.url=hdfs://localhost:9000/
flush.size=1

Task fails with: java.lang.NoClassDefFoundError: io/confluent/connect/avro/AvroData

Hello, first of all thank you for the amazing work put into this connector :)

We've been trying to get this connector to work with files from S3; these files were pushed there, with the data formatted in Avro, by the Confluent S3 Sink connector.

We were able to get the connector to talk to S3 and it finds the files there, however we're getting the following error now. Looks like some jar is missing from the classpath.

 [2020-03-23 15:55:37,167] ERROR Error reading file from FS: s3a://some-bucket/topics/some-file+0+0000000236.avro. Keep going... (com.github.mmolimar.kafka.connect.fs.FsSourceTask:80) 
 org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: s3a://some-bucket/topics/some-file+0+0000000236.avro
  at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:208)
  at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
  at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:74)
  at java.util.ArrayList.forEach(ArrayList.java:1257)
  at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:71)
  at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:244)
  at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:220)
  at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
  at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
  at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
  at java.util.concurrent.FutureTask.run(FutureTask.java:266)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
 Caused by: java.lang.NoClassDefFoundError: io/confluent/connect/avro/AvroData
  at com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader$GenericRecordToStruct.<init>(AvroFileReader.java:107)
  at com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader.<init>(AvroFileReader.java:32)
  at sun.reflect.GeneratedConstructorAccessor51.newInstance(Unknown Source)
  at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
  at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:31)
  at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:19)
  at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:205)

We built the connector with:
mvn package -f /tmp/connect-fs-plugin/kafka-connect-fs-0.2

Copied the whole folder /tmp/connect-fs-plugin/kafka-connect-fs-0.2/target/kafka-connect-fs-0.2-package/share/java/kafka-connect-fs into the plugins folder of Kafka Connect; shouldn't that include all the dependencies?

Offset management + seek performance

I noticed that the file reader implementations seem to implement the seekFile operation by iterating over each line in the file and throwing it away. This is expensive since every time a file is scanned by the policy, the entire file's contents will be pulled over the network.

https://github.com/mmolimar/kafka-connect-fs/blob/master/src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/TextFileReader.java#L113-L125

Instead, if we implement seekFile using the FSDataInputStream.seek method, then we pull only the data that has been appended since the last commit of offsets. For the common case of immutable files, this will be nothing, and thus we will get a significant performance boost.

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/filesystem/fsdatainputstream.html#Seekable.seek.28s.29

(the hadoop s3 filesystem implementation of seek uses RANGE headers to selectively pull only the requested bytes)
https://hadoop.apache.org/docs/r2.5.2/api/org/apache/hadoop/fs/s3/package-summary.html

The catch:
the seek interface takes a byte position in the file, not a line number, so to do this we would have to store a byte offset as our offset in Kafka Connect instead of a line number. This would obviously break backwards compatibility of the connector, but it would come with a significant performance improvement.

Is there a reason offsets are stored as line numbers and not bytes? Is there a problem you ran into along the way? Would you be open to this change if I were to contribute it?

Thanks for all your work on the connector, it has been super helpful so far.
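For illustration only, a minimal sketch of what byte-based seeking could look like, assuming a reader wraps a Hadoop FSDataInputStream; the class and method names below are hypothetical, not the connector's actual API:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.util.LineReader;

// Hypothetical reader fragment: seek by byte offset instead of re-reading and
// discarding lines up to the last committed line number.
class ByteOffsetTextReader {
    private final FSDataInputStream input;
    private LineReader lines;
    private long currentByteOffset;

    ByteOffsetTextReader(FileSystem fs, Path file) throws IOException {
        this.input = fs.open(file);
        this.lines = new LineReader(input);
        this.currentByteOffset = 0L;
    }

    // Jump straight to a previously committed byte offset. On s3a:// this maps to
    // a ranged GET, so bytes before the offset are never transferred.
    void seekTo(long byteOffset) throws IOException {
        input.seek(byteOffset);
        lines = new LineReader(input); // rebuild so no stale buffered data is kept
        currentByteOffset = byteOffset;
    }

    // Read the next line and advance by the bytes consumed, so the byte position
    // (not a line number) can be stored as the Kafka Connect source offset.
    String nextLine() throws IOException {
        Text line = new Text();
        int bytesRead = lines.readLine(line);
        if (bytesRead <= 0) {
            return null; // end of file
        }
        currentByteOffset += bytesRead;
        return line.toString();
    }

    long currentOffset() {
        return currentByteOffset;
    }
}

With this shape, the Connect source offset would carry the value of currentOffset() instead of a line count, which is exactly the compatibility break described above.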

DataException: Not a struct schema: Schema{ARRAY}

I have a confusing error while processing JSON files (record-per-line format); see the stack trace below.
We usually process this kind of JSON with Jackson, without any specific DeserializationFeature. The only difference is the record-per-line format, but that does not seem related to this error anyway.

Any clue ?
(i'm using v1.0.0)

 org.apache.kafka.connect.errors.DataException: Not a struct schema: Schema{ARRAY}
 	at org.apache.kafka.connect.data.Struct.<init>(Struct.java:53)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:197)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$2(JsonFileReader.java:207)
 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 	at java.util.ArrayList$Itr.forEachRemaining(ArrayList.java:899)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:208)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$mapValue$1(JsonFileReader.java:201)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.mapValue(JsonFileReader.java:200)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.lambda$toStruct$0(JsonFileReader.java:161)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.toStruct(JsonFileReader.java:160)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:153)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.JsonFileReader$JsonToStruct.apply(JsonFileReader.java:149)
 	at com.github.mmolimar.kafka.connect.fs.file.reader.AbstractFileReader.next(AbstractFileReader.java:63)
 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:83)
 	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
 	at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:175)
 	at java.util.Iterator.forEachRemaining(Iterator.java:116)
 	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
 	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
 	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:92)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:270)
 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:237)
 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 	at java.lang.Thread.run(Thread.java:748)

Question: Does this handle processing files in real-time or near real-time?

I would like to know whether this connector handles processing files while they are still being written to.
For example, if I have a file.txt that I am continuously writing to, can the records be processed line by line as soon as they are written?
I would appreciate an answer to this question; thank you in advance.

you may get stale offsets value from context.offsetStorageReader()

This project is awesome, I'd like to thank you at the first place.

I found that in each execution of the poll() method, you get a FileReader relying on context.offsetStorageReader. However, the offsetStorageReader only gives the committed offsets, and by default the offsets are only committed every 60 seconds (OFFSET_COMMIT_INTERVAL_MS_CONFIG).

FileReader reader = policy.offer(metadata, context.offsetStorageReader());

This means that in the current execution of poll() you may return a List with the latest offsets, but in the next execution of poll() you will mostly get old values, since the offsets haven't been committed yet (see org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter, org.apache.kafka.connect.runtime.OffsetStorageWriter, and org.apache.kafka.connect.runtime.WorkerSourceTask).

loader constraint violation for org.apache.avro.Schema when using ParquetFileReader

I'm receiving the below error when using 'file_reader.class': 'com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader'.

I'm not sure if it's related to this change in io.confluent.connect.avro.AvroData.

java.lang.LinkageError: loader constraint violation: when resolving method "io.confluent.connect.avro.AvroData.toConnectData(Lorg/apache/avro/Schema;Ljava/lang/Object;)Lorg/apache/kafka/connect/data/SchemaAndValue;" the class loader (instance of org/apache/kafka/connect/runtime/isolation/PluginClassLoader) of the current class, com/github/mmolimar/kafka/connect/fs/file/reader/ParquetFileReader$GenericRecordToStruct, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, io/confluent/connect/avro/AvroData, have different Class objects for the type org/apache/avro/Schema used in the signature
        at com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader$GenericRecordToStruct.apply(ParquetFileReader.java:167)
        at com.github.mmolimar.kafka.connect.fs.file.reader.ParquetFileReader$GenericRecordToStruct.apply(ParquetFileReader.java:157)
        at com.github.mmolimar.kafka.connect.fs.file.reader.AbstractFileReader.next(AbstractFileReader.java:44)
        at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:76)
        at java.util.ArrayList.forEach(ArrayList.java:1255)
        at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:71)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:163)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

S3 source connector

Hi !
It is said that this connector supports S3 as a source, but I don't see it implemented anywhere in the code. Is there a way to add an S3 source connector?
Thank you

Error while trying to upload a text file to Kafka

Error as below

==========================================================

[2018-02-01 13:51:20,178] ERROR Failed to create job for client.properties (org.apache.kafka.connect.cli.ConnectStandalone:97)
[2018-02-01 13:51:20,179] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:108)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {security.protocol=SASL_PLAINTEXT, sasl.kerberos.service.name=kafka} contains no connector type
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:105)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config
{security.protocol=SASL_PLAINTEXT, sasl.kerberos.service.name=kafka} contains no connector type
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:242)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:163)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:102)

==================================================================

text file as below

=============================================

[root@hostname connectsource]#
[root@cdhant1 connectsource]# cat test.txt
asd,qwe,asd,qwe,qwe
qaz,edc,rfv,tgb,yhn
[root@hostnameconnectsource]# pwd
/datan1/connectsource
[root@hostname connectsource]#

=============================================================

command = KAFKA_HEAP_OPTS="-Xms512m -Xmx1g" $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/schema-registry/connect-avro-standalone_properties
config/kafka-connect-fs.properties client.properties

-----------------connect-avro-standalone_properties-----------------------------------------
bootstrap.servers=hostname:9092,hostname:9092

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://hostname:8083
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://hostname:8083

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets


======================================

----------------------------kafka-connect-fs.properties---------------------------
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///datan1/connectsource/
topic=test8
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy

policy.recursive=true
policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader

==================================================================

-------------------client properties---------------------

security.protocol=SASL_PLAINTEXT
sasl.kerberos.service.name=kafka

DelimitedTextFileReader sets all fields as Required

Hello team,
The DelimitedTextFileReader creates a schema based on the header in the CSV file, and all fields are created as required by default. This breaks the conversion when one column is empty and throws "Conversion error: null value for field that is required and has no default value".
It would be great if this can be configurable.

Configuring gzip decoding while reading from S3

The goal here is to read files that are stored in gzip format in S3, first decompress each file, then read it using the provided or a custom file reader, and store the records in Kafka.

Is this already possible? Am I missing something? Otherwise, consider this a feature request.
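Purely as an illustration of what such a feature could look like (it is not an existing option of the connector as far as these issues show), a custom file reader could wrap the input stream with Hadoop's compression codec support, picking the codec from the file extension:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

// Hypothetical helper: open a (possibly gzipped) file as a line-oriented reader.
class CompressedFileOpener {
    static BufferedReader open(FileSystem fs, Path file, Configuration conf) throws IOException {
        InputStream in = fs.open(file);
        // Pick a codec from the file extension (e.g. ".gz" -> GzipCodec); null means plain text.
        CompressionCodec codec = new CompressionCodecFactory(conf).getCodec(file);
        if (codec != null) {
            in = codec.createInputStream(in);
        }
        return new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
    }
}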

sftp with private-key

Hi,
For an SFTP connection, as in the example below:
fs.uris=sftp://root:password@<MY_IP_ADDR>:22/home/test

Is there a way to use a private key rather than a user/password?
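Not verified against this connector, but Hadoop's SFTPFileSystem appears to accept a key file through its configuration; passed via the policy.fs. prefix seen in other issues here, that might look like the sketch below (the fs.sftp.keyfile key name is an assumption worth checking against the Hadoop version in use):

fs.uris=sftp://root@<MY_IP_ADDR>:22/home/test
policy.fs.fs.sftp.impl=org.apache.hadoop.fs.sftp.SFTPFileSystem
policy.fs.fs.sftp.keyfile=/path/to/private/key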

New lines/Files are not getting picked up from s3 when the connector is running.

Hi !

The connector succeeds in reading the files in the bucket correctly at startup time, but then it does not handle any updates of files or new files in the bucket/directory/sub-directories. The log does not show any errors [1].
Please find my config here [2].
Any hints would be useful.

Thank you !

[1]
...
[2018-08-30 11:42:20,210] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-08-30 11:42:30,210] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
[2018-08-30 11:42:30,211] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:345)
[2018-08-30 11:42:40,211] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:328)
....

[2]
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=3
fs.uris=s3a://test-sg-kafka/kafka/
policy.fs.fs.s3a.access.key=XXXXXX
policy.fs.fs.s3a.secret.key=XXXXXX
topic=test-v1
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.sleepy.sleep=30000
policy.recursive=true
policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=;
file_reader.delimited.header=true

Connecting to S3 Without using Access & Secret Key

Hi,

I am trying to configure the S3 source connector on an AWS EC2 instance, and we do NOT want to use an access & secret key to connect to S3. We want to grant permission to the EC2 instance by providing a role that can read the S3 buckets. How can I achieve this using this S3 connector?
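One possible direction, stated as an assumption rather than a verified setup: the hadoop-aws s3a client can be told which credentials provider to use, and the AWS SDK ships an instance-profile provider that reads the EC2 role. Passed through the policy.fs. prefix used in other issues here, that could look like:

fs.uris=s3a://<bucket>/<path>/
policy.fs.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider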

Question: Does the connector tail a local file?

Hi,

I am interested in a Kafka File Source connector to ingest in real time the logs generated by an application server (i.e. tail the log file and ingest it into Kafka). Please could you tell me if this connector supports that functionality?

Thanks in advance

How can I get the connector to "pace itself"?

I'm trying to use this connector to do a historical replay of Kafka topic messages that were previously archived into S3, back into a new topic. The connector properties are as follows:

name=restore-from-s3-sa
tasks.max=1
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
topic=<my_topic>
fs.uris=s3a://<...snip...>/event_date=2018-02-28/
policy.sleepy.max_execs=1
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=30000
policy.recursive=true
policy.regexp=.*\.avro$
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=<my_field>

The event_date=2018-02-28/ folder contains several hundred Avro files of ~5 MB each. The issue I'm having is that even under this setup, with tasks.max set to 1 and running Kafka Connect in standalone mode, the connector seems to try to pull all of the files at once up-front before beginning to process them, so it's not long before I hit OOM errors. The logs show several repetitions of the below (once per file) before other messages about offset committing etc. begin to show up:

[2018-03-05 22:08:41,124] INFO io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum (org.apache.hadoop.conf.Configuration.deprecation)
[2018-03-05 22:08:41,564] INFO Processing records for file [path = s3a://<...snip...>/event_date=2018-02-28/mytopic_v1+1+0016543294.avro, length = 4657181, blocks = [[offset = 0, length = 4657181, corrupt = false]]] (com.github.mmolimar.kafka.connect.fs.FsSourceTask)
[2018-03-05 22:08:41,957] INFO AvroDataConfig values:
	schemas.cache.config = 100
	enhanced.avro.schema.support = false
	connect.meta.data = true
 (io.confluent.connect.avro.AvroDataConfig)

I've tried this in both standalone and distributed modes, and have also tried just writing the individual file URLs into the fs.uris property and setting policy.recursive to false, with no luck. If I pare back the list of files to a small enough number, everything does seem to work as intended.

Also worth noting, though it probably doesn't matter: I'm running Confluent Platform 4.0.0 / Kafka 1.0.0, so I built this connector with a few modifications to the dependency versions, namely:

avro.version 1.8.1 -> 1.8.2
kafka.version 0.10.1.0 -> 1.0.0
confluent.version 3.1.1 -> 4.0.0

Is there something I can change to drop the concurrency, or to ask the connector to process files serially within a task? Thanks!

Issues in using the source connector with HDFS

This is my use case: there will be CSV files getting dumped on an HDFS path, and I have to produce these CSV files onto a Kafka topic.
I downloaded the project into my Eclipse and built it using Maven. Inside the target directory I got "kafka-connect-hdfs-0.10.2.0-package", which had etc/kafka-connect-hdfs/kafka-connect-fs.properties. This is what I updated in this file:
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=2
fs.uris=hdfs://abc.com:9000/test
topic=mytopic
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader
file_reader.delimited.token=","
file_reader.delimited.header=true

I downloaded confluent-3.2.1 and updated etc/kafka/connect-standalone.properties as below:
bootstrap.servers=1.2.3.4:9092,1.2.3.5:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

I then moved the target directory from my Windows machine to the $CONFLUENT_HOME path on the server. I exported the classpath in the same way as mentioned in the Getting Started section:
export CLASSPATH="$(find target/ -type f -name '*.jar'| grep '-package' | tr '\n' ':')"

I used the command below to start the connector:
nohup $CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/target/kafka-connect-hdfs-0.10.2.0-package/etc/kafka-connect-hdfs/kafka-connect-fs.properties &

Now, I get this error:
[2017-05-20 11:26:50,155] ERROR Failed to create job for /home/tmp/confluent-3.2.1/target/kafka-connect-hdfs-0.10.2.0-package/etc/kafka-connect-hdfs/kafka-connect-fs.properties (org.apache.kafka.connect.cli.ConnectStandalone:88)
[2017-05-20 11:26:50,156] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:99)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.github.mmolimar.kafka.connect.fs.FsSourceConnector, available connectors are: io.confluent.connect.replicator.ReplicatorSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, io.confluent.connect.s3.S3SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, io.confluent.connect.storage.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSourceConnector, org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.sink.SinkConnector, io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, org.apache.kafka.connect.tools.MockConnector, org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.source.SourceConnector, io.confluent.connect.hdfs.HdfsSinkConnector, io.confluent.connect.hdfs.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:80)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:67)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:96)
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to find any class that implements Connector and which name matches com.github.mmolimar.kafka.connect.fs.FsSourceConnector, available connectors are: io.confluent.connect.replicator.ReplicatorSourceConnector, org.apache.kafka.connect.tools.VerifiableSourceConnector, io.confluent.connect.s3.S3SinkConnector, org.apache.kafka.connect.tools.MockSourceConnector, org.apache.kafka.connect.tools.VerifiableSinkConnector, io.confluent.connect.storage.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSourceConnector, org.apache.kafka.connect.tools.SchemaSourceConnector, org.apache.kafka.connect.sink.SinkConnector, io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, org.apache.kafka.connect.tools.MockConnector, org.apache.kafka.connect.tools.MockSinkConnector, org.apache.kafka.connect.file.FileStreamSourceConnector, org.apache.kafka.connect.source.SourceConnector, io.confluent.connect.hdfs.HdfsSinkConnector, io.confluent.connect.hdfs.tools.SchemaSourceConnector, io.confluent.connect.jdbc.JdbcSinkConnector, org.apache.kafka.connect.file.FileStreamSinkConnector
at org.apache.kafka.connect.runtime.ConnectorFactory.getConnectorClass(ConnectorFactory.java:84)
at org.apache.kafka.connect.runtime.ConnectorFactory.newConnector(ConnectorFactory.java:38)
at org.apache.kafka.connect.runtime.AbstractHerder.getConnector(AbstractHerder.java:336)
at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:235)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:158)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:93)
[2017-05-20 11:26:50,159] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66)
[2017-05-20 11:26:50,159] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2017-05-20 11:26:50,159] DEBUG stopping org.eclipse.jetty.server.Server@1aa7ecca (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,162] DEBUG Graceful shutdown org.eclipse.jetty.server.Server@1aa7ecca by (org.eclipse.jetty.server.Server:418)
[2017-05-20 11:26:50,162] DEBUG stopping ServerConnector@42a48628{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,162] DEBUG stopping org.eclipse.jetty.server.ServerConnector$ServerConnectorManager@3c19aaa5 (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,163] DEBUG stopping org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=0 selected=0 (org.eclipse.jetty.util.component.AbstractLifeCycle:194)
[2017-05-20 11:26:50,163] DEBUG Stopping org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=0 selected=0 (org.eclipse.jetty.io.SelectorManager:432)
[2017-05-20 11:26:50,164] DEBUG Queued change org.eclipse.jetty.io.SelectorManager$ManagedSelector$Stop@2a20ba2c (org.eclipse.jetty.io.SelectorManager:480)
[2017-05-20 11:26:50,167] DEBUG Selector loop woken up from select, 0/0 selected (org.eclipse.jetty.io.SelectorManager:602)
[2017-05-20 11:26:50,168] DEBUG Running change org.eclipse.jetty.io.SelectorManager$ManagedSelector$Stop@2a20ba2c (org.eclipse.jetty.io.SelectorManager:525)
[2017-05-20 11:26:50,168] DEBUG Stopped org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=-1 selected=-1 (org.eclipse.jetty.io.SelectorManager:437)
[2017-05-20 11:26:50,168] DEBUG STOPPED org.eclipse.jetty.io.SelectorManager$ManagedSelector@4204541c keys=-1 selected=-1 (org.eclipse.jetty.util.component.AbstractLifeCycle:204)

What am I doing wrong here? Is my deployment not OK?
I also tried to put the share/java/kafka-connect-hdfs jars from the target path into Confluent's actual share/java/kafka-connect-hdfs, but that did not help.
Maybe you can throw some light here on how to run this connector end to end. I am in dire need to make this run and implement it.

Need to pass filename and data to kafka

Currently I am able to listen for files, upload them to Kafka, and consume them as below.

Sample consumer output:

Struct{value=mobile|imsi|2050160|2|14|2|xxxxxx|20160917|vvvvvv|20370101|mmmmmm|1250158,1104006,1210003,xxxxxx,1106000,54545454,1103001|2000:14,3000:-3333333|}
Struct{value=mobile|imsi|2050160|2|14|2|xxxxxx|20160917|vvvvvv|20370101|bbbbbbbbb|1250158,1104006,1210003,xxxxxx,1106000,44444444,1103001|2000:14,3000:-4444444|}

I need the file name to be present as the key of the above output.

My current configs are below.

=================/kafka-connect-fs.properties==========================
name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=10

fs.uris=file:///dat/source
topic=topicname

policy.class=com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy
policy.sleepy.sleep=50000

policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader

===================================properties======================

=======connect-avro-standalone.properties==========================

bootstrap.servers=ip:9092,ip:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schema.registry.url=http://ip:8083
value.converter=org.apache.kafka.connect.storage.StringConverter
value.converter.schema.registry.url=http://ip:8083
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
key.converter.schemas.enable=true
value.converter.schemas.enable=true
sasl.kerberos.service.name=kafka
security.protocol=SASL_PLAINTEXT
producer.security.protocol=SASL_PLAINTEXT
producer.sasl.kerberos.service.name=kafka
sasl.kerberos.service.name=kafka
offset.storage.file.filename=/tmp/connect.offsets
plugin.path=share/java

====================================

I am executing as below

KAFKA_HEAP_OPTS="-Xms4g -Xmx32g" /confluent-4.0.0/bin/connect-standalone connect-avro-standalone.properties kafka-connect-fs.properties

Kindly advise
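
For what it's worth, one possible approach is a custom Single Message Transform that copies the file path from the record's source partition into the record key. The sketch below is only an illustration: the class name FilePathAsKey is hypothetical, and it assumes the connector stores the path in the source partition under a key named "path" (check FsSourceTask in the connector's source to confirm the exact key).

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

// Hypothetical SMT: uses the file path stored in the source partition as the record key.
public class FilePathAsKey<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public R apply(R record) {
        Object path = null;
        if (record instanceof SourceRecord) {
            Map<String, ?> partition = ((SourceRecord) record).sourcePartition();
            path = (partition == null) ? null : partition.get("path");  // "path" is an assumption
        }
        return record.newRecord(record.topic(), record.kafkaPartition(),
                Schema.OPTIONAL_STRING_SCHEMA, path == null ? null : path.toString(),
                record.valueSchema(), record.value(), record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // no configuration needed for this sketch
    }

    @Override
    public void close() {
        // nothing to release
    }
}

If packaged on the plugin path, it would be registered in the connector config with transforms=FilePathAsKey and transforms.FilePathAsKey.type set to the fully qualified class name.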

Change possibility for Schema Name and Namespace.

Hi, I've been testing the connector for a couple of days, focused on the DelimitedTextReader and CSV.
In our Test Case we're working with Avro (Specific Record) and Kafka Schema Registry.
The behaviour is really nice: it uses the CSV header to define column names, and COLUMN_# when no header is defined, but there is no configuration option for the schema name/namespace.
Why do we need to define a namespace? We have tons of CSVs and we expect to keep schemas organized through namespaces and schema names.

Thanks in advance,
BR.

java.lang.NoClassDefFoundError: org/apache/commons/lang/reflect/ConstructorUtils

Hello,

Following this config after freshly cloning & compiling your code:

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=file:///Users/me/a/path/tofolder/with/files
topic=ingestion
policy.class=com.github.mmolimar.kafka.connect.fs.policy.SimplePolicy
policy.recursive=false
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader
file_reader.agnostic.extensions.delimited=tsv,csv,pdf

I run:
bin/connect-standalone.sh config/connect-standalone.properties config/kafka-connect-fs.properties

but I stumble into this error:

[2020-01-10 13:54:00,582] ERROR Couldn't start FsSourceConnector: (com.github.mmolimar.kafka.connect.fs.FsSourceTask:57)
java.lang.NoClassDefFoundError: org/apache/commons/lang/reflect/ConstructorUtils
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:30)
	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makePolicy(ReflectionUtils.java:23)
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:52)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.lang.reflect.ConstructorUtils
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 11 more
[2020-01-10 13:54:00,583] INFO WorkerSourceTask{id=FsSourceConnector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-01-10 13:54:00,584] INFO WorkerSourceTask{id=FsSourceConnector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-01-10 13:54:00,584] ERROR WorkerSourceTask{id=FsSourceConnector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: A problem has occurred reading configuration:org/apache/commons/lang/reflect/ConstructorUtils
	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.start(FsSourceTask.java:58)
	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:208)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Is there anything else I need to do in order to get this connector running?

Help me

How can I download this source into Eclipse and run it after modifying the kafka-connect-fs.properties file?

What if executions AtomicInterger variable reach to its max limit.

Hi

If the AtomicInteger variable reaches its max limit, it will wrap around to a negative value.

@Override
public final Iterator<FileMetadata> execute() throws IOException {
    if (hasEnded()) {
        throw new IllegalWorkerStateException("Policy has ended. Cannot be retried.");
    }
    preCheck();

    executions.incrementAndGet();
    Iterator<FileMetadata> files = Collections.emptyIterator();
    for (FileSystem fs : fileSystems) {
        files = concat(files, listFiles(fs));
    }

    postCheck();

    return files;
}
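
A minimal sketch of one way to avoid the wrap-around, assuming the counter only needs to saturate at Integer.MAX_VALUE rather than keep counting (the class and method names below are made up for illustration):

import java.util.concurrent.atomic.AtomicInteger;

public class SaturatingCounter {

    private final AtomicInteger executions = new AtomicInteger(0);

    // Increments the counter but never goes past Integer.MAX_VALUE,
    // so it cannot wrap around to a negative value.
    public int increment() {
        return executions.updateAndGet(current ->
                current == Integer.MAX_VALUE ? Integer.MAX_VALUE : current + 1);
    }

    public int current() {
        return executions.get();
    }
}

Switching the field to an AtomicLong would be another option if the executions count genuinely needs to keep growing.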

Missing reader.close ?

Hi, I just encountered an s3a read timeout error in the worker:

kafka-connect_1     | org.apache.kafka.connect.errors.ConnectException: An error has occurred when creating reader for file: s3a://XXXXX
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:208)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.SleepyPolicy.offer(SleepyPolicy.java:11)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.lambda$poll$0(FsSourceTask.java:76)
kafka-connect_1     | 	at java.util.ArrayList.forEach(ArrayList.java:1249)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.FsSourceTask.poll(FsSourceTask.java:73)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
kafka-connect_1     | 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
kafka-connect_1     | 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
kafka-connect_1     | 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
kafka-connect_1     | 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
kafka-connect_1     | 	at java.lang.Thread.run(Thread.java:745)
kafka-connect_1     | Caused by: java.io.InterruptedIOException: getFileStatus on s3a://XXXXXX: com.amazonaws.SdkClientException: Unable to execute HTTP request: Timeout waiting for connection from pool
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:141)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:117)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:1857)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:1820)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:1761)
kafka-connect_1     | 	at org.apache.hadoop.fs.s3a.S3AFileSystem.open(S3AFileSystem.java:583)
kafka-connect_1     | 	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:914)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.file.reader.TextFileReader.<init>(TextFileReader.java:39)
kafka-connect_1     | 	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
kafka-connect_1     | 	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
kafka-connect_1     | 	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
kafka-connect_1     | 	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.make(ReflectionUtils.java:31)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils.makeReader(ReflectionUtils.java:19)
kafka-connect_1     | 	at com.github.mmolimar.kafka.connect.fs.policy.AbstractPolicy.offer(AbstractPolicy.java:205)
kafka-connect_1     | 	... 12 more
...

I had to increase the policy.fs.fs.s3a.connection.maximum setting to work around this temporarily.

But I guess calling reader.close is the right way to fix it, in SourceTask#poll: https://github.com/mmolimar/kafka-connect-fs/blob/master/src/main/java/com/github/mmolimar/kafka/connect/fs/FsSourceTask.java#L74

more: aws/aws-sdk-java#269 (comment)
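
For illustration, a minimal sketch of the pattern being suggested, using a generic Closeable iterator rather than the connector's actual FileReader type (the drain helper below is made up; the point is that close() runs even if consumption fails part-way, so the pooled S3A/HTTP connection is released):

import java.io.Closeable;
import java.io.IOException;
import java.util.Iterator;
import java.util.function.Consumer;

public class ReaderDraining {

    // Consume a per-file reader and always close it, so the underlying
    // stream (and its pooled S3A/HTTP connection) is released even when
    // conversion fails part-way through the file.
    public static <T, R extends Iterator<T> & Closeable> void drain(R reader, Consumer<T> sink)
            throws IOException {
        try {
            while (reader.hasNext()) {
                sink.accept(reader.next());
            }
        } finally {
            reader.close();
        }
    }
}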

Moving processed files

Hi Mario, I'm using the FS connector with the Parquet file reader on a Linux filesystem, so every execution of the policy scans all the input Parquet files. Is there anything to avoid this? I'm considering moving processed files to different directories (processed successfully -> output directory, error files -> error directory) to avoid that repeated Parquet scanning.

What is your opinion?
Thanks
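
As a sketch of the kind of post-processing move described above, using Hadoop's FileSystem API directly (the directory layout and the point where such a hook would be invoked are assumptions, not something the connector provides out of the box):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class ProcessedFileMover {

    // Moves a fully read file into a "processed" or "error" directory,
    // so the next policy execution no longer lists it.
    public static void moveFile(Configuration conf, Path source, Path targetDir) throws IOException {
        FileSystem fs = source.getFileSystem(conf);
        if (!fs.exists(targetDir)) {
            fs.mkdirs(targetDir);
        }
        Path target = new Path(targetDir, source.getName());
        if (!fs.rename(source, target)) {
            throw new IOException("Could not move " + source + " to " + target);
        }
    }
}

On a local filesystem or HDFS the rename is cheap; note that on object stores such as S3 a rename is implemented as copy-plus-delete, so it is neither cheap nor atomic there.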

Kerberos Authentication for kafka-connect-fs???

name=FsSourceConnector
connector.class=com.github.mmolimar.kafka.connect.fs.FsSourceConnector
tasks.max=1
fs.uris=hdfs://host/user/hive/warehouse/avro_test_db.db/re_avro_formatted/
topic=kafka-connect-fs
policy.class=com.github.mmolimar.kafka.connect.fs.policy.HdfsFileWatcherPolicy
policy.recursive=true
policy.regexp=.*
file_reader.class=com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader

I am using the above properties to push data from HDFS to Kafka, but I get the exception below:

org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]

So is there any way I could do Kerberos authentication?
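
For reference, this is roughly what a plain Hadoop client does to authenticate against a Kerberized HDFS with a keytab; whether the connector or the Connect worker JVM performs such a login (or relies on an external kinit and a ticket cache) is an assumption, and the principal and keytab path below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

import java.io.IOException;

public class HdfsKerberosLogin {

    // Standard Hadoop-client Kerberos login; run once before opening the
    // Kerberized HDFS filesystem. Principal and keytab path are placeholders.
    public static void login() throws IOException {
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation.loginUserFromKeytab(
                "connect-user@EXAMPLE.COM",              // hypothetical principal
                "/etc/security/keytabs/connect.keytab"); // hypothetical keytab
    }
}

Hadoop-level properties can presumably also be passed through the connector's policy.fs.<property> prefix (as with policy.fs.fs.s3a.connection.maximum in an earlier issue), but the Kerberos ticket itself still has to come from somewhere in the worker JVM.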
