streamnative / kop
Kafka-on-Pulsar - A protocol handler that brings the native Kafka protocol to Apache Pulsar
Home Page: https://streamnative.io/docs/kop
License: Apache License 2.0
This is similar to the PulsarVersion class that Pulsar provides; we could leverage it to get build and GitHub info in the output.
In PR #17, we provided a test framework.
Use this issue to track adding more test cases.
Is your feature request related to a problem? Please describe.
Make the group coordinator run in distributed mode.
[FEATURE] release 0.1.0
Describe the bug
KafkaTopicManager provides a way to hold on-going topics. It needs a cache mechanism to evict invalid/old data.
We could use KafkaTopicManager as a cache for MetadataCache
(in KafkaApis.scala), to return
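A minimal sketch of what such an eviction cache could look like; the class and method names here are illustrative, not KoP's actual API:

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative topic cache with access-time-based eviction (not KoP's actual API).
class ExpiringTopicCache<V> {
    private static class Entry<V> {
        final V value;
        volatile long lastAccessMillis;
        Entry(V value, long now) { this.value = value; this.lastAccessMillis = now; }
    }

    private final ConcurrentHashMap<String, Entry<V>> topics = new ConcurrentHashMap<>();
    private final long ttlMillis;

    ExpiringTopicCache(long ttlMillis) { this.ttlMillis = ttlMillis; }

    void put(String topic, V value) {
        topics.put(topic, new Entry<>(value, System.currentTimeMillis()));
    }

    V get(String topic) {
        Entry<V> e = topics.get(topic);
        if (e == null) return null;
        e.lastAccessMillis = System.currentTimeMillis(); // refresh on access
        return e.value;
    }

    // Evict entries not accessed within the TTL; call periodically or on writes.
    int evictExpired() {
        long cutoff = System.currentTimeMillis() - ttlMillis;
        int evicted = 0;
        for (Iterator<Map.Entry<String, Entry<V>>> it = topics.entrySet().iterator(); it.hasNext(); ) {
            if (it.next().getValue().lastAccessMillis < cutoff) {
                it.remove();
                evicted++;
            }
        }
        return evicted;
    }

    int size() { return topics.size(); }
}
```

In practice a library cache (e.g. Caffeine's expireAfterAccess) would likely be preferable to hand-rolling this.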
Add security support for KOP
Proxy support for KoP.
Is your feature request related to a problem? Please describe.
This commit in librdkafka removes support for older versions of ApiVersions and uses only v3. Since ApiVersions is almost the first frame sent by clients, we need to support it to remain compatible with future librdkafka versions.
Describe the solution you'd like
We need to bump the kafka-clients version to at least 2.4, as encoding and decoding are handled through this dependency.
Currently, after we create a partitioned topic, for a topic that has no producer/consumer created on it,
running the command
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 127.0.0.1:9092 --topic topic8 --time -1
returns
"Topic topic8 does not exist".
Describe the bug
When testing KoP with the kafka-producer-perf tool, performance is very poor:
$ bin/kafka-producer-perf-test.sh --topic kop-test --producer.conf config/producer.properties --throughput -1 --num-records 100000000 --record-size 1024
4036 records sent, 804.8 records/sec (0.79 MB/sec), 2349.0 ms avg latency, 4712.0 max latency.
3735 records sent, 745.2 records/sec (0.73 MB/sec), 7095.8 ms avg latency, 9678.0 max latency.
3600 records sent, 718.0 records/sec (0.70 MB/sec), 12010.2 ms avg latency, 14667.0 max latency.
3645 records sent, 726.7 records/sec (0.71 MB/sec), 17134.9 ms avg latency, 19659.0 max latency.
while testing with pulsar-perf, performance is much higher:
$ bin/pulsar-perf produce -threads 1 -u pulsar://100.76.43.216:6650 -o 10000 -n 4 -b 0 -bm 0 -s 4096 -r 3000000 public/default/kop-test
09:55:10.009 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 15882.3 msg/s --- 496.3 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 337.086 ms - med: 236.380 - 95pct: 784.219 - 99pct: 861.987 - 99.9pct: 870.187 - 99.99pct: 870.439 - Max: 870.539
09:55:20.100 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 15703.9 msg/s --- 490.7 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 1645.813 ms - med: 521.495 - 95pct: 10822.271 - 99pct: 10832.767 - 99.9pct: 10838.975 - 99.99pct: 10839.359 - Max: 10839.359
09:55:30.162 [main] INFO org.apache.pulsar.testclient.PerformanceProducer - Throughput produced: 15777.3 msg/s --- 493.0 Mbit/s --- failure 0.0 msg/s --- Latency: mean: 1764.075 ms - med: 676.243 - 95pct: 10684.863 - 99pct: 10744.063 - 99.9pct: 10747.455 - 99.99pct: 10749.183 - Max: 10749.247
To Reproduce
Just run kafka-producer-perf-test against a KoP-enabled Pulsar cluster.
Expected behavior
For Kafka clients, KoP should provide performance the same as, or close to, the native Pulsar client.
Currently, this repo is not open sourced, so it needs a place to host the README; that is why the release workflow (.github/workflows/release.yml) includes the README.
We can remove this once the repo is open sourced, or once it is hosted somewhere else.
while the 2.0.0 Java API works fine.
Example code:
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kf17ss.jx.shbt.qihoo.net:9092");
props.setProperty("compression.type", "lz4");
props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.setProperty("retries", "3");
KafkaProducer<byte[], byte[]> logProducer = new KafkaProducer<>(props);

int logCount = 100;
for (int i = 0; i < logCount; i++) {
    ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, bidLog.getBytes());
    // should always return true; if false, LogProducer refuses to serve
    logProducer.send(record, new ProducerCallback(record));
}
Motivation
Currently, the KoP protocol handler only prints the request header in debug statements. We also need to log the request and response bodies.
In topicManager, the cursor is cached but never released. We could release it based on the time it was last used.
Currently the Pulsar broker supports a produce rate limit, but KoP does not support it yet.
The implementation is to add the counter in the produce context.
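A minimal sketch of such an interval-based counter, loosely modeled on Pulsar's publish-rate limiting; the class and method names are hypothetical, not KoP's actual code:

```java
import java.util.concurrent.atomic.LongAdder;

// Illustrative per-interval publish counter. When the limit is exceeded, the caller
// would disable the channel's auto-read until resetInterval() starts a new window.
class ProduceRateCounter {
    private final long maxMessagesPerInterval;
    private final long maxBytesPerInterval;
    private final LongAdder messages = new LongAdder();
    private final LongAdder bytes = new LongAdder();

    ProduceRateCounter(long maxMessagesPerInterval, long maxBytesPerInterval) {
        this.maxMessagesPerInterval = maxMessagesPerInterval;
        this.maxBytesPerInterval = maxBytesPerInterval;
    }

    // Called from the produce path for every record batch.
    void record(long numMessages, long numBytes) {
        messages.add(numMessages);
        bytes.add(numBytes);
    }

    // True once either limit is exceeded within the current interval.
    boolean isLimitExceeded() {
        return messages.sum() > maxMessagesPerInterval || bytes.sum() > maxBytesPerInterval;
    }

    // Invoked by a periodic task (e.g. every second) to start a new interval.
    void resetInterval() {
        messages.reset();
        bytes.reset();
    }
}
```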
This is related to the SSL authentication implemented in #45. After #45, users can enable authentication using SSL, but the authorization is not aligned with Pulsar. Most users would like both authentication and authorization to work.
For SSL auth, try to map the certificate CN to a Pulsar role, and leverage the existing Pulsar authorization to do the authorization. Then the pulsar-admin and Pulsar IO paths get authorized as well.
Currently, in KoP, all auth against the Pulsar side goes through the Pulsar admin client and the Pulsar client (both obtained from PulsarService directly). This isolates KoP auth from Pulsar auth, so this task is not urgent.
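The CN-to-role mapping could be sketched like this; DN parsing uses the standard javax.naming.ldap API, while the mapping helper itself is hypothetical:

```java
import javax.naming.InvalidNameException;
import javax.naming.ldap.LdapName;
import javax.naming.ldap.Rdn;

// Illustrative helper: extract the CN from a certificate subject DN and use it as the Pulsar role.
class CnRoleMapper {
    static String cnToRole(String subjectDn) {
        try {
            LdapName name = new LdapName(subjectDn);
            for (Rdn rdn : name.getRdns()) {
                if ("CN".equalsIgnoreCase(rdn.getType())) {
                    return rdn.getValue().toString(); // CN becomes the Pulsar role
                }
            }
        } catch (InvalidNameException e) {
            // malformed DN: no role
        }
        return null;
    }
}
```

The returned role would then be fed into Pulsar's existing AuthorizationService checks.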
Jenkins has some limitations; it would be good to move to GitHub Actions.
We need a test for reading a topic that has multiple ledgers, verifying that a read whose offset jumps from one ledger to another has no issues.
This is related to Pulsar PR apache/pulsar#6337.
Before 6337, users could not create a non-durable-submode consumer.
With 6337, the created non-durable-submode consumer can use the Kafka consumer group id as the subscription name of the Pulsar consumer.
22:55:17.650 [ForkJoinPool.commonPool-worker-13] WARN io.streamnative.pulsar.handlers.kop.coordinator.group.OffsetAcker - Error when get consumer for offset ack:
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException: Topic reader cannot be created on a partitioned topic
at java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:326) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:338) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.uniRelay(CompletableFuture.java:911) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniRelay.tryFire(CompletableFuture.java:899) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) ~[?:1.8.0_131]
at org.apache.pulsar.client.impl.PulsarClientImpl.lambda$doCreateReaderAsync$14(PulsarClientImpl.java:483) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
at org.apache.pulsar.client.impl.BinaryProtoLookupService.lambda$null$6(BinaryProtoLookupService.java:167) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
at java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) ~[?:1.8.0_131]
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962) ~[?:1.8.0_131]
at org.apache.pulsar.client.impl.ClientCnx.handlePartitionResponse(ClientCnx.java:516) ~[org.apache.pulsar-pulsar-client-original-2.5.0-ad0224407.jar:2.5.0-ad0224407]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:120) ~[org.apache.pulsar-pulsar-common-2.5.0-ad0224407.jar:2.5.0-ad0224407]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:321) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:295) ~[io.netty-netty-codec-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:355) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:363) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[io.netty-netty-transport-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.45.Final.jar:4.1.45.Final]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_131]
Caused by: org.apache.pulsar.client.api.PulsarClientException: Topic reader cannot be created on a partitioned topic
... 37 more
Problem
Currently, we use the __offset topic for storing offsets and group coordination. We need to find a solution to sync Kafka offsets to the Pulsar subscription state, so we can see the metrics in one central place.
In PR #92, there is a reproducible issue with ListOffsets in the integration tests.
#92 (comment)
Describe the bug
Integration tests are failing on OSX
To Reproduce
Steps to reproduce the behavior:
error Error: connect ECONNREFUSED 127.0.0.1:15014
    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1137:16) {
  errno: -111,
  code: 'ECONNREFUSED',
  syscall: 'connect',
  address: '127.0.0.1',
  port: 15014
}
ExitCode=1
Expected behavior
Should not fail
Additional context
We are running containers in host mode, which means the port should be available on both sides. The KoP broker is started on the host; the tests run in Docker containers.
While this works on Linux, where there is no VM, on OSX testcontainers forces us to use the host.testcontainers.internal hostname to communicate with the host, which creates a problem because we cannot easily advertise this address.
Proposal
In order to quickly merge the integration tests with those in apache/pulsar, I think we should stay closer to what is done there. We could:
What do you think?
Describe the bug
When setting
brokerPublisherThrottlingMaxByteRate
the broker hits this error:
08:38:34.452 [pulsar-broker-publish-rate-limiter-monitor-30-1] ERROR org.apache.bookkeeper.common.util.SafeRunnable - Unexpected throwable caught
java.lang.NullPointerException: null
at org.apache.pulsar.broker.service.ServerCnx.enableCnxAutoRead(ServerCnx.java:1605) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.AbstractTopic.lambda$enableProducerRead$5(AbstractTopic.java:295) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at java.util.concurrent.ConcurrentHashMap$ValuesView.forEach(ConcurrentHashMap.java:4707) ~[?:1.8.0_242]
at org.apache.pulsar.broker.service.AbstractTopic.enableProducerRead(AbstractTopic.java:295) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.AbstractTopic.resetBrokerPublishCountAndEnableReadIfRequired(AbstractTopic.java:286) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.BrokerService.lambda$refreshBrokerPublishRate$39(BrokerService.java:1138) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.BrokerService.lambda$forEachTopic$40(BrokerService.java:1148) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:385) ~[org.apache.pulsar-pulsar-common-2.5.0.jar:2.5.0]
at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:159) ~[org.apache.pulsar-pulsar-common-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.BrokerService.forEachTopic(BrokerService.java:1145) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.BrokerService.refreshBrokerPublishRate(BrokerService.java:1138) ~[org.apache.pulsar-pulsar-broker-2.5.0.jar:2.5.0]
at org.apache.pulsar.broker.service.BrokerService.lambda$setupBrokerPublishRateLimiterMonitor$9(BrokerService.java:509) ~[org.apache.pu
This is caused by the mock Producer that we registered for bundle load/unload handling.
/**
 * Sets cnx auto-readable if the producer's cnx is disabled due to publish-throttling.
 */
protected void enableProducerRead() {
    if (producers != null) {
        producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
    }
}

public void enableCnxAutoRead() {
    // We could add a check (&& pendingSendRequest < MaxPendingSendRequests) here, but that would
    // require pendingSendRequest to be volatile, which can be expensive on the write path. This is
    // only called when throttling is enabled on the topic, so skipping that check is fine.
    if (!ctx.channel().config().isAutoRead() && autoReadDisabledRateLimiting) {
        // Resume reading from the socket if the pending-request threshold has not been reached
        ctx.channel().config().setAutoRead(true);
        // triggers channel read
        ctx.read();
        autoReadDisabledRateLimiting = false;
    }
}
The code can hit an error at ctx.channel().config().isAutoRead(), since the mock producer's cnx has no real channel. We need to provide corresponding overrides ("forks") in InternalProducer and InternalServerCnx.
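The fix can be sketched with self-contained stand-ins: the subclass used for the internal (mock) producer overrides the channel-touching method as a no-op, so the publish-throttling refresh never dereferences a missing channel context. The class bodies below are illustrative, not the actual KoP/Pulsar code:

```java
// Minimal stand-in for a Netty channel config.
class FakeChannelConfig {
    private boolean autoRead = false;
    boolean isAutoRead() { return autoRead; }
    void setAutoRead(boolean v) { autoRead = v; }
}

// Stand-in for ServerCnx: touches a real channel config, so it NPEs when there is none.
class BaseCnx {
    protected FakeChannelConfig config; // null for the mock producer's cnx

    public void enableCnxAutoRead() {
        if (!config.isAutoRead()) { // throws NullPointerException when config is null
            config.setAutoRead(true);
        }
    }
}

// Stand-in for InternalServerCnx: no real channel behind the internal producer, so no-op.
class InternalCnx extends BaseCnx {
    @Override
    public void enableCnxAutoRead() {
        // nothing to resume for the mock producer
    }
}
```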
This is the sub task for #57
Describe the bug
Force topic delete does not work when the topic has active consumers/producers.
To Reproduce
Steps to reproduce the behavior:
Go inside bastion pod
Create an active consumer in new terminal for the above topic created :
Force Delete the topic using this command while the consumer is actively running in other terminal:
Do a topics list to check if the topic is deleted and observe that topic is still listed :
Expected behavior
Topics should get deleted and topic creation after deletion should work.
This issue is easily reproducible; please let me know if more info/logs are required.
A user may only get a tarball, and want to know all the configs in KafkaServiceConfiguration.
Describe the bug
With the following parameter settings in KoP's broker.conf, a non-partitioned topic is created while creating a consumer or producer for a non-existing topic using the Pulsar client.
Parameter configuration:
allowAutoTopicCreation=true
allowAutoTopicCreationType=partitioned
defaultNumPartitions=1
How to Reproduce
Expected Behaviour
The allowAutoTopicCreationType flag should be honoured when set to partitioned: a partitioned topic should be created when auto topic creation is enabled.
Additional Info
Attached kop logs.
kop0autotopic.log
kop1autotopic.log
kop2autotopic.log
In the current request handler, we pass in the request and, after some handling, return a response future.
If several requests come in at the same time, differences in handling time can cause the futures to complete in a different order than the commands arrived, so responses are sent out of order.
This error is seen in the KafkaMessageOrderTest:
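One way to preserve ordering can be sketched as follows: queue the response futures in arrival order and flush only completed futures from the head of the queue, so a fast later request never overtakes a slow earlier one. Class names are illustrative, not KoP's actual handler:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Illustrative ordered-response queue for a request pipeline.
class OrderedResponseQueue {
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();
    private final List<String> sent = new ArrayList<>();

    // Register the response future in arrival order; flush when any future completes.
    synchronized void onRequest(CompletableFuture<String> responseFuture) {
        pending.add(responseFuture);
        responseFuture.thenRun(this::flush);
    }

    // Send responses strictly from the head; stop at the first incomplete future.
    private synchronized void flush() {
        while (!pending.isEmpty() && pending.peek().isDone()) {
            sent.add(pending.poll().join());
        }
    }

    synchronized List<String> sentResponses() {
        return new ArrayList<>(sent);
    }
}
```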
Currently, most of the integration tests are only verified on Kafka 2.0 clients. Can we add more integration tests to cover more Kafka clients?
Consider using the address in the listener as the bind address.
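Kafka-style listener strings such as "PLAINTEXT://0.0.0.0:9092" follow the generic URI authority syntax, so deriving the bind host and port can be sketched with plain java.net.URI (the helper class is illustrative):

```java
import java.net.URI;

// Illustrative helper: derive bind host/port from a Kafka-style listener string.
class ListenerParser {
    static String bindHost(String listener) {
        return URI.create(listener).getHost();
    }

    static int bindPort(String listener) {
        return URI.create(listener).getPort();
    }
}
```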
Add integration test for KoP
Is your feature request related to a problem? Please describe.
In the metadata request implementation, we use the admin API to find the broker for a topic:
String broker = kafkaService.getAdminClient().lookups().lookupTopic(topic.toString());
Changing this to use the binary protocol lookup would improve performance.
There are several things that need attention:
currently, each non-contiguous read request creates a new NonDurableCursor.
We should cache the latest ones, and clean up the expired ones.
Is your feature request related to a problem? Please describe.
Each Kafka broker has a node.id registered in ZooKeeper, and this id is used in various places.
We will try to avoid creating this; if we cannot, we will have to create a ZK node for it.
Is your feature request related to a problem? Please describe.
We should be able to start a mock Pulsar (KoP) broker and run unit tests against it.
Currently we do not compress or decompress messages during produce and fetch. It would be good to add support for this feature.
The message handling is mostly in the class MessageRecordUtils.
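As an illustration of the produce/fetch symmetry, here is a gzip round-trip using only the JDK. Kafka itself supports gzip, snappy, lz4 and zstd, and the real codec handling would live in the record-batch encoding, not in a standalone helper like this:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// JDK-only gzip round-trip: compress on the produce side, decompress on the fetch side.
class GzipCodec {
    static byte[] compress(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(payload);
        }
        return bos.toByteArray();
    }

    static byte[] decompress(byte[] compressed) throws IOException {
        try (GZIPInputStream gz = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buf = new byte[4096];
            int n;
            while ((n = gz.read(buf)) > 0) {
                out.write(buf, 0, n);
            }
            return out.toByteArray();
        }
    }
}
```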
Motivation
Currently, we have a customized broker for running KoP. However, people should be able to use the official Pulsar broker and install the KoP NAR package to enable Kafka protocol. We should change our tests and documentation to use this approach.
Describe the bug
When trying KoP with Kafka 2.1.0, 2.2.0 and 2.3.0, if KafkaConsumer#offsetsForTimes
is invoked, we get the following exception:
Exception in thread "main" java.lang.IllegalArgumentException: Invalid negative timestamp
at org.apache.kafka.clients.consumer.OffsetAndTimestamp.<init>(OffsetAndTimestamp.java:39)
at org.apache.kafka.clients.consumer.internals.Fetcher.offsetsForTimes(Fetcher.java:500)
at org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:2009)
at org.apache.kafka.clients.consumer.KafkaConsumer.offsetsForTimes(KafkaConsumer.java:1973)
at ConsumerKafka.lambda$compatibleTest$0(ConsumerKafka.java:94)
at ConsumerKafka.tryFeature(ConsumerKafka.java:179)
at ConsumerKafka.compatibleTest(ConsumerKafka.java:93)
at ConsumerKafka.main(ConsumerKafka.java:62)
To Reproduce
Call KafkaConsumer#offsetsForTimes.
Expected behavior
KafkaConsumer#offsetsForTimes should work.
Additional context
While troubleshooting, I found that KoP always returns offset information with timestamp -1:
partitionData.complete(new ListOffsetResponse.PartitionData(
    Errors.NONE,
    RecordBatch.NO_TIMESTAMP, // -1
    MessageIdUtils.getOffset(finalPosition.getLedgerId(), finalPosition.getEntryId())));
But in the Kafka Java clients 2.1.0, 2.2.0 and 2.3.0, there is a parameter check requiring the timestamp to be non-negative:
for (Map.Entry<TopicPartition, ListOffsetData> entry : fetchedOffsets.entrySet()) {
    // 'entry.getValue().timestamp' will not be null since we are guaranteed
    // to work with a v1 (or later) ListOffset request
    ListOffsetData offsetData = entry.getValue();
    offsetsByTimes.put(entry.getKey(), new OffsetAndTimestamp(offsetData.offset, offsetData.timestamp,
            offsetData.leaderEpoch));
}
...
public OffsetAndTimestamp(long offset, long timestamp, Optional<Integer> leaderEpoch) {
    if (offset < 0)
        throw new IllegalArgumentException("Invalid negative offset");
    if (timestamp < 0)
        throw new IllegalArgumentException("Invalid negative timestamp");
    this.offset = offset;
    this.timestamp = timestamp;
    this.leaderEpoch = leaderEpoch;
}
This is the cause of the exception.
In the Kafka metadata cache, all topics in the whole cluster are included, not only the topics on this broker.
So Kafka can easily use it to check whether a topic exists, for example in OffsetCommit.
In Pulsar, could we use a ZK cache of all existing persistent topics to achieve this?
In PR #82 (commit "keep tar build"), we kept the tar.gz build style for current users.
It should be removed in the future.
In PR #105, GroupCoordinatorTest#testSessionTimeout is flaky. This issue tracks it.
It is not easy to reproduce locally, but CI always fails, and the changes in the PR do not seem to affect the logic of the timer or the GroupCoordinator.
Needs more investigation.
Describe the bug
When using KoP, configure standalone.conf
as below:
saslAllowedMechanisms=PLAIN
# Configuration to enable authentication and authorization
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# If using secret key
tokenSecretKey=file:///path/to/secret.key
Pulsar fails to start:
23:31:00.438 [pulsar-web-66-13] WARN org.apache.pulsar.broker.web.AuthenticationFilter - [127.0.0.1] Failed to authenticate HTTP request: Authentication required
23:31:00.448 [pulsar-web-66-13] INFO org.eclipse.jetty.server.RequestLog - 127.0.0.1 - - [16/Apr/2020:23:31:00 +0800] "GET /admin/v2/clusters HTTP/1.1" 401 0 "-" "Pulsar-Java-v2.5.0-ad0224407" 34
23:31:00.472 [main] ERROR io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler - Failed to get retention policy for kafka metadata namespace public/__kafka
org.apache.pulsar.client.admin.PulsarAdminException$NotAuthorizedException: HTTP 401 Unauthorized
To Reproduce
Steps to reproduce the behavior:
./bin/pulsar standalone -a 127.0.0.1
Follow the "Secure KoP" guide in the StreamNative documentation to enable authentication:
saslAllowedMechanisms=PLAIN
# Configuration to enable authentication and authorization
authenticationEnabled=true
authorizationEnabled=true
authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
# If using secret key
tokenSecretKey=file:///path/to/secret.key
Failed to start Pulsar.