Comments (3)
@wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of shuffle read, and multiple replicas should be supported next, building on that refactoring.
from firestorm.
> @wangyi2021 Currently, multiple replicas with LocalFile are not ready. We have already done some refactoring of shuffle read, and multiple replicas should be supported next, building on that refactoring.
Case 4 was tested with the following configuration:
spark.rss.data.replica = 2
spark.rss.storage.type = LOCALFILE
Result: the application did not re-run, and its execution time was the same as when no RSS server was abnormal.
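For reference, replica settings like the two above are normally passed as Spark conf entries on submission. A minimal sketch of such a submit command, where the class and jar names are placeholders rather than anything from this issue:

```
spark-submit \
  --conf spark.rss.data.replica=2 \
  --conf spark.rss.storage.type=LOCALFILE \
  --class <your-main-class> \
  <your-app.jar>
```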
Task 195 error log (shuffle fetch of partition 94 data):
2021-12-27 11:35:09,568 | INFO | [dispatcher-Executor] | Got assigned task 195 | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,568 | INFO | [Executor task launch worker for task 195] | Running task 94.0 in stage 1.0 (TID 195) | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
2021-12-27 11:35:09,581 | INFO | [Executor task launch worker for task 195] | Get taskId cost 1 ms, and request expected blockIds from 100 tasks for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReader(RssShuffleManager.java:290)
2021-12-27 11:35:09,581 | WARN | [Executor task launch worker for task 195] | Get shuffle result is failed from ShuffleServerInfo{id[xx.xx.xx.rss-abnormal], host[xx.xx.xx.rss-abnormal], port[19999]} for appId[application_id_20211227], shuffleId[0] | com.tencent.rss.client.impl.ShuffleWriteClientImpl.getShuffleResult(ShuffleWriteClientImpl.java:313)
2021-12-27 11:35:09,583 | INFO | [Executor task launch worker for task 195] | Get shuffle blockId cost 2 ms, and get 150 blockIds for shuffleId[0], partitionId[94] | org.apache.spark.shuffle.RssShuffleManager.getReaderImpl(RssShuffleManager.java:353)
2021-12-27 11:35:09,615 | INFO | [Executor task launch worker for task 195] | Shuffle read started:appId=application_id_20211227, shuffleId=0,taskId=195_0, partitions: [94, 95), maps: [0, 2147483647) | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:116)
2021-12-27 11:35:09,616 | INFO | [Executor task launch worker for task 195] | Inserting aggregated records to sorter | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:137)
2021-12-27 11:35:09,616 | WARN | [Executor task launch worker for task 195] | Failed to read shuffle data with ShuffleServerGrpcClient for host[xx.xx.xx.rss-abnormal], port[19999] | com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:69)
io.grpc.StatusRuntimeException: UNAVAILABLE: io exception
at io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
at io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
at io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
at com.tencent.rss.proto.ShuffleServerGrpc$ShuffleServerBlockingStub.getShuffleData(ShuffleServerGrpc.java:607)
at com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:422)
at com.tencent.rss.storage.handler.impl.LocalFileClientReadHandler.readShuffleData(LocalFileClientReadHandler.java:64)
at com.tencent.rss.client.impl.ShuffleReadClientImpl.read(ShuffleReadClientImpl.java:180)
at com.tencent.rss.client.impl.ShuffleReadClientImpl.readShuffleBlockData(ShuffleReadClientImpl.java:119)
at org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:97)
at org.apache.spark.shuffle.reader.RssShuffleReader$MultiPartitionIterator.hasNext(RssShuffleReader.java:211)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:200)
at org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:139)
at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:106)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:352)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:316)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$6(Executor.scala:575)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1422)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:578)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: io.grpc.netty.shaded.io.netty.channel.AbstractChannel$AnnotatedConnectException: finishConnect(..) failed: Connection refused: /xx.xx.xx.rss-abnormal:19999
Caused by: java.net.ConnectException: finishConnect(..) failed: Connection refused
at io.grpc.netty.shaded.io.netty.channel.unix.Errors.throwConnectException(Errors.java:124)
at io.grpc.netty.shaded.io.netty.channel.unix.Socket.finishConnect(Socket.java:243)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.doFinishConnect(AbstractEpollChannel.java:672)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.finishConnect(AbstractEpollChannel.java:649)
at io.grpc.netty.shaded.io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe.epollOutReady(AbstractEpollChannel.java:529)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:465)
at io.grpc.netty.shaded.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)
at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.run(Thread.java:748)
The client retries 3 times, and then the task succeeds:
2021-12-27 11:35:10,075 | INFO | [Executor task launch worker for task 195] | GetShuffleData for appId[application_id_20211227], shuffleId[0], partitionId[94] cost 1 ms | com.tencent.rss.client.impl.grpc.ShuffleServerGrpcClient.getShuffleData(ShuffleServerGrpcClient.java:425)
2021-12-27 11:35:10,075 | INFO | [Executor task launch worker for task 195] | Metrics for shuffleId[0], partitionId[94], read data cost 129 ms, copy data cost 4 ms, crc check cost 2 ms | com.tencent.rss.client.impl.ShuffleReadClientImpl.logStatics(ShuffleReadClientImpl.java:218)
2021-12-27 11:35:10,076 | INFO | [Executor task launch worker for task 195] | Fetch 0 bytes cost 137 ms and 3 ms to serialize, 87 ms to decompress with unCompressionLength[105941276] | org.apache.spark.shuffle.reader.RssShuffleDataIterator.hasNext(RssShuffleDataIterator.java:126)
2021-12-27 11:35:10,076 | INFO | [Executor task launch worker for task 195] | Inserted aggregated records to sorter: millis:460 | org.apache.spark.shuffle.reader.RssShuffleReader.read(RssShuffleReader.java:140)
2021-12-27 11:35:19,555 | INFO | [Executor task launch worker for task 195] | Finished task 94.0 in stage 1.0 (TID 195). 1364 bytes result sent to driver | org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
In the rss-abnormal server log, there is no entry for this reduce task for partition 94.
In the backup RSS server log, the entries for partition 94 are:
[INFO] 2021-12-27 11:35:10,027 Grpc-454 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 4 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 4682982 bytes and 19 blocks
[INFO] 2021-12-27 11:35:10,075 Grpc-462 ShuffleServerGrpcService getShuffleData - Successfully getShuffleData cost 0 ms for appId[application_id_20211227], shuffleId[0], partitionId[94] with 0 bytes and 0 blocks
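The failover behavior visible in the logs above (the read fails against the abnormal server with "Connection refused", then succeeds against the backup replica) can be sketched as trying each replica in order. This is a simplified illustration, not the actual Firestorm client API; `ShuffleServer` and `readShuffleData` here are hypothetical stand-ins:

```java
import java.util.List;

public class ReplicaFailoverSketch {
    // Hypothetical stand-in for a shuffle server endpoint.
    interface ShuffleServer {
        byte[] readShuffleData(int partitionId) throws Exception;
    }

    // Try each replica in order and return the first successful read.
    // Mirrors the logs: the abnormal server throws, the backup serves the data.
    static byte[] readWithFailover(List<ShuffleServer> replicas, int partitionId) {
        Exception last = null;
        for (ShuffleServer server : replicas) {
            try {
                return server.readShuffleData(partitionId);
            } catch (Exception e) {
                last = e; // warn and fall through to the next replica
            }
        }
        throw new RuntimeException("All replicas failed", last);
    }

    public static void main(String[] args) {
        ShuffleServer abnormal = p -> { throw new Exception("Connection refused"); };
        ShuffleServer backup = p -> new byte[]{1, 2, 3};
        byte[] data = readWithFailover(List.of(abnormal, backup), 94);
        System.out.println(data.length); // prints 3
    }
}
```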
@wangyi2021 For the shuffle write phase, we implemented marking a task as successful if it wrote its data to any shuffle server successfully, but that change was reverted because many situations must be handled to make sure the data is correct. We already have some ideas for better replica support, and I hope it will be available next month.
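The reverted write-side rule can be sketched as "the task succeeds if any replica write succeeds"; a stricter alternative is a quorum rule requiring a majority of replicas. Both predicates below are purely illustrative, not Firestorm code:

```java
public class WriteReplicaSketch {
    // Reverted behavior: one successful replica write is enough.
    static boolean anyWriteSucceeds(boolean[] replicaResults) {
        for (boolean ok : replicaResults) {
            if (ok) return true;
        }
        return false;
    }

    // Stricter alternative: require a majority of replica writes to succeed.
    static boolean quorumWriteSucceeds(boolean[] replicaResults) {
        int ok = 0;
        for (boolean r : replicaResults) {
            if (r) ok++;
        }
        return ok > replicaResults.length / 2;
    }

    public static void main(String[] args) {
        boolean[] oneOfTwo = {true, false};
        System.out.println(anyWriteSucceeds(oneOfTwo));    // true
        System.out.println(quorumWriteSucceeds(oneOfTwo)); // false: 1 of 2 is not a majority
    }
}
```

The gap between these two predicates is one source of the correctness situations the comment mentions: with "any write succeeds", a reader must still know which replicas actually hold complete data.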
from firestorm.